Threads vom Typ B sollen auf Threads vom Typ A warten (oder so)



  • Hallo,

    ich habe folgendes Problem: Von meinem Hauptthread aus, werden mehrere Threads gestartet und detached (mich interessiert der Rückgabewert idR. nicht).
    Diese Threads arbeiten mit einer Datenbank im Hintergrund und werden, wenn sie soweit sind, mit dem Hauptthread synchronisiert.
    Nun kann es aber passieren, dass zwei Threads gestartet werden, die auf Daten vom jeweils anderen angewiesen sind:

    Thread A lädt "MID", benötigt dafür bereits bekannte "UID"s.
    Thread B lädt ein "UID", benötigt dafür jedoch "MID".

    Thread A kann durchaus mehrfach hintereinander gestartet werden, allerdings stellt A sicher, dass stets das aktuellste "MID" zum Hauptthread geschrieben wird. A sollte außerdem schnellstmöglichst fertig werden. Wenn ein A startet wird das bisherige MID invalidiert.

    Thread B kann ebenfalls mehrfach gestartet werden, benötigt allerdings das aktuellste "MID" aus Thread A, falls ein A existiert, selbst wenn A nach B gestartet ist.

    Daher habe ich mir überlegt; Jedes B sollte warten, falls ein A läuft, bis alle As ihr MID geladen haben. Dann sollte das jeweils aktuellste MID an alle Bs übergeben werden oder die B-Threads holen sich den Wert aus dem Hauptthread.

    Meine Frage ist nun: Wie kann ich das mit C++11 machen, bzw. hat jemand eine andere Idee wie man das Problem lösen kann? Ich möchte das möglichst ohne große Abhängigkeit (wie zB boost) umsetzen.
    Lauffähig sollte das unter Windows 7 (VS2015) und Debian/Linux sein.

    Viele Grüße,
    TacTic



  • Schau dir mal std::condition_variable an... 😉



  • Hallo,

    danke für den Tipp, ich hab mich in der Zwischenzeit etwas eingelesen und herumprobiert, allerdings kann nicht nicht wirklich sagen, dass ich alles komplett verstanden habe.
    Mein Code zum Testen ist dieser hier, ich hoffe das ist nicht zu viel:

    #include <iostream>
    #include <chrono>
    #include <mutex>
    #include <condition_variable>
    #include <thread>
    #include <cassert>
    
    using namespace std::chrono;
    using std::cout; using std::endl;
    
    std::mutex m_CondMutex;
    std::condition_variable m_CondVar;
    
    std::mutex m_Mutex; //Soll m_MID und m_Token schützen
    int64_t m_MID = 0;
    unsigned m_Token = 0;
    
    void SetMID(const int64_t &t)
    {
        std::unique_lock<std::mutex> lk(m_Mutex);
        m_MID = t;
        if(t < 0)
            m_Token++;
    }
    
    int64_t GetMID()
    {
        std::unique_lock<std::mutex> lk(m_Mutex);
        return m_MID;
    }
    
    unsigned GetToken()
    {
        std::unique_lock<std::mutex> lk(m_Mutex);
        return m_Token;
    }
    
    class Workers {
        virtual void Work() = 0;
    protected:
        unsigned m_T;
        int m_Nr; // Soll hier jetzt das neuste MID sein
    public:
        Workers(unsigned t) : m_T(t) {}
        static void Run(Workers *pW, int Nr) {
        cout << "Start Thread Nr." << Nr << endl;
        pW->m_Nr = Nr;
        pW->Work();
        delete pW;
        cout << "Ende Thread Nr. " << Nr << endl;
        }
    };
    
    class ThreadA : public Workers //Lädt MID
    {
        virtual void Work()
        {
            std::unique_lock<std::mutex> lk(m_CondMutex);
            std::this_thread::sleep_for(1s); //Arbeiten simulieren, MID aus DB laden
    
            if(GetToken() == m_T)
            {
                SetMID(m_Nr);
                cout << "MID auf " << m_Nr << " gesetzt" << endl;
                m_CondVar.notify_all();
            }
            else
                cout << "MID NICHT auf " << m_Nr << " gesetzt" << endl;
        }
    public:
        ThreadA(unsigned t) : Workers(t) {}
    };
    
    class ThreadB : public Workers // Neuste MID verwenden
    {
        virtual void Work()
        {
            std::unique_lock<std::mutex> lk(m_CondMutex);
    
            cout << "Vor-MID: " << GetMID() << endl;
    
            std::this_thread::sleep_for(500ms); //Arbeiten simulieren
    
            m_CondVar.wait(lk, [&](){return GetMID() != -1;});
            cout << "Nach-MID: " << GetMID() << endl;
        }
    public:
        ThreadB(unsigned t) : Workers(t) {}
    };
    
    int main(void)
    {
        cout << "MID ist jetzt: " << m_MID << endl;
    
        cout << "B und A starten" << endl;
        for(int i = 0; i < 5; i++)
        {
            //Bevor ein A startet wird MID auf -1 gesetzt und unser Token inkrementiert
            SetMID(-1);
            auto pA = new ThreadA(GetToken());
            std::thread(Workers::Run, pA, i).detach();
        }
    
        for(int i = 4; i < 8; i++)
        {
            auto pB = new ThreadB(GetToken());
            std::thread(Workers::Run, pB, i).detach();
        }
    
        std::this_thread::sleep_for(7s); //Einfaches warten dass die Threads fertig werden
        assert(GetMID() == 4);
    
        SetMID(GetMID()+1);
        cout << "B ohne A, MID=" << GetMID() << endl;
        auto t = std::thread(Workers::Run, new ThreadB(GetToken()), 8);
        t.join();
    
        return 0;
    }
    

    Kurze Erklärung zum Code:
    Workers ist die gemeinsame Oberklasse aller Threads und führt die eigentliche Operation aus und gibt den Speicher wieder frei. Jeder Thread erhält hier außerdem eine aufsteigende Nr. die der Einfachkeit halber auch dem Token entspricht. ThreadA setzt MID auf seine Nr. falls der momentane Token seiner "Thread-Nr" entspricht. ThreadB soll, falls ein A existiert bzw. MID ungültig ist warten.

    Von der Ausgabe her scheint es zumindest so zu funktionieren wie ich es mir gedacht habe, allerdings weiß ich nicht ob das nun einfach Zufall ist oder es tatsächlich richtig angewendet wurde.
    Insbesondere m_ConMutex, welches ja in beiden Work()-Methoden gleich am Anfang erworben wird, blockt m.M.n. ThreadA, falls B zuerst mit Arbeiten anfängt und dann wird auch nur ein A nach der Reihe ausgeführt. Alle As sollte ja aber möglichst ungehindert laufen. Aber wie müsste ich das Programm dann dafür verändern?


Anmelden zum Antworten