std::threads Restart- vs Suspend-Design



  • Hallo,

    folgende Designfrage: Angenommen, ich habe irgendein Modul, das zu undefinierten Zeitpunkten Daten produziert, die ich in einem Verarbeitsungsthread berechnen lassen will. Ist es jetzt designtechnisch besser, den Verarbeitungsthread immer wieder zu aktivieren, wenn er nicht läuft, und auslaufen zu lassen, wenn er alle Daten abgearbeitet hat, oder starte ich ihn einmal bei seiner Erzeugung und lasse ihn dann bis zum bitteren Ende in Dauerschleife laufen, egal ob Daten da sind oder nicht? In letzterem Fall sollte er natürlich nach Möglichkeit sleepen oder anderweitig Rechenzeit freigeben.

    Lösung 1 sah jetzt erst mal schöner aus, da 2) dann ja irgendwie dauernd busy-waited, aber ich hab mich jetzt mehrere Stunden durchs Web gelesen und finde nur extrem unschöne Lösungen auf die Problematiken:

    a) Wie erkenne ich, dass ein Thread im Hintergrund fertig ausgelaufen ist bzw.
    b) Wie erkenne ich, falls ein neues Datenpaket eingetroffen ist, ob der Thread von einem vorherigen noch läuft und ich das neue Paket einfach reinschieben kann, oder ob ich ihn von vorne starten, im Prinzip also neu erzeugen muss?

    Kann mir da jemand weiterhelfen?



  • Hi,

    Condition Variables sind warscheinlich eine gute Lösung für dein Problem.
    Eine Möglichkeit, Threads zu entwerfen, ist z.B. sie in einer de facto Endlosschleife laufen zu lassen, die nur verlassen wird, wenn man eine "stop"-Variable setzt (Signal zum Beenden des Threads).
    Für die zu erledigenden Aufgaben bietet sich so etwas wie eine Warteschlange an (std::queue/deque).

    Arbeitspakete sind die Objekte in der Warteschlange (zu bearbeitende Objekte, oder auch in Objekte verpackte Funktionsaufrufe - Funktoren/Lambdas).

    Ein Thread ist "ausgelaufen" wenn die Warteschlange leer ist. Dann legt er sich üblicherweise schlafen, indem er auf eine Condition Variable wartet (Signal: neue Aufgabe(n) eingetroffen).
    Wenn man von einem anderen Thread aus eine neue Aufgabe in die Warteschlange schiebt, benachrichtigt man die Condition Variable. Der schlafende Thread der auf die Variable wartet wird dann automatisch aufgeweckt.

    Grober Ablauf:

    solange stop-variable nicht gesetzt
    {
        solange daten in warteschlange
        {
           bearbeite daten
        }
        warte auf daten
    }
    

    Für jede Aufgabe einen neuen Thread zu erzeugen ist natürlich auch eine Lösung. Das hat allerdings oft ein wenig mehr Overhead, und ist nicht für jeden Anwendungsfall geeignet (eher für größere Arbeitspakete).

    Zu guter letzt: Datenstrukturen, auf die von verschiedenen Threads aus zugegriffen wird (z.B. der Queue), von denen mindestens einer schreibt, müssen natürlich synchronisiert werden (z.B. via mutex/atomic).

    Also kurz: "Dauerschleife" okay, aber bitte mit Thread schlafen legen - ist nicht nett (schlecht skalierbar) wenn er einen ganzen CPU-Kern dauerbelastet 😃

    Finnegan



  • Cool, vielen Dank, das hilft mir schon mal sehr weiter. Condition Variables hatte ich auch schon im Hinterkopf, um das Busy-Waiting zu umgehen, bin da aber nicht auf die Lösung mit dieser double loop gekommen.

    Versuch ich mal, so umzusetzen. 😉



  • Noch ne kleine Anmerkung, jetzt wo ich gerade nochmal drüberschaue:

    Das "warte auf daten" sollte eher "warte auf condition variable" heissen, und die CV natürlich nicht nur bei neuen Daten benachrichtigt werden,
    sondern z.B. auch wenn die stop-Variable gesetzt wurde - schliesslich will man den Thread ja auch auffordern, sich zu beenden, wenn grad keine Daten im Queue sind 😉

    Finnegan



  • Doch noch eine Frage dazu. Wie kann ich denn dann so was mit dieser Struktur realisieren:

    MyThread thread;
    thread.rennLos();
    
    // tu dies und das im Hauptthread
    
    if(thread.waitFor(timout_ms) == MyThread::timeout)
      // report error
    
    // worker thread ist durch, weiter gehts
    

    Mit anderen Worten möchte ich also rausfinden, ob mein Thread seine run-Methode durchlaufen hat und jetzt in der condition_variable wartet. Gibt es da eine unkomplizierte Möglichkeit, das mit timeout abzufragen oder mach ich mir dann einfach einen bool isRunning in meiner Thread-Klasse, die vor der condition_variable und beim wakeup entsprechend gesetzt wird?



  • Ich mag mich irren, aber für mich hört sich das nach einem klassischen Single-Producer Single-Consumer Problem an. Kannst du nicht eine Queue nehmen in der einer die Daten reinschiebt und dann gegebenenfalls den anderen an der Condition Variable aufweckt der das dann aus der queue raus nimmt? Z.B. so https://codereview.stackexchange.com/questions/84109/a-multi-threaded-producer-consumer-with-c11



  • Du könntest eine andere Condition Variable verwenden, auf die der Hauptthread wartet, und welche dann von MyThread benachrichtigt wird.
    Das letzte mal als ich so etwas in der Richtung umgesetzt habe, hing diese CV dann allerdings an den Arbeitspaketen selbst, so dass ich explizit
    auf ein ganz spezielles Arbeitspaket warten konnte (kann ja sein, dass der Arbeitsthread mit einem anderen Paket durch ist, wenn ich die
    Benachrichtigung erhalte, und nicht mit dem, dessen Ergebnis ich benötige).

    Den Timeout bekommt man mit std::condition_variable::wait_for/wait_until hin.

    So ein Design geht allerdings stark in Richtung dessen, was man mit std::async/std::future machen kann (mit denen habe ich mich selbst noch nicht so intensiv beschäftigt).
    Es könnte sich also lohnen sich die mal anzusehen, bevor man etwas eigenes implementiert.

    Finnegan



  • Ja, das stimmt. async und future haben vorerst auch genau nach dem ausgesehen, was ich gesucht habe. Hab dann aber leider feststellen müssen, dass damit leider keine Thread Priorities vergeben kann. 😞



  • Hallo,

    ich noch mal. Nächstes Problem, mein erster Design-Versucht sieht ganz grob so aus:

    MyThread::MyThread()
    {
      _thread = std::thread( &MyThread::mainLoop, this );
    }
    
    void MyThread::mainLoop()
    {
      while( !_stop )
      {
        std::unique_lock<std::mutex> lock( _mtx );
        run();
        _cv.wait( lock );
      }
    }
    

    Die run-Methode ist nun rein virtuell und soll in abgeleiteten Klassen überschrieben werden. Jetzt hab ich aber das Problem, dass ich in einem dieser Konstruktoren folgendes mache:

    AbgeleiteterThread::AbgeleiteterThread()
      : MyThread()
    
    void run()
    {
      // whatever
    }
    

    Beim binden an den std::thread im MyThread-Konstruktor wird dann aber die ganze Polymorphie-Info weggesliced und wirklich nur der MyThread-Anteil der Instanz gebunden. In der MainLoop kriege ich dann eine exception: pure virtual function call. Wie kann ich das umgehen? Die Brechstangen-Methode wäre vermutlich, noch mal eine Referenz auf die abgeleitete Klasse im Konstruktor mitzugeben, damit ich beim Binden an die runtime-Info komme. Aber schön ist was anderes und es schreit eher nach einer eleganteren Lösung. 😉



  • Das Problem ist, dass der Thread bereits gestartet ist, wenn das Objekt noch nicht fertig initialisiert ist. Du hast also eine race condition beim this-pointer.

    Lass doch einfach den Thread extern starten, z.B. mit einer funktion runThreaded.



  • banshee schrieb:

    Hallo,

    ich noch mal. Nächstes Problem, mein erster Design-Versucht sieht ganz grob so aus:

    MyThread::MyThread()
    {
      _thread = std::thread( &MyThread::mainLoop, this );
    }
    
    void MyThread::mainLoop()
    {
      while( !_stop )
      {
        std::unique_lock<std::mutex> lock( _mtx );
        run();
        _cv.wait( lock );
      }
    }
    

    Ich sag mal 👍 dafür dass du versuchst Code-Duplizierung zu vermeiden.
    Allerdings gehst du es nicht gut an. Du hast dir hier ein Muster ausgesucht auf das schon viele vor dir gekommen sind, das aber nicht wirklich gut ist. Auch ganz abgesehen von dem Problem mit der virtuellen run Funktion.

    Eine Abstraktion die dagegen oft sehr gut anwendbar ist sind Queues. Stichwort "Producer Consumer Queue" oder auch "Work Queue".

    Grob skizziert könnte das so aussehen

    void DoWork(WorkQueue& queue) // kann man natürlich auch als Memberfunktion der Queue machen
    {
        while (!queue.IsShutDown())
        {
            auto wi = queue.GetWorkItem(); // blockiert bis es was zu tun gibt oder die Queue "runtergefahren" wird
            wi.Execute();
        }
    }
    
    ...
    {
        WorkQueue queue;
        std::thread workerThread(&DoWork, std::ref(queue));
    
        // Arbeit "in Auftrag geben"
        auto waitHandle = queue.AddWorkItem(...);
    
        // Tu dies und das im Hauptthread
    
        // Warten bis Arbeit fertig durchgeführt
        if (!waitHandle.TimedWait(someTimeout))
            abort();
    
        // Work Item ist fertig abgearbeitet
    
        // Shutdown:
        queue.Shutdown();     // Führt dazu dass der Thread ein "dummy work item" von GetWorkItem() bekommt und queue.IsShutDown() danach "true" meldet => Thread beendet sich also "bald mal"
        workerThread.join();  // Wartet darauf dass der Thread sich wirklich beendet hat
    }
    

    Im Prinzip recht ähnlich zu std::promise/future , nur halt selbstgestrickt und so dass man kontrollieren kann wo Work-Items abgearbeitet werden.


Anmelden zum Antworten