Kontinuierliche, möglicherweise leere Warteschlange 'abarbeiten'



  • Hallo,

    ich hab ein kleines Diagramm erstellt, das die Situation erklären soll:

    http://img69.imageshack.us/img69/8922/threadqueueproblem.png

    Ich erhalte 3 verschiedene Typen von Daten (Typ A, Typ B, Typ C) (theoretisch unenedlich viele verschiedene Typen, die maximale Anzahl ist bei Programmstart bekannt). Diese Daten erhalte ich kontinuierlich aber nicht konstant, d.h. mal kommt nur Typ A, mal nur B und zeitweise auch gar keine. Diese Daten werden in der Reihenfolge in der sie ankommen in einer ersten Warteschlange gespeichert (Win32 Message Queue) und mehr oder weniger sofort an die, ihnen zugeordneten Typen Queues (Verarbeitungs Thread Queues) weitergeleitet.
    Um die Daten verarbeiten zu können steht für jeden Typ ein eigener Thread zur Verfügung. Es ist wichtig, dass die Daten in der Reihenfolge abgearbeitet werden, in der Sie auch ankommen. Das ist auch der Grund warum ich keinen Thread-Pool verwenden kann.

    Meine eigentliche Frage:

    Was ist die performanteste Methode für die Verarbeitungs-Threads den ihnen zugeordneten Queues ein Element zu entnehmen und dann zu verarbeiten?

    Meine bisherige Implementierung (inspiriert von den Windows Message Queues):

    (Die Thread-Queues basieren auf diesem Beispiel: http://en.wikipedia.org/w/index.php?title=Queue_(abstract_data_type)&oldid=453198824#Example_using_C )

    //Diese Prozedur wird einmalig, bei der Erstellung des Threads aufgerufen.
    INT WorkerThreadProcedure(LPVOID lpParam)
    {
        PDATAQUEUE queue = (PDATAQUEUE)queue; // zeiger auf den thread queue
        DATA data;
    
        while(GetData(&data, queue) > 0)
        {
            Sleep(1); // Problemzone...
    
            if(data.Handled) // auch ein bisschen Problemzone, vielleicht
                HandleEvent(&data);
        }
    
        DestroyQueue(queue);
        return 0;
    }
    
    INT GetData(PDATA data, PDATAQUEUE queue)
    {
        if(!DequeueDataItem(queue, data)) // gibt 0 zurück, wenn die queue leer ist
            data->Handled = FALSE;        //
        else                              // wird eig in DequeueItem gesetzt, hier
            data->Handled = TRUE;         // zur Veranschaulichung
    
        return 1; // der thread hört erst auf,
                  // wenn der gesamte prozess zerstört wird.
    }
    
    VOID HandleEvent(PDATA data)
    {
       // Daten werden verarbeitet
    }
    

    Wenn ich in WorkerThreadProcedure das Sleep(1) wegmache, steigert das zwar die Performance aber die CPU ist bis zu 50% ausgelastet. Solange es da ist, ist die Auslastung kaum höher als 4%. Ich bin mir ehrlich gesagt nicht sicher wie ich auf das Sleep(1) gekommen bin ich habs irgendwo gelesen glaub ich... ganz komisch!

    Ich hoffe ihr könnt mir helfen!

    Vielen Dank im Voraus,

    gurke3000

    P.S.: Ich bin mir nicht ganz sicher mit der Kategorie meines Postings, da der Beispielcode in C ist, aber die Frage ja nicht C-explizit ist...



  • gurke3000 schrieb:

    Was ist die performanteste Methode für die Verarbeitungs-Threads den ihnen zugeordneten Queues ein Element zu entnehmen und dann zu verarbeiten?

    Die Workerthreads sollten ohne CPU-Auslastung ruhen, bis tatsächlich ein Auftrag vorhanden ist. Siehe wait und signal Konzept eines Monitors:
    http://de.wikipedia.org/wiki/Monitor_(Informatik)



  • Es ist wichtig, dass die Daten in der Reihenfolge abgearbeitet werden, in der Sie auch ankommen.

    Folgendes Problem: A1, B1 befinden sich in der Message-Queue. Nach dem Verteilen arbeitet Thread A Nachricht A1 und Thread B Nachricht B1 parallel ab. In diesem Fall kann es passieren, dass Nachricht B1 vor Nachricht A1 bearbeitet wird.



  • knivil ich vermute mal, dass die Reihenfolge beim Zurückschreiben der Ergebnisse synchronisiert werden muss und bei der eigentlichen Bearbeitung keine Rolle spielt. Falls doch, kann man die Arbeitsaufträge doch sowieso nicht parallelisieren.



  • µ schrieb:

    knivil ich vermute mal, dass die Reihenfolge beim Zurückschreiben der Ergebnisse synchronisiert werden muss und bei der eigentlichen Bearbeitung keine Rolle spielt. Falls doch, kann man die Arbeitsaufträge doch sowieso nicht parallelisieren.

    Genau. Es geht darum, dass die Daten eines Typs in der Reihenfolge in die Thread-Queues übertragen werden, in der sie, ungeachtet der Daten anderen Typs, in der Message-Queue auftauchen.

    INT WorkerThreadProcedure(LPVOID lpParam)
    {
        PDATAQUEUE queue = (PDATAQUEUE)queue;
        DATA data;
    
        while(WaitForData(&data))
        {
            HandleEvent(&data);
        }
    
        DestroyQueue(queue);
        return 0;
    }
    
    INT WaitForData(PDATAQUEUE queue, PDATA data)
    {
        //WaitForSingleObject
        if(WaitForDataAvailable(queue->DataSignal, INFINITE)) // schläft, bis daten
        {                                                     // auftauchen
            DequeueDataItem(queue, data);
        }
        return 1;
    }
    

    Effektiv werde ich dann WinAPI Events ( http://msdn.microsoft.com/en-us/library/windows/desktop/ms686915(v=vs.85).aspx ) verwenden.

    Vielen Dank für eure Hilfe!



  • Falls sich jemand hierher-googlet, hier meine Implementierung, mit minimaler CPU-Auslastung auch auf langsamen Rechnern:

    Windows API, C. Benutzt Event Objects ( http://msdn.microsoft.com/en-us/library/ms686915(v=VS.85).aspx )

    INT WorkerThreadProcedure(LPVOID lpParam)
    {
        PDATAQUEUE queue = (PDATAQUEUE)queue;
        DATA data;
    
        while(GetData(&data, queue) > 0)
        {
            HandleEvent(&data);
        }
    
        DestroyQueue(queue);
        return 0;
    }
    
    INT GetData(PDATA data, PDATAQUEUE queue)
    {
        if(WaitForSingleObject(queue->ItemAvailableEvent, INFINITE) == WAIT_OBJECT_0)
        {
            DequeueDataItem(queue, inputData);
        }
        return 1;
    }
    
    VOID HandleEvent(PDATA data)
    {
       // Daten werden verarbeitet
    }
    

    Relevanter Code der Data Queue:

    BOOL InitDataQueue(PDATAQUEUE queue)
    {
        if(queue != NULL)
        {
            queue->First = NULL;
            queue->Last = NULL;
    
            queue->ItemAvailableEvent = CreateEvent(NULL,
                                                    TRUE,
                                                    FALSE,
                                                    NULL);
            return TRUE;
        }
        return FALSE;
    }
    
    BOOL DequeueDataItem(PDATAQUEUE queue, PDATA data)
    {
        PDATAQUEUEITEM tmpItem;
    
        if(queue == NULL)
            return FALSE;
    
        if(!queue->First) // Queue ist leer
        {    
            data = NULL;
            return FALSE;
        }
    
        *data = queue->First->Data;
    
        tmpItem = queue->First;
    
        if(queue->First == queue->Last)
        {
            // es wird gerade das letzte Element entfernt,
            // keine weitern Elemente mehr in der Queue.
    
            ResetEvent(queue->ItemAvailableEvent);
    
            queue->First = NULL;
            queue->Last = NULL;
        }else
        {
            queue->First = queue->First->Next;
        }
    
        free(tmpItem);
    
        return TRUE;
    }
    
    BOOL EnqueueDataItem(PDATAQUEUE queue, DATA data)
    {
        if(queue == NULL)
            return FALSE;
    
        PDATAQUEUEITEM queueNode = (PDATAQUEUEITEM)malloc(sizeof(DATAQUEUEITEM)); 
    
        if(queueNode == NULL)
        {        
            //CRITICAL ERROR!
            return FALSE;
        }
    
        queueNode->Data = data;
    
        if(queue->First == NULL)
        {
            queue->First = queueNode;
            queue->Last  = queueNode;
        {
        else
        {
            queue->Last->Next = queueNode;
            queue->Last = queueNode;
        }
    
        queueNode->Next = NULL;    
    
        SetEvent(queue->ItemAvailableEvent);
    
        return TRUE;
    }
    


  • <offtopic>
    selbst für WinApi-Verhältnisse ist da sehr viel groß geschrieben... VOID 😕


Anmelden zum Antworten