Threads und Barrier



  • Hallo Leute,

    ich schreibe zur Zeit an einem Programm das mit threads (pthreads) arbeitet. Ich muss nun an mehreren Stellen im Code dafür sorgen, dass die threads dort warten, bis alle diesen Punkt erreicht haben. Also einen klassischen Barrier. Ich habe als Barrier-Funktion in meiner Thread-Klasse folgendes implementiert:

    void Thread::barrier()
    {
        m_kBarrier.kMutex.lock();
        m_kBarrier.nWaitingFor--;
        Tsleep(TTimeVal(0,1000));
        while (m_kBarrier.nWaitingFor > 0)
        {
            m_kBarrier.kCondition.Wait(m_kBarrier.kMutex);
        }
        m_kBarrier.kCondition.Signal();
        m_kBarrier.kMutex.unlock();
    }
    

    Die Klasse Thread hat also ein struct inne, dass die Variablen für den Barrier beinhaltet. Das Struktum sieht so aus:

    struct Barrier
    {
        Mutex     kMutex;
        Condition kCondition;
        int       nWaitingFor;
        int       nTeamMembers;    // zur Zeit noch nicht benötigt
    };
    

    Nun die Frage:
    Wie kann ich dafür sorgen, dass die Anzahl der Threads auf die zu warten ist, wieder auf den Ursprungswert zurück gesetzt wird, aber nur EINMAL?

    Danke für eure Hilfe
    Gruß Mea

    PS: Ach ja, Mutex, Condition sind genau wie Thread Wrapper um die Posix Aufrufe.



  • Hallo Leute!

    Selbst überlegen führt doch meist zum Erfolg! 😃
    Für alle die es interessiert hier also der korrekte Code, um das Barrier Problem zu lösen.

    void Thread::barrier()
    {
        m_kBarrier.kMutex.lock();
        m_kBarrier.nWaitingFor--;
    
        // check if this is the last thread to enter the barrier
        // if so notify all other threads to go on
        if (m_kBarrier.nWaitingFor == 0)
        {
            // reset the counter to the number of threads participating
            m_kBarrier.nWaitingFor = m_kBarrier.nTeamMembers;
            m_kBarrier.kCondition.Broadcast();
        }
        else    // otherwise sleep until all threads have entered the barrier
        {
            m_kBarrier.kCondition.Wait(m_kBarrier.kMutex);
        }
        m_kBarrier.kMutex.unlock();
    }
    

    Gruß Mea



  • pthread_barrier_* ist nichts für dich?

    http://www.opengroup.org/onlinepubs/009695399/functions/pthread_barrier_init.html

    und dann pthread_barrier_wait.



  • Hi Ponto,

    danke für die Idee. Ich habe sie eben ausprobiert und bin voll auf die Schnauze gefallen. Das klappt damit nicht. Ein Thread wird schlafen gelegt, der andere nicht aber manchmal doch ... Scheint alles noch nicht so ganz ausgereift zu sein.
    Komischerweise funktioniert alles wunderbar, wenn ich das auf einer 1-Prozessor Maschine laufen lasse. Sobald aber eine SMP-Maschine ins Spiel kommt knallt es.
    (Übrigens auch mit meiner Methode, nur nicht so schlimm :D)
    Ich werde dann wohl wieder auf "Aktives Warten" (AAARGGGGHHHH) zurück greifen müssen.

    Hast du Erfahrung mit der pthread_barrier Implementierung? Wäre nett, wenn du mir dann weiterhelfen könntest.

    Gruß Mea



  • Meaculpa schrieb:

    Hi Ponto,

    danke für die Idee. Ich habe sie eben ausprobiert und bin voll auf die Schnauze gefallen. Das klappt damit nicht. Ein Thread wird schlafen gelegt, der andere nicht aber manchmal doch ... Scheint alles noch nicht so ganz ausgereift zu sein.
    Komischerweise funktioniert alles wunderbar, wenn ich das auf einer 1-Prozessor Maschine laufen lasse. Sobald aber eine SMP-Maschine ins Spiel kommt knallt es.
    (Übrigens auch mit meiner Methode, nur nicht so schlimm :D)
    Ich werde dann wohl wieder auf "Aktives Warten" (AAARGGGGHHHH) zurück greifen müssen.

    Hast du Erfahrung mit der pthread_barrier Implementierung? Wäre nett, wenn du mir dann weiterhelfen könntest.

    Gruß Mea

    Hab keine Erfahrungen, aber du könntest mal mit einem kleinen Programm zeigen, wie du dich auf die Schnauze legst. Vielleicht findet man da noch einen Fehler.



  • OK, der Post wird jetzt etwas länger 😃

    Der Barrier ist ja oben beschrieben.

    Hier eine Funktion die diesen Barrier benutzt:

    // Die Klasse SORClassic ist von Thread abgeleitet. Sie hat also die Möglichkeit
    // einen neuen Thread zu erzeugen und den Barrier zu benutzen.
    
    // SORClassic::Iterations -----------------------------------------------------
    
    TTimeVal SORClassic::Iterations(int nThreadID)
    {
        double dError, dEquations;
    
        int nCount = 0;
    
        TTimer kTimer;
        kTimer.start();
    
        do
        {
            // reset the variables for the break condition
            m_dError = 0.0;
            m_dEquations = 0.0;
    
            CalculateRed(nThreadID);
            if (m_bParallel)
            {
                barrier();        // <-- Hier ein barrier
            }
    
            CalculateBlack(nThreadID);
            if (m_bParallel)
            {
                barrier();        // <-- Hier auch
            }
    
            nCount++;
            if ((nCount%100) == 0)
            {
                dError = Residual(m_kEquations, m_kSourceTerm, m_kError, 0);
                dEquations = m_kEquations.FrobeniusNormSquared(0);
    
                m_kMutex.lock();
                m_dError += dError;
                m_dEquations += dEquations;
                m_kMutex.unlock();
    
                if (m_bParallel)
                {
                    barrier();    // <-- und alle 100 Iterationen hier auch
                }
    
                if (nThreadID == 0)
                {
                    m_dResidual = sqrt(m_dError / m_dEquations);
                }
            }
        }while ((m_dResidual > m_dEpsilon) && (nCount < m_nNumIterations));
    
        TTimeVal kTimeElapsed = kTimer.elapsed();
    
        return kTimeElapsed;
    }
    

    Der Master und der Slave rufen jetzt jeweils diese Methode auf. Der Slave ist eine "echter" Thread, der Master das Hauptprogramm (also ein Prozess - das ist dem Barrier aber egal).
    Für eine angenommen Matrixgröße von 100x100 Elementen braucht man hier 1900 Iterationen, wenn man das seriell macht. Die selbe Anzahl an Iterationen muss auch die Thread-Parallelisierte Version brauchen, sie ist ja nur schneller (MFlops-seriell < MFlops-Threadparallel).
    Auf einer 1-Prozessor Maschine funktioniert das ganze wunderbar. Aber auf einer SMP-Maschine behauptet das Programm nach einer beliebigen Anzahl Iterationen, die kleiner ist als die der benötigten, fertig zu sein. Die Zahl variiert auch von Programmstart zu Programmstart.
    D.h. da stimmt was nicht mit der Synchronisation...

    Tja, nun steh ich da ich armer Tor...

    Gruß Mea

    [EDIT]
    Die Methode wird mit der ID des Threads aufgerufen: Master -> ID=0, Slave ID=1
    [/EDIT]



  • Hallo,

    Ich hab' den Fehler gefunden. Das Problem war folgendes:

    // SORClassic::Iterations -----------------------------------------------------
    
    TTimeVal SORClassic::Iterations(int nThreadID)
    {
        double dError, dEquations;
    
        int nCount = 0;
    
        TTimer kTimer;
        kTimer.start();
    
        do
        {
            // Das Zurücksetzen der Variablen darf hier nicht geschehen, da auf
            // einer SMP-Maschine nicht klar ist, wer dort zuerst ankommt.
            // Daher errechnet sich die Variable m_dResidual zu einem unbestimmten
            // Zeitpunkt aus 0.0/0.0, was in einem "nan" endet und somit kleiner
            // (weil nicht bestimmt) als m_dEpsilon ist. Das Programm glaubt es sei
            // fertig ...
    //        m_dError = 0.0;
    //        m_dEquations = 0.0;
    
            // ...
    
            if ((nCount%100) == 0)
            {
                // ...
    
                if (nThreadID == 0)
                {
                    m_dResidual = sqrt(m_dError / m_dEquations);
                    // Das Zurücksetzen muss hier geschehen, wenn die Variable
                    // errechnet ist und auch nur von einem Thread, da der andere
                    // dann noch die alten Werte inne hat und nicht glaubt er sei
                    // fertig ... 
                    m_dError = 0.0;
                    m_dEquations = 0.0;
                }
            }
        }while ((m_dResidual > m_dEpsilon) && (nCount < m_nNumIterations));
    
        // ...
    
    }
    

    Danke für die Hilfe ...

    Gruß Mea



  • Hmm, ein vollständiges Programm wäre besser. Da könnte man auch mal was probieren.

    Ansonsten hab ich mir mal den barrier angeschaut und die Benutzung der Condition scheint mir fehlerhaft. Ohne ein vollständiges Programm kann ich das aber nicht abschliessend beurteilen. Ich würde es eher so machen:

    void Thread::barrier() 
     { 
         m_kBarrier.kMutex.lock(); 
         m_kBarrier.nWaitingFor--; 
    
         std::size_t event = m_kBarrier.event;
    
         // check if this is the last thread to enter the barrier 
         // if so notify all other threads to go on 
         if (m_kBarrier.nWaitingFor == 0) 
         { 
             // reset the counter to the number of threads participating 
             m_kBarrier.nWaitingFor = m_kBarrier.nTeamMembers; 
             ++m_kBarrier.event;
             m_kBarrier.kCondition.Broadcast(); 
         } 
         else    // otherwise sleep until all threads have entered the barrier 
         {
            while (event == m_kBarrier.event)
               m_kBarrier.kCondition.Wait(m_kBarrier.kMutex); 
         } 
         m_kBarrier.kMutex.unlock(); 
     }
    

    Die Enführung von m_kBarrier.event ist notwendig, da der vorherige Code sich nicht gegen das gelegentliche Aufwachen aus dem pthread_condition_wait schützt.



  • Was meinst du mit gelegentlichem Aufwachen?
    Etwa wenn ein anderer Thread als der, der es soll ein Signal sendet?

    Ich geh' jetzt mal schlafen. War ein langer (aber erfolgreicher) Tag.

    Gruß und Gute Nacht
    Mea



  • Meaculpa schrieb:

    Was meinst du mit gelegentlichem Aufwachen?
    Etwa wenn ein anderer Thread als der, der es soll ein Signal sendet?

    Ich geh' jetzt mal schlafen. War ein langer (aber erfolgreicher) Tag.

    Gruß und Gute Nacht
    Mea

    Ein Thread kann aus einem pthread_cond_wait wieder zurückkehren, ohne dass ein Signal gesendet wurde. Die Bedingung, auf die gewartet wurde, muss dann dementsprechend auch nicht erfüllt sein. Das Idiom, das man normalerweise verwendet sieht so aus:

    while (not condition) {
       pthread_cond_wait(cond, mutex);
    }
    

    Das sollte aber in jedem Text zu Condition Variablen erklärt werden.


Anmelden zum Antworten