threadsafe deque



  • Sofern alle nur lesen dann ist es okay. Leider wird wahrscheinlich ein Teilnehmer schreiben, weil es sonst witzlos waere.

    Hier eine Implementation aus C++11 Concurrency Teil 9

    template<typename T>
    class msg_queue
    {
    public:
        // preventing implicit generation of move constructor
        ~msg_queue() {}
    
    	void post(T&& msg)
    	{
    		std::lock_guard<std::mutex> lock(mtx);
    		queue.push_front(std::move(msg));
    		cond.notify_one();
    	}
    
        bool wait_for(T& msg, std::chrono::milliseconds time)
    	{
    		std::unique_lock<std::mutex> lock(mtx);
    		bool b = cond.wait_for(lock, time, [this]{return !queue.empty(); });
    		if (b)
    		{
    			msg = std::move(queue.back());
    			queue.pop_back();
    			return true;
    		}
    		else
    		{
    			return false;
    		}
    	}
    
    	unsigned int size()
    	{
    		std::lock_guard<std::mutex> lock(mtx);
    		return queue.size();
    	}
    
        void clear()
        {
            std::lock_guard<std::mutex> lock(mtx);
            queue.clear();
        }
    
    private:
    	std::deque<T> queue;
    	std::condition_variable cond;
    	std::mutex mtx;
    };
    


  • SeppJ schrieb:

    Wie meinst du das gelöst zu haben?

    Vielleicht so?

    TheBigW schrieb:

    Ringpuffer

    Eine normale queue ist das jedenfalls nicht.



  • http://www.drdobbs.com/parallel/writing-lock-free-code-a-corrected-queue/210604448

    Lockless Code ist sehr sehr sehr sehr sehr sehr sehr schwer richtig zu schreiben.

    Lieber einfach locken (wie knivil). Das ist viel viel viel viel viel einfacher korrekt hinzubekommen.



  • Wieso nimmst du dann nicht deque?

    aber bei einer "normalen" deque, macht eine Einfügeaktion alle Iteratoren ungültig

    genau das ist das Problem... Mit einem Block Speicher, der nur gelesen/geschrieben wird spart man sich halt einiges.

    kurzes sample. IMO gibt es nur probleme, wenn begin==end (buffer underrun). unter der Massgabe das ich in Kauf nehme dass m_Data in diesem Fall ab und an Bloedsinn bekommen kann solte das doch eigentlich ok sein:

    void push_back( const value_type& data_r )
    {
    		m_pDataEnd->m_Data = data_r;
    		m_pDataEnd = m_pDataEnd->m_pNext;
    }
    
    value_type pop_front()const
    {
    		value_type data( m_pDataBegin->m_Data );
    		m_pDataBegin = m_pDataBegin->m_pNext;
    		return data;
    }
    

    Ok, weiss nicht ob ich mit lock billiger davon komme. Ich bekomme eine fixe Anzahl Samples (ca. 10) alle ca. 100 ms. Leider zeitlich nicht immer continuierlich. Diese sollen aber mit 1/10ms visualisiert werden. Die Kurve soll fluessig aussehen. Die Deque ist sozusagen der Puffer zwischen dem DatenThread und dem TimerThread der den UI Refresh triggered. Ich habs mit Threadlock versucht, aber dann habe ich durch den sync overhead keine fluessige Visualisierung mehr hinbekommen. Und eigentlich sollte es passen sobald beide Threads an verschiedenen Enden des Speicherblock arbeiten...
    Solange es keine Probleme verursacht die zum crash fuehren kann ich sogar damit leben...



  • TheBigW schrieb:

    Ok, weiss nicht ob ich mit lock billiger davon komme. Ich bekomme eine fixe Anzahl Samples (ca. 10) alle ca. 100 ms. Leider zeitlich nicht immer continuierlich. Diese sollen aber mit 1/10ms visualisiert werden. Die Kurve soll fluessig aussehen. Die Deque ist sozusagen der Puffer zwischen dem DatenThread und dem TimerThread der den UI Refresh triggered. Ich habs mit Threadlock versucht, aber dann habe ich durch den sync overhead keine fluessige Visualisierung mehr hinbekommen. Und eigentlich sollte es passen sobald beide Threads an verschiedenen Enden des Speicherblock arbeiten...
    Solange es keine Probleme verursacht die zum crash fuehren kann ich sogar damit leben...

    Ganz blöde Idee:

    Das Problem mit Locks ist, dass man sie nicht dauernd setzen will. Wenn dein Write-Thread nun alle 100ms etwas schreibt und der Read-Thread alle 10ms etwas liest, dann lockst du pro 100ms ja 11mal.

    Wenn du nun mit einer Verzögerung leben kannst - sagen wir die Daten die du visualisiert sind nicht echtzeit sondern eine halbe Sekunde hinten: dann buffert der Write-Thread in einer eigenen Queue einfach mal 50 Einträge und schreibt diese in deine synchronised Queue erst wenn er 50 Einträge voll hat.

    Gleiches macht der Read-Thread, er speichert sich 50 Einträge vor und holt die nächsten 50 erst ab, wenn er sie aufgebraucht hat.

    Dadurch hast du statt 11 Locks pro 100ms nur noch 2 Locks pro 500ms.

    Das ganze braucht natürlich feintuning. Aber in den seltensten Fällen brauchst du wirklich echtzeit updates. Es reicht ja schon wenn du damit leben kannst 100ms "hinten" zu sein. Denn dann hast du auch wieder nur 2 Locks pro 100ms, und alles läuft flüssiger.

    Statt auf eine fixe Anzahl Einträge zu gehen, kann man natürlich auch in ms rechnen. zB alle 100ms wird gepusht und alle 50ms gepollt.



  • Mit einem Block Speicher, der nur gelesen/geschrieben wird spart man sich halt einiges

    Ich benutze obige Messagequeue in erster Linie um Pointer auf Speicherbereiche, die gerade aktuell sind, hin und herzuschieben. Ich habe also "rohen" Speicher und brauche dennoch nicht kopieren. Speicherverwaltung und Kommunikation sind getrennt. Loesungen gibt es viele, aber manchmal Bloedsinn in Kauf nehmen ist fuer mich nicht drin.



  • aber manchmal Bloedsinn in Kauf nehmen ist fuer mich nicht drin

    Vollkommen richtig. Der Bloedsinn ist ja auch eher das Errorszenario was da heisst: Buffer underrun. Selbst den fange ich ab indem ich im lesethread auf empty (begin==end) checke. Ok, unter der Annahme das der Vergleich zweier Pointer eine atomare Operation ist sollte das gehen. Ich poste auch gerne mal zur allgemeinen Erheiterung den ganzen code :).

    Die eigentliche Frage ist also eher: ist es sicher, wenn zwei threads an zwei verschiedenen Stellen eines zusammenhaengenden Speicherberichs schreiben/lesen?

    @Shade: genau an den Ansatz habe ich auch gedacht. Ein bisschen hinterherlaufen ist denke ich OK. Ich werd mal druebr schlafen: billig ist diese Implementierung nicht.
    Im Prinzip verhaelt sich meine deque aehnlich: Ich fuelle immer erstmal 20 Samples ein bevor ich die Visualisierung loslaufen lasse. Im Fall eines underrun wartet die Visualisierung und fuellt erstmal wieder 20 samples auf.



  • Ich kann mir nicht vorstellen, dass es einen atomaren Vergleich zweier Größen gibt, weil das zwei Read-Operationen beinhaltet.



  • Decimad: Kompär'n'Schwap

    :xmas2: :xmas2: :xmas2:



  • Lord Rudolphi schrieb:

    Decimad: Kompär'n'Schwap

    :xmas2: :xmas2: :xmas2:

    Hey, Dir auch frohe Weihnachten 😉
    der von Dir verlinkte Artikel sagt ja aus, dass mit einem vorgegebenen Wert vergleicht. Also wenn ich mir den Pseudo-Code da anschaue, dann muss der Wert "alt" schon "geholt" worden sein, bevor die Instruktion angelaufen wird (oder aber einfach eine Konstante sein). Gehört dieses holen (Read-Operation) von "alt" genauso zu der atomaren compare&swap-Operation wie das holen von "*speicheradresse"? Ich hätte jetzt nicht gedacht und geglaubt dass dieses compare&swap nur mit in der Instruktion kodierten Konstanten von "alt" funktioniert.

    Viele Grüße,
    deci



  • ok, ich denke Decimad bezieht sich auf

    unter der Annahme das der Vergleich zweier Pointer eine atomare Operation ist

    ich habe dort eine empty methode mit der der Lesethread checkt ob auch was zu lesen da ist: die macht nur

    bool empty()const
    {
       return m_pDataBegin ==  m_pDataEnd;
    }
    

    das sollte verhindern das der Lesethread unvollstaendige Daten liest, die der Schreibthread noch nicht fertig geschrieben hat. Solang der Vergleich selber atomar ist sollte die methode selber per inline wegoptimiert werden.

    Sorry fuer die Verwirrung, bezogen auf meinen bisher geposteten code schon klar: push und pop sind in keinem Fall threadsafe/atomar. Deshalb auch diese primitive Absicherung im underrun fall mit der empty - Kruecke.



  • TheBigW schrieb:

    Selbst den fange ich ab indem ich im lesethread auf empty (begin==end) checke.

    Auch wenn dieser Vergleich atomar sein sollte, kann es passieren, dass der aufrufende Thread nach dem Test unterbrochen wird, das letzte Element aus dem Buffer geholt wird, und der erste Thread wieder zum Zuge kommt. Er denkt sich, toll ich kann poppen, weil habe ich ja grade getestet. Leider ist der Buffer trotzdem leer.

    PS: Bitte, entweder mache es richtig oder nicht, aber hoere auf zu argumentieren, dass doch alles nicht so schlimm ist und irgendwie funktioniert.



  • PS: Bitte, entweder mache es richtig oder nicht, aber hoere auf zu argumentieren, dass doch alles nicht so schlimm ist und irgendwie funktioniert.

    Das war nie meine Absicht. Es geht fuer mich darum wie ich mit minimalem lock overhead arbeiten kann. Deine Implementierung ist super. Fuer das Wegschreiben der Daten wo es wirklich auf Korrektheit ankommt mache ich es genau so. Das funktioniert nur leider nicht so einfach fuer fluessige kontinuierliche Visualisierung bei schwankender Dateneingangsrate. Das Problem ist hauptsaechlich das allgemeine Ansaetze fuer threadsafe container bei jedem containerzugriff locken. Das ist ja auch am sichersten und sollte auch so gemacht werden.
    Shades Loesung ist auch denkbar: recht hoher Aufwand und recht speziefisch auf die Datenrate : wie man an seinem Beispiel sieht bin ich da recht abhaengig von der Datenrate um entsprechende parameter zu waehlen. Hinzu kommt der Nachteil des "Nachhaengens".

    Damit es nicht zu langweilig wird eine neue Idee: der simultane Zufriff auf den Datencontainer an beliebigen Enden der Queue ist ja kein Problem. Demzufolge waere es auch unsinig dort jedesmal zu locken:

    void push_back( const value_type& data_r )
    {
            m_pDataEnd->m_Data = data_r;
            m_pDataEnd = m_pDataEnd->m_pNext;
    }
    
    value_type pop_front()const
    {
            value_type data( m_pDataBegin->m_Data );
            m_pDataBegin = m_pDataBegin->m_pNext;
            return data;
    }
    

    Im Prinzip muesste doch nur op= von value_type den simultanen Zugriff auf ein und die selbe Instanz verhindern. Ich werde mal experimentieren ob ich das irgendwie schick hinbekomme...



  • TheBigW schrieb:

    Das war nie meine Absicht. Es geht fuer mich darum wie ich mit minimalem lock overhead arbeiten kann.

    Schau dir nochmal meinen Link an:
    http://www.drdobbs.com/parallel/writing-lock-free-code-a-corrected-queue/210604448

    Da wird eine Lockfree Queue diskutiert, mit Referenzen zu anderen Artikeln. Wenn du eine Lockfree Queue haben willst, ist das dein Startpunkt.

    Ansonsten:
    liblfds
    Windows Interlocked Single List

    bzw.: http://www.c-plusplus.net/forum/281265-full



  • 1000 Locks pro Sekunde ist doch wirklich nicht die Welt. Das kann keinen relevanten Overhead erzeugen. Mach es, wie knivil beschrieben hat. Du kannst die Klasse genau so übernehmen. Dann funktioniert das.

    Wenn Dein Programm nicht wirklich flüssig läuft, dann muss es andere Ursachen haben.

    Wenn du mehr als 1 Mio Nachrichten pro Sekunden über die Queue schicken willst, dann kannst Du Dich mal schlau machen, wie lockless queues programmiert werden. Da gibt es Artikel dazu.



  • @Shade: Danke - haette ich mal gleich Deinen Kommentar und den drdobs richtig gelesen... Auf den ersten Blick dachte ich: ok, so aehnlich ist ja mein producer/consumer - aber eben nur so aehnlich. Das muss ich erstmal verdauen... Immer wenn man denkt man haette alles verstanden macht man die bloedesten Fehler.

    @ich bins: Das Problem ist die asynchrone Lieferung der Daten und die kontinuierliche UI Anzeige. In dem Moment wo 10 Samples auf einmal geschrieben werden haengt die Visualisierung sichtbar nach, weil sie ja per lock die Daten langsamer bekommt - der Schreibthread ist da halt einfach fixer und bekommt demzufolge das lock haeufiger. Kein Performanceproblem: inclusive Visualisierung nahezu 0% CPU load wie sich das gehoehrt :).



  • TheBigW schrieb:

    @Shade: Danke - haette ich mal gleich Deinen Kommentar und den drdobs richtig gelesen... Auf den ersten Blick dachte ich: ok, so aehnlich ist ja mein producer/consumer - aber eben nur so aehnlich. Das muss ich erstmal verdauen... Immer wenn man denkt man haette alles verstanden macht man die bloedesten Fehler.

    @ich bins: Das Problem ist die asynchrone Lieferung der Daten und die kontinuierliche UI Anzeige. In dem Moment wo 10 Samples auf einmal geschrieben werden haengt die Visualisierung sichtbar nach, weil sie ja per lock die Daten langsamer bekommt - der Schreibthread ist da halt einfach fixer und bekommt demzufolge das lock haeufiger. Kein Performanceproblem: inclusive Visualisierung nahezu 0% CPU load wie sich das gehoehrt :).

    Kannst du nicht mehrere samples gleichzeitig in die queue schieben/rausholen? Es würde ja reichen, sie 60 mal pro Sekunde reinzuschieben, öfter wird ja wahrscheinlich eh nicht visualisiert.


Anmelden zum Antworten