Meine Thread Klasse (bitte mal testen)



  • Plassy schrieb:

    Simon2 schrieb:

    Plassy schrieb:

    ...Ich habe Das jetzt so implementiert, dass es ohne mutexes und system-lock Aufrufe auskommt...

    Das wird lustig !

    Gruß,

    Simon2.

    Deswegen ist es ja auch in einer Klasse gekapselt.

    Sag' ich ja: Wenn Du meinst "Klasse kapseln" hätte irgendeinen "threadtechnischen Effekt" (und würde gar Mutexes oder "System locks" überflüssig machen") .... wird's lustig.

    Das ist so als würdest Du sagen: "Mein Auto kann gar nicht gegen den Baum fahren, weil ja mein Gepäck im Koffer und nicht lose auf den Sitzen liegt"...

    Gruß,

    Simon2.



  • asc schrieb:

    Nicht nur theoretisch, sondern auch praktisch kann dabei was schief gehen. Und je nach Belastung wird es auch realistischer, Thread 1 prüft den boolwert / Threadsprung / Thread 2 schreibt boolwert. Und schon hast du das Problem.

    Die Behauptung war, dass während des Schreibens etwas schiefgehen kann. Nicht zwischen Prüfen und Schreiben.

    Kann meines erachtens in der Zeile "a = true" mehr als eine Instruktion verbraten werden (Sorry, ich hatte vor langer Zeit Assembler und keine Ahnung ob das so unbedingt stimmt):

    1. Speicherzelle in das Register laden
    2. Register ändern
    3. Registerinhalt in Speicher zurückschreiben

    Schritt 1 ist unnötig (sonst hätten wir keine reine Schreiboperation). Das Register ist Thread-lokal, also kann von 2 nach 3 auch nicht schiefgehen.



  • Eine Frage hätte ich da aber noch.
    Dass es beim Zugriff von geminsam genutzten Daten von zwei threads zu problemen kommen kann ist klar.
    Wie sieht es aber mit Funktionsaufrufen aus?

    Wenn man jetzt so eine Funktion hat:

    int calcSomething(int p1, int p2)
    {
        return (p1 + p2) / 5;
    }
    

    Die funktion benutzt nur die Daten die Sie über den Stack bekommen hat und
    keine gemeinsam genutzten Daten.
    Angenommen zwei threads wollen diese funkttion gleichzeitig aufrufen.
    Kann schon allein der gleichzeitige Funktionsaufruf zu Problemen führen?

    MfG
    Plassy



  • Plassy schrieb:

    Die funktion benutzt nur die Daten die Sie über den Stack bekommen hat und
    keine gemeinsam genutzten Daten.
    Angenommen zwei threads wollen diese funkttion gleichzeitig aufrufen.
    Kann schon allein der gleichzeitige Funktionsaufruf zu Problemen führen?

    Ne, zum Glück nicht, jeder Thread hat seinen eigenen Stack, wo ihm niemand dazwischenfunken darf 😉



  • So,
    ich hab das jetzt mal alles umgeschrieben.
    Ich hoffe Das ist so besser...??

    #include <queue>
    #include <boost/function.hpp>
    #include <boost/thread.hpp>
    #include <boost/thread/mutex.hpp>
    #include <boost/thread/condition.hpp>
    
    namespace Threads
    {
    
    class Thread
    {
    public:
    	Thread() :
    		mStopThread(false)                                 ,
    		mThread(boost::bind(&Thread::threadFunction, this))
    	{
    	}
    
    	virtual ~Thread()
    	{
    		stop();
    		mThread.join();
    	}
    
    	void enqueueTask(const boost::function0<void>& task)
    	{
    		boost::mutex::scoped_lock lock(mMutex);
    		mTasks.push(task);
    		mCond.notify_one();
    	}
    
    	void clear()
    	{
    		boost::mutex::scoped_lock lock(mMutex);
    
    		while(not mTasks.empty())
    			mTasks.pop();
    
    		mCond.notify_one();
    	}
    
    	void stop()
    	{
    		boost::mutex::scoped_lock lock(mMutex);
    		mStopThread = true;
    		mCond.notify_one();
    	}
    
    	size_t getNumTasks() const
    	{
    		boost::mutex::scoped_lock lock(mMutex);
    		size_t size = mTasks.size();
    		mCond.notify_one();
    		return size;
    	}
    
    protected:
    	virtual void taskStarts(){ }
    	virtual void taskEnds  (){ }
    
    private:
    	void threadFunction()
    	{
    		while(true)
    		{
    			boost::mutex::scoped_lock lock(mMutex);
    
    			while(true)
    			{
    				if(mStopThread)
    					return;
    
    				if(mTasks.empty())
    					mCond.wait(lock);
    				else
    					break;
    			}
    
    			boost::function0<void> task = mTasks.front();
    			mTasks.pop();
    			lock.unlock();
    
    			taskStarts();
    			task();
    			taskEnds();
    		}
    	}
    
    private:
    	mutable boost::mutex                 mMutex;
    	mutable boost::condition             mCond;
    	std::queue< boost::function0<void> > mTasks;
    	volatile bool                        mStopThread;
    	boost::thread                        mThread;
    };
    

    MfG
    Plassy



  • Bashar schrieb:

    ProgChild schrieb:

    Wärend einer der bool Variablen verändert wird, kann ein Threadwechsel auftreten. Dann der Wert in der Boolvariable undefiniert.

    Aber nur theoretisch. Ein bool ist ein Byte, das schreibt man in einer Instruktion, da kann also kein Threadwechsel dazwischenfunken.

    Was meine andere Argumentation allerdings nicht widerlegt. Außerdem wird ein Prozessor-Befehl auf mehrere Teilbefehle aufgeteilt. Befehl auslesen, dekodieren, zubetrachtenden Daten laden, usw. Weist du, dass jede Prozessarchitektur keinen Threadwechsel wärend dessen vornehmen kann?



  • @Plassy:
    Hmpf. Hiermit offiziell eine Entschuldigung an dich... also: sorry 🙂
    Ich war wohl etwas schlecht drauf und hab's denke ich diesmal wirklich etwas übertrieben. Gottseidank hast du's anscheinend mit Humor genommen.

    Was das "falsch" angeht - wurde glaube ich schon ganz gut erklärt.

    Trotzdem nochmal von mir ein paar grundlegende Dinge. Der 1. Grund warum es mit volatile bool nicht funktioniert ist im Prinzip relativ einfach. Ziemlich egal was man machen möchte, sofern es um Synchronisierung geht, braucht man immer einen "testen und setzen" Befehl der komplett unteilbar (atomic) arbeitet.

    Angenommen man hat eine Queue wo man in einem Thread sachen reinstecken möchte, und im anderen Thread auslesen. Man möchte nun diese Queue über ein einfaches "wird verwendet" Flag synchronisieren. Der Code dazu müsste ca. wie folgt aussehen:

    // ACHTUNG: das geht so _NICHT_, dient nur zur Veranschaulichung :)
    class Queue
    {
    public:
        void PushQueue(T t)
        {
            while (!test_and_set(&m_queueLocked, false, true));
    
            // ... access queue ...
    
            m_queueLocked = false;
        }
    
        void PopQueue(T& t)
        {
            while (!test_and_set(&m_queueLocked, false, true));
    
            // ... access queue ...
    
            m_queueLocked = false;
        }
    
        static bool test_and_set(bool volatile* x, bool expected, bool new_value)
        {
            // ACHTUNG: all dies müsste atomar, also unteilbar erfolgen:
            if (*x == expected)
            {
                *x = new_value;
                return true;
            }
            else
                return false;
        }
    
    public:
        bool volatile m_queueLocked;
    };
    

    Angenommen 2 Thread führen nun "gleichzeitig" PushQueue und PopQueue aus. Weil's einfacher ist sagen wir das System hat nur eine CPU, also werden die beiden Threads abwechselnd drankommen.
    Anfangs ist m_queueLocked false, der erste Thread (1) läuft also über das "if" in test_and_set drüber.
    Genau an dem Punkt wird aber nun der Ablauf unterbrochen, und Thread (2) kommt dran.
    m_queueLocked ist immer noch false, also läuft auch der 2. Thread über das "if" drüber, und setzt danach m_queueLocked auf true, gibt "true" zurück, und die Warteschleife wird abgebrochen.

    Danach läuft der 1. Thread wieder los. Da er aber schon über das "if" drübergelaufen war setzt er ebenfalls (nochmal) m_queueLocked auf true, gibt ebenso true zurück, und die Warteschleife wird wieder abgebrochen.

    -> Problem.

    (Das Problem kann theoretisch genauso auf 2 Cores auftreten, nur ist es einfacher vorzustellen wenn man annimmt es sei ein Core der abwechselnd die Threads ausführt)

    Wenn man nun so eine atomare "test_and_set" Funktion hat sind damit aber noch nicht notwändigerweise alle Probleme beseitigt. Auf einigen CPUs kann es nämlich vorkommen dass in Multi-CPU (oder Multi-Core) Systemen, Änderungen die eine CPU im Speicher gemacht hat nicht sofort für andere CPUs "sichtbar" werden -- die anderen CPUs würden dort also alte Werte lesen.
    Wenn nun gemeinsam verwendete Daten modifiziert werden muss aber sichergestellt werden dass wer auch immer als nächstes darauf zugreift auch die aktuellsten Daten sieht. Um das sicherzustellen gibt es wieder spezielle Befehle, und vom OS oder Libraries zur Verfügung gestellte Mutexen verwenden diese speziellen Befehle damit alles so läuft wie es sollte.

    Schliesslich gibt es noch eine 3. Sache die einem dazwischenfunken kann, und das ist das "reordering" welches viele CPUs intern machen und auch der Compiler gerne macht. Also das "verschieben" von Befehlen (um bessere Performance zu erreichen). Damit alles so funktioniert wie es funktionieren sollte dürfen gewisse Befehle aber nicht ihre Position gegeneinander tauschen, z.B. das Schreiben eines gemeinsam verwendeten Datenwertes muss VOR dem freigeben des Locks erfolgen - sonst könnte ein 2. Thread zu früh auf die gemeinsam verwendeten Daten zugreifen, und wieder mit einem alten Wert arbeiten. Auch um das kümmern sich "fertige" (OS, Library) Mutexen, so dass man sich keinen Kopf mehr machen muss wenn man diese verwendet.

    ----

    Da das ein verflixt schwieriges und vielschichtiges Thema ist, ist es am einfachsten und besten wenn man eben fertige Mutexen verwendet, und nicht versucht es selbst zu programmieren. (Entweder müsste man sonst sehr viel Zeit investieren alles nötige in Erfahrung zu bringen um ein fehlerfreies Programm zu schreiben, oder man würde notwendigerweise Fehler machen.)



  • hustbaer schrieb:

    @Plassy:
    Hmpf. Hiermit offiziell eine Entschuldigung an dich...

    angenommen 😃

    MfG
    Plassy



  • ProgChild schrieb:

    Außerdem wird ein Prozessor-Befehl auf mehrere Teilbefehle aufgeteilt. Befehl auslesen, dekodieren, zubetrachtenden Daten laden, usw. Weist du, dass jede Prozessarchitektur keinen Threadwechsel wärend dessen vornehmen kann?

    Entscheidend ist, dass es am Ende nur ein Schreibzugriff ist. Wie der Befehl intern zerlegt wird ist uninteressant.



  • Bashar schrieb:

    Entscheidend ist, dass es am Ende nur ein Schreibzugriff ist. Wie der Befehl intern zerlegt wird ist uninteressant.

    Bei x86 sollte das wohl stimmen. Jetzt gibt es aber Prozessoren, die können nur Double-Words addressieren. d.h. Ein bool würde auf 4Byte erweitert. Jetzt müssten vier Byte geschrieben werden...

    Das wird wohl in der Praxis fast nicht vorkommen. Was ich sagen will: Einen Mutex zu verwenden ist doch einfach. Warum nicht einfach das ganze sauber Programmieren und damit Protabilität erhalten?



  • Plassy schrieb:

    @Werner Salomon

    Hey danke, Das sieht wirklich gut aus. ..

    Hallo Passy,

    das freut mich, wenn Dir das gefällt.

    Plassy schrieb:

    Was mich aber auf die Idee gebracht hat das ohne system-lock Befehle
    zu realisieren war dieser Artikel:

    Lock-free Interprocess Communication
    http://www.ddj.com/cpp/189401457

    Ich habe das Prinzip dann so abgeändert, dass es zu meinem Problem passt.
    Aber vielleicht habe ich auch was falsch verstanden??

    Das ist ein interessanter Artikel. Aber am Ende sagt der Autor ja selbst, dass es bei single-core nichts bringt. Ich bin so gar der Meinung, dass es schädlich sein kann. Ich habe dafür mal mein ActiveObject auf das in dem Artikel beschriebene Prinzip umgestrickt.

    #include <iostream>
    #include <vector>
    #include <boost/thread/thread.hpp>
    #include <boost/bind.hpp>
    #include <boost/function.hpp>
    
    using std::cout;
    using std::endl;
    
    class ActiveObject
    {
    public:
        typedef boost::function0< void > task_type;
    
        ActiveObject()
            : m_sync_flag( N, false )
            , m_tasks( N )
            , m_wrtIdx( 0 )
            , m_rdIdx( 0 )
            , m_running( true )
    
            , m_thrd( boost::bind( &ActiveObject::run, this ) )
        {}
        ~ActiveObject()
        {
            Cmd( boost::bind( &ActiveObject::stop, this ) ); // Ende
            m_thrd.join();
        }
    
        void Cmd( const task_type& task )
        {
            while( m_sync_flag[m_wrtIdx] )
                boost::thread::yield();  // thread in ready state
            m_tasks[m_wrtIdx] = task;
            m_sync_flag[m_wrtIdx] = true; // Signal: Wert ist geschrieben
            if( ++m_wrtIdx == N )
                m_wrtIdx = 0;
        }
    
    private:
        task_type nextTask()
        {
            while( !m_sync_flag[m_rdIdx] )
                boost::thread::yield();  // thread in ready state
            const task_type task = m_tasks[m_rdIdx];
            m_sync_flag[m_rdIdx] = false; // Signal: Wert ist gelesen
            if( ++m_rdIdx == N )
                m_rdIdx = 0;
            return task;
        }
    
        void stop()
        {
            m_running = false;
        }
    
        void run()
        {
            cout << "Thread gestartet .. " << endl;
            while( m_running )
            {
                // -- cmd ausführen
                const task_type task = nextTask();
                task();
            }
        }
    
        // --   Member
        static const std::size_t N = 5;
        std::vector< bool > m_sync_flag;
        std::vector< task_type > m_tasks;
        std::size_t m_wrtIdx;
        std::size_t m_rdIdx;
        bool m_running;
    
        boost::thread m_thrd;
    };
    
    int fibo( int x ) // kleiner CPU-time-killer; nur zu Demo Zwecken
    {
        if( x < 2 )
            return x;
        return fibo( x - 1 ) + fibo( x - 2 );
    }
    
    void machwas( int x )
    {
        const int erg = fibo( x );
        cout << "Fibo(" << x << ") = " << erg << endl;
    }
    
    int main()
    {
        using namespace std;
        ActiveObject server;
        for( int n = 4; n < 40; n+=3 )
            server.Cmd( boost::bind( &machwas, n ) );
    
        cout << "Cmds abgegeben ... " << endl;
        return 0;
    }
    

    Was ich selbst noch hinzugefügt habe, sind die yield-Aufrufe, falls das Flag noch nicht freigegeben ist. Tut man das nicht, also lässt man die yield's weg, so geht die Performance deutlich in den Keller - zumindest bei meinen Single-Core-PCs. Das ist genau der zweite Effekt, denn ich in meinem letzten Posting beschrieben habe.

    Plassy schrieb:

    Noch was zu deiner Klasse...

    // -- Members
    boost::mutex m_guard;
    boost::condition m_signal;
    std::queue< Cmd > m_input;
    boost::thread m_thrd;       // MUSS hier letztes Member sein!
    

    Wiso muss "m_thrd" der letzte member sein??

    Nun die Member einer Klasse werden im Konstruktor angelegt, und zwar in der Reihenfolge in der sie als Member definiert sind; unabhängig davon in welcher Reihenfolge sie in der Initialisierungsliste auftauchen. Wird das Thread-Objekt 'm_thrd' angelegt, so wird der Thread unmittelbar gestartet und innerhalb dieser Thread-Funktion (ActiveObject::run) wird sofort auf die Member von ActiveObject zugegriffen. Sollten diese zu diesem Zeitpunkt noch nicht initialisiert sein - und genau das wäre der Fall, wenn sie hinter dem Thread in der Memberliste stenden - dann käme es zu undefiniertem Verhalten.
    Aus dem gleichen Grund sollte man auch keine Klasse mit einem boost::thread als Member als Basisklasse vorsehen. Der Thread würde starten, bevor das erste Member der abgeleiteten Klasse initialisiert wird.

    Plassy schrieb:

    ich hab das jetzt mal alles umgeschrieben.
    Ich hoffe Das ist so besser...??

    Ein kleine Ungenauigkeit; das notify ist in der Methode getNumTasks() ist nicht notwendig. Sonst sieht das gut aus.

    Ansonsten stimme ich den anderen zu. Ich würde auch immer erst mit Mutex und Lock anfangen, bevor ich mir Gedanken über lock-freien Datenzugriff mache. Die Programmierung mit Multithreading bleibt dann noch schwierig genug. Die Kunst besteht darin, das Design des Programms so zu gestalten, dass nicht allzu viele Locks notwendig werden.

    Gruß
    Werner



  • ProgChild schrieb:

    Jetzt gibt es aber Prozessoren, die können nur Double-Words addressieren. d.h. Ein bool würde auf 4Byte erweitert. Jetzt müssten vier Byte geschrieben werden...

    Die vier Byte können dann natürlich in einem Rutsch übertragen werden. Warum sollte man sonst nur jedes DWord adressieren können?



  • @Werner Salomon:
    Erstmal danke für den interessanten Dr. Dobbs Link!

    m_tasks[m_wrtIdx] = task; 
    m_sync_flag[m_wrtIdx] = true; // Signal: Wert ist geschrieben
    

    Ich fürchte das ist wieder strenggenommen falsch.
    Grund: du hast den Algorithmus von dem Dr. Dobbs Artikel modifiziert, indem du zusätzlich zur "light pipe" auch noch ein Array aus boost::function Objekten "überträgst". (Die "light pipe" basiert darauf dass das *alle* Daten die mit ihrer Hilfe übertragen werden auch *direkt* in der "light pipe" selbst übertragen werden)
    Weiters fehlt volatile 🙂

    Die oben zitierte Stelle würde funktionieren wenn das Schreiben des bools mit "release semantics" erfolgen würde. Das würde bedeuten dass sämtliche Modifikationen am Speicher überall hin sichtbar sein müssen (=für alle Threads) bevor das bool geschrieben wird, also bevor ein anderer Thread es als "true" lesen kann.

    Weiters müssten ähnliche Änderungen an anderen Stellen erfolgen - z.B. müsste das Lesen des bools im "Task-Ausführen-Thread" mit "acquire semantics" erfolgen, das "false" Setzen des bools ebenso wieder mit "release semantics" etc.

    Mit etwas Glück, dem passenden Compiler und einer Intel x86/x64 Plattform (die sehr sehr viele solche Fehler "verzeiht" - weit mehr als z.B. ein DEC Alpha oder auch ein PowerPC) geht es "trotzdem", aber verlassen kann man sich darauf nicht.

    ----

    Wollte man die im Dr. Dobbs Artikel beschriebenen Verfahren 1:1 auf diese Klasse anwenden müsste man die "auszuführenden" boost::function schon "serialisieren", und dann die serialisierte Version über die "light pipe" übertragen - das würde dann garantieren dass es auch ohne acquire/release semantic funktioniert. (volatile wäre IMO trotzdem angebracht, da der Compiler sonst die Zugriffe u.U. ganz "wegoptimieren" könnte -- vor allem die Lese-Zugriffe.)

    Ein boost::function zu serialisieren ist IMO nicht möglich (genauso wie die Implementierung von operator == für boost::function nicht möglich ist), aber man könnte statt boost::function z.B. einen "bound member function pointer" verwenden. Natürlich hat man dadurch viel weniger Freiheit in der Anwendung (boost::bind u.Ä. fällt flach), aber naja.
    So einen "bound member function pointer" könnte man dann relativ einfach als reihe von integralen typen "serialisieren".

    EDIT: mir fällt gerade ein, einen "bound member function pointer" kann man nicht einfach so serialisieren & übertragen, denn man anderen Ende müsste der selbe statische Typ verwendet werden wie der des Originals, und member function pointer zu casten ist "asking for trouble". Es müsste aber gehen wenn man sich über ein entsprechendes Template eine "invoker function" basteln lässt und der dann einen void* mitgibt.

    Allerdings habe ich NOCH ein anderes Problem nicht bedacht: das Objekt auf welches die member function aufgerufen werden soll muss ja AUCH mit "release semantics" in den Speicher "gezwungen" werden, und auf der anderen Seite mit "acquire semantics" wieder "rausgeholt" -- sonst könnte es sein dass der aktuelle State des Objektes auf der anderen Seite (wo die funktion ausgeführt werden soll) nicht "sichtbar" ist. Anders gesagt: man kommt wenn man "tasks" übertragen will die nicht komplett serialisiert sind sondern auch noch mit anderen Daten arbeiten sowieso nicht um acquire/release barriers herum.

    Und wieder einmal mehr zeigt sich: das ist alles garnicht SO einfach 😃
    /EDIT

    ----

    Nochwas zu dem Dr. Dobbs Artikel: ich habe ihn nicht Wort für Wort gelesen - kann also sein dass es im Artikel schon erwähnt wird - aber ein Punkt ist wichtig: diese "light pipes" funktionieren nur wenn der zugrunde liegende Typ atomar gelesen/geschrieben werden kann. *Normalerweise* sollte das für int möglich sein, aber für char oder bool ist es oft nicht garantiert (hier wieder: mit Intel CPUs x86/x64 geht fast alles, mit anderen CPUs u.U. nicht).
    Auf Plattformen wo die CPU nur 4-Byte Worte adressieren dann wird das Schreiben eines Bytes zu einem Read-Modify-Write, welches nichtmehr Atomar ist.
    Das "null setzen" eines Elementes der Pipe auf der Lese Seite könnte dadurch gleichzeitig geschriebene Daten "vernichten"...:

    // Thread A will index 0 der light pipe auf 0 setzen
    // Thread B will den Wert "123" an index 1 "nachschieben"
    A: liest 42,0,0,0 in reg (read Teil des read-modify-write)
    B: liest 42,0,0,0 in reg (read Teil des read-modify-write)
    A: modifiziert das erste Byte = 0 (-> reg=0,0,0,0 - modify Teil des read-modify-write)
    B: modifiziert das zweite Byte = 123 (-> reg = 42,123,0,0 - modify Teil des read-modify-write)
    B: schreibt reg (42,123,0,0) zurück (write Teil des read-modify-write)
    A: schreibt reg (0,0,0,0) zurück (write Teil des read-modify-write)
    // Endergebnis im Speicher: (0,0,0,0)
    // Erwartet: (0,123,0,0)
    

    Je nachdem wie A und B interleafed werden ergeben sich dadurch andere (oft falsche, manchmal richtige) Ergebnisse im Speicher ... was nicht gut sein kann.

    ----

    So. Was lernen wir daraus? Multi-Threading ist schwer, und man kann leicht Fehler machen, wenn man die sicheren (aber zugegebenermassen langweiligen) Pfade von Mutexen und Condition-Variablen verlässt und anfängt selbst gewisse Dinge zu stricken.



  • Bashar schrieb:

    Die vier Byte können dann natürlich in einem Rutsch übertragen werden. Warum sollte man sonst nur jedes DWord adressieren können?

    Weil er es nur kann. Der verwendete Speicher muss aber vielleicht Byteweise gesetzt werden.

    Du kannst dich nicht darauf verlassen, dass das auf jeder Prozessor-Architektur so ist, wie du sagst. Denk beispielsweise mal an eine Turing-Maschiene.



  • OK du gewinnst, die Turingmaschine hatte ich ja ganz vergessen.



  • Bashar schrieb:

    OK du gewinnst, die Turingmaschine hatte ich ja ganz vergessen.

    😃 Du hast ja recht, wenn du sagst, dass das in der Praxis kaum auftritt ^^


Anmelden zum Antworten