Threadpool - effektive Anwendung bei verschiedenen Problemstellungen



  • Hallo zusammen,

    nachdem ich Probleme hatte, meine unter Windows lauffähige Game-Engine nach Linux zu portieren (wxWidgets zeigte einfach nur noch ein leeres Render-Window) und seit Projekt-Start auch eine Menge dazu gelernt habe, entschied ich mich neulich für einen sauberen Neustart. Im ersten Schritt will ich mich nun um das Ressourcen Management kümmern. Um die Prozessoren gleichmäßig auslasten zu können, habe ich folgenden Threadpool als Ausgangsbasis nachgebaut:

    http://roar11.com/2016/01/a-platform-independent-thread-pool-using-c14/

    Soweit funktioniert erst einmal alles wie erwartet. Ich kann die Queue mit beliebigen Funktionen füttern und kriege auch beliebige Rückgabeparameter heraus. Ich würde meinen ca. 90% des Codes verstanden zu haben und arbeite daran die restlichen 10% auch noch zu begreifen.Ich habe jedoch noch einige Fragen, was die effektive Anwendung des Pools angeht. Zuvor hatte ich ein wenig mit OpenMP herumgespielt und da passieren eine Menge Dinge unter der Oberfläche, um die ich mich nun selbst kümmern muss. Eine erste Frage wäre zum Beispiel folgender Fall:

    Ich habe einen riesigen std::vector A mit irgendwelchen Objekten gefüllt. Jetzt will ich einen neuen std::vector B selektiv mit Werten aus vector A füllen. Das ganze Abarbeiten, welcher Wert in den neuen Vector kommen soll, kann parallel passieren. Wie gehe ich da nun am geschicktesten heran. Ich könnte mir nun einen eigenen std::vector für jeden Thread erstellen und die Einzelvektoren dann im letzten Schritt mergen um Gegenseitiges Blocken beim Schreiben in den Ergebnisvektor zu verhindern. Hier stoße ich aber auf das Problem, dass ich nicht weiß, wie ich die notwendige Threadnummer in die Bearbeitungs-Funktion rein kriege, welche in die Queue gepackt wird. Diese brauche ich, um den richtigen Teilvektor zu selektieren. Erst wenn sich der Thread die Funktion aus der Queue fischt, ist diese aber bekannt und dann gibt es aufgrund des Funktionsinterfaces beim Aufruf (keine Parameter) keine Möglichkeit diese in die Funktion herein zu kriegen.
    Eine Alternative wäre, dass ich mir eine Struktur mit genügend vielen (z.B. numThreads), thread sicheren Subvektoren erstelle, die dafür sorgt, dass der Zielvektor zum Schreiben nach jedem Schreibrequest gewechselt wird. Nur am Ende beim Mergen muß der Prozess warten, bis alle Schreibvorgänge abgeschlossen sind. Ich habe jedoch keine Ahnung ob es da nicht elegantere Varianten gibt.

    Weiterhin ist mir nicht klar, was mit dem Ursprungsthread, der das Ganze gestartet hat in der Zwischenzeit passiert. Angenommen ich komme zu der Stelle, an der ich Vektor B erstellen will, fülle meine Queue und sage dann das Ergebnis in einen neuen Vektor kopiert/gemoved werden soll. Etwa so:

    - * irgendwelche vorherigen Berechnungen *
    - for (U32 i=0; i<vectorA.size(); ++i)
      	Threadpool.commit(BerechnungUebertrageNachB(vectorA[i]));
    - std::vector<T> B = Threadpool.commit(HoleErgebnisVectorB());
    

    Der Ursprungsthread kann ja nicht weiter machen, bis die Queue die Ergebnisfunktion verteilt hat und der ausführende Thread das Ergebnis weiter kommuniziert hat.Blockiert der Usrsprungsthread nun meine Prozessorzeit oder wird er automatisch "schlafen" gelegt? Ich schätze eher ersteres ist der Fall und ich muss mich selbst darum kümmern, den Thread ruhen zu lassen. Mache ich das mit einem unique_lock und einer std::condition_variable oder welchen weg, würdet ihr empfehlen?

    Falls jemand generell eine gute Quelle für das ANWENDEN und nicht programmieren von Threadpools hat, wäre ich sehr dankbar für Links.

    Grüße



  • Benutze TBB



  • Benutze TBB

    Ich werde es mir mal zu Gemüte führen. Ich würde mich trotzdem über Lösungsansätze und Vorschläge zu den gestellten Fragen freuen anstatt einfach nur "mach etwas anderes". Mir geht es beim Programmieren auch darum, low-level Lösungsansätze zu verstehen anstatt einfach nur irgendwelche fertigen libs zu nehmen.

    Grüße



  • Wychmaster schrieb:

    Mir geht es beim Programmieren auch darum, low-level Lösungsansätze zu verstehen anstatt einfach nur irgendwelche fertigen libs zu

    Verstehen ist sicher gut. Du scheinst die low-level Lösungsansätze aber auch zu verwenden. Davon würde ich abraten.



  • manni66 schrieb:

    Wychmaster schrieb:

    Mir geht es beim Programmieren auch darum, low-level Lösungsansätze zu verstehen anstatt einfach nur irgendwelche fertigen libs zu

    Verstehen ist sicher gut. Du scheinst die low-level Lösungsansätze aber auch zu verwenden. Davon würde ich abraten.

    Mir sind die Probleme und der Frust durchaus bekannt, die bei Eigenimplementationen von Multi-Threading auftreten können, allerdings will ich keine super-performante Spielengine mit Veröffentlichungsziel entwickeln. Ich möchte hauptsächlich die volle Kontrolle und verstehen wie und warum Dinge funktionieren. Vielleicht wechsele ich auch später einmal auf die gepostete lib, nur will ich momentan einfach mal einen Threadpool selber basteln der halbwegs funktioniert. Irgendwie kriegen die Libs es ja auch hin die genannten Probleme zu lösen und ich will halt wissen wie es geht. Ich bin mir 100% sicher dass meine Lösung ineffektiver und weniger stabil sein wird, aber es ist halt hauptsächlich ein Hobby-Lernprojekt 😉

    Wenn ich nur ein Spiel bauen wollte, könnte ich gleich eine fertige Engine nehmen 😉



  • Wär es nicht besser, wenn du die Funktionen selbst die Ergebnisse weiterleiten lässt, anstatt sie zu nem späteren Zeitpunkt zu holen?

    EDIT: Du musst auf alle warten oder?



  • 5cript schrieb:

    Wär es nicht besser, wenn du die Funktionen selbst die Ergebnisse weiterleiten lässt, anstatt sie zu nem späteren Zeitpunkt zu holen?

    Ich bin nicht so recht sicher was du meinst. Meinst du die Funktionen, die ich in der for schleife in die Queue packe? Wenn ja, dann habe ich doch das Problem, dass irgendwer die Ergebnisse noch in den Ergebnisvektor schreiben muss und mir ein großteil des Vorteils der parallelen Verarbeitung abhanden kommen würde.

    Wenn ich dich richtig verstanden habe, meinst du sowas wie

    - std::vector<T> B
    - for (U32 i=0; i<vectorA.size(); ++i)
        B.push_back(Threadpool.commit(BerechnungUebertrageNachB(vectorA[i])));
    

    wobei die Funktion nun irgendein Ergebnis zurück liefert?



  • std::vector<T> B;
    B.resize(A.size());
    for (std::size_t i=0; i<vectorA.size(); ++i) // size_t bitte
        Threadpool.commit([i, &vectorA, &vectorB](){
           vectorB[i] = Berechnung(vectorA[i]);  // NOTIZ: Berechnung = sehr aufwändig, sonst ist das hier schwachsinnig.
        });
    Threadpool.join_all(); // oder wie auch immer.
    // oder andere wartebedingung, dass Berechnung komplett.
    // b populiert.
    

    Ansonsten hab ich dich missverstanden.



  • 5cript schrieb:

    std::vector<T> B;
    B.resize(A.size());
    for (std::size_t i=0; i<vectorA.size(); ++i) // size_t bitte
        Threadpool.commit([i, &vectorA, &vectorB](){
           vectorB[i] = Berechnung(vectorA[i]);  // NOTIZ: Berechnung = sehr aufwändig, sonst ist das hier schwachsinnig.
        });
    Threadpool.join_all(); // oder wie auch immer.
    // oder andere wartebedingung, dass Berechnung komplett.
    // b populiert.
    

    Ansonsten hab ich dich missverstanden.

    Hey, ja, da war ein Misverständnis. VectorB ist eine reduzierte Variante von A. Ich will z.B. nur alle Elemente aus A haben, die eine gewisse Eigenschaft erfüllen (z.B. im sichtbaren Kamerabereich liegende Objekte). Dementsprechend muss ich einen neuen Vector anlegen und nur die Elemente rein füllen, welche die Bedingungen erfüllen. Wie gesagt, eine einfache variante wäre einfach das push_back mit einem Mutex zu kapseln, allerdings wird dann vermutlich eine lange Zeit mit Warten verbracht. Wenn jeder Thread erst einmal in seinen eigenen Vector schreibt und diese am ende zusammengeführt werden, sollte das Zeit sparen. Nur hier ist wie gesagt das Problem, wie gebe ich jedem Thread seinen eigenen Vektor?

    PS: Bin nun erst einmal für 1-2 Stunden weg vom Rechner und kann nicht antworten. Gucke nachher wieder rein

    Gruß



  • Hier noch einmal ein update, für alle die es interessiert.

    Ich habe mittlerweile eine Lösung für mein Problem ertüftelt, mit der ich leben kann und die zumindest in einer groben Testimplementierung funktioniert:

    Über std::thread::get_id() habe ich die Möglichkeit, eine einmalige Nummer zu bekommen, die unmittelbar mit meinem Thread verbunden ist. Also erstelle ich eine

    std::map<std::thread::id, std::vector<T>>
    

    , packe diese in eine Assembler-Klasse und sichere alle Zugriffe auf die map per Mutex ab. Weiterhin braucht diese Klasse 2 Counter (atomics). Einer zählt, wie viele das Ergebnis betreffende submits in die Queue des Thread-Pools gepackt wurden und ein weiterer, wie viele bereits abgearbeitet sind. Kriegt ein Thread nun einen entsprechenden Task, sucht er sich zunächst "seinen" Vector aus der map. Existiert der nicht, wird er erstellt. Nach Abarbeiten des Tasks und möglichen push_back in den zugehörigen Vector wird der Abarbeitungscounter erhöht und anschließend geguckt, ob er mit dem Counter für submitted Tasks übereinstimmt. Wenn ja, kann man sicher sein, dass keine Tasks mehr fehlen und die einzelnen Vectoren können gemerged werden.
    Die ganze Verwaltung läuft über besagte Assembler-Klasse. Man gibt nur noch eine Referenz auf den Threadpool hinein, sowie die gewünscht Arbeitsfunktion. Den Rest regelt die Klasse und spuckt am Ende nur noch den Ergebnis-Vector aus.


Anmelden zum Antworten