Lock-freier Algorithmus



  • Hi!

    Ich habe einen simplen lockfreien Algorithmus für ein Problem geschrieben (erst mal alles in sequentially consistend ordering). Das Problem lautet:
    Ich habe asynchrone Operationen (je Operation ein Handle). Das initiieren dieser Operationen ist nicht threadsicher:

    mutex mtx;
    mtx.lock();
    begin_operation(handle); // not thread safe
    mtx.unlock();
    

    Ich möchte mir nun die Mutexes sparen und trotzdem sicherstellen, dass zu jeder Zeit höchstens 1 Thread sich innerhalb von begin_operation befindet.

    Mein Ansatz: Eine einfach verkettete Liste (am Ende wird eingefügt). Es gibt quasi ein Token (wie in Token-Ring), welches nur ein Thread besitzt. Wenn nun ein anderer Thread kommt und eine Operation starten will und nicht den Token besitzt, dann hängt er sein Handle in die Liste, sodass der Token-Thread in der Liste das neue Element sieht und ebenfalls die Operation für dieses Handle startet. Er geht die Liste so lange durch, bis er es schafft, das Token abzugeben.
    Der Code:

    #include <atomic>
    #include <thread>
    #include <cassert>
    
    struct list {
    	struct node {
    		std::atomic<node*> next;
    
    		void start_operation() {
    			static std::atomic_int i;
    			++i;
    			assert(i == 1);
    			--i;
    		}
    	};
    
    	std::atomic<node*> tail{ nullptr };
    
    	void push(node* new_node) {
    		node* old_tail = tail.exchange(new_node);
    
    		if(old_tail == nullptr || old_tail->next.exchange(new_node) == old_tail) {
    			node* current = new_node;
    			while(true) {
    				current->start_operation();
    				node* expected = nullptr;
    				if(current->next.compare_exchange_strong(expected, new_node)) return;
    				else current = expected;
    			}
    		}
    	}
    };
    

    Ein Thread erhält das Token in Zeile 22 wenn die Liste entweder leer ist oder der Next-Pointer des letzten Elements in der Liste nicht auf sich selber zeigt (während er den Next-Zeiger mit der Adresse des neuen Elements überschreibt). Wenn er den Token hat, muss er die Operation starten und seinen Next-Zeiger auf sich selbst umbiegen, um das Token abzugeben (in Zeile 27). Wenn das gelingt, ist er fein raus, ansonsten muss er die nächste Operation in der Liste starten, so lange, bis er es schafft, vor einem anderen Thread den Next-Zeiger auf sich selbst zeigen zu lassen.

    Hat dieser Code einen offensichtlichen Fehler?



  • Jodocus schrieb:

    Hat dieser Code einen offensichtlichen Fehler?

    Der offensichtliche Fehler ist der, dass der Code nicht verständlich ist, und das ist bei lock-free nie gut. Wie stellst du sicher, dass new_node gültig bleibt und wer kümmert sich um das Löschen? Wieso verlierst du in Zeile 22 keine Elemente?

    Offenbar hast du das falsch gemacht: https://ideone.com/BPBh60



  • Hi!
    Danke für deine Antwort.

    join freddi schrieb:

    Jodocus schrieb:

    Hat dieser Code einen offensichtlichen Fehler?

    Der offensichtliche Fehler ist der, dass der Code nicht verständlich ist, und das ist bei lock-free nie gut.

    Hm, was genau ist unverständlich? In Zeile 20 wird das Ende der Liste durch das neue Element ersetzt. Das ehemalige, letzte Element hat nun einen Next-Pointer, der entweder auf sich selber zeigt oder nullptr ist. In Zeile 22 wird dieser Next-Zeiger dann auf den neuen Tail gesetzt. Wenn er vorher auf den Tail-Node zeigte, heißt das, dass der laufende Thread autorisiert ist, Operationen zu starten (ab Zeile 23). Jetzt startet er erst mal die eigene Operation (Zeile 25) und versucht, den Next-Zeiger danach auf den gleichen Node (new_node) zeigen zu lassen (Zeile 27). Wenn das nicht gelingt, muss ein anderer Thread durch Zeile 22 gegangen sein. D.h., jetzt muss auch die Operation gestartet werden, die der andere Thread hinterlassen hat. Sobald das getan ist, wird wieder versucht, den Next-Pointer des letzten Nodes zu manipulieren. So lange, bis es klappt und mal keine Aufträge in der Liste standen.
    Ist das verständlicher?

    join freddi schrieb:

    Wieso verlierst du in Zeile 22 keine Elemente?

    Wie können hier nodes verloren gehen? Mir fällt gerade kein Szenario ein, weshalb das passieren könnte. Jeder Thread, der pushen will, erhält durch exchange() die Ownership vom vorigen Element und biegt dann nachträglich seinen Next-Zeiger um. Dass der Tail dauernd überschrieben werden kann, macht ja nichts. 😕

    join freddi schrieb:

    Wie stellst du sicher, dass new_node gültig bleibt und wer kümmert sich um das Löschen?

    Löschen ist für mich erst einmal kein Thema. Darüber mache ich mir Gedanken, wenn ich halbwegs weiß, dass erst einmal das hier klappt. new_node bleibt natürlich immer gültig, das soll hier angenommen werden.

    join freddi schrieb:

    Offenbar hast du das falsch gemacht: https://ideone.com/BPBh60

    Blöderweise kann ich das Ergebnis auf meiner Maschine nicht reproduzieren. 😞 Ich weiß nicht wirklich, was schiefgegangen ist.



  • Hoppla, sorry, ich habe einen dummen, ersten Fehler gefunden.
    In Zeile 27 muss es natürlich

    compare_exchange_strong(expected, current)
    

    heißen, ansonsten wird ja nicht auf den selben Node zurückgezeigt, sondern auf den ursprünglichen, mit dem der Push begann. Er scheint jetzt auf ideone auch zu klappen.



  • Okay, scheinbar können hier nicht viele was mit meinem Post anfangen, also noch mal:
    Angenommen, ich habe eine Menge von Funktionen, die nicht threadsicher sind. Wenn mehrere Threads diese Funktionen nun parallel aufrufen, muss ich sie synchronisieren. Dafür kann man z.B. eine Mutex benutzen. Ich finde Mutexes aber blöd, weil sie Threads blockieren, die etwas nützliches machen könnten (bzw. der Scheduler einem anderen Prozess nun die Rechenzeit gibt).
    Deshalb versuche ich, einen Code zu schreiben, bei dem ein Thread nicht schlafen gelegt wird, sondern er eine Nachricht hinterlässt, dass der Thread, der quasi "acquired" hat, seinen Job auch noch machen soll. Auf diese Weise soll garantiert werden, dass die Funktion immernoch nur von höchstens 1 Thread gleichzeitig aufgerufen wird und gleichzeitig alle anderen Threads etwas anderes machen können.

    Hier ein Beispiel (jetzt auch mit Memory-Management):

    #include <algorithm>
    #include <atomic>
    #include <chrono>
    #include <functional>
    #include <iostream>
    #include <iterator>
    #include <string>
    #include <thread>
    #include <vector>
    
    template <typename TaskType>
    struct task_list {
    
    	struct node {
    		node(std::function<TaskType> task)
    			: task(task), next(nullptr) { }
    
    		std::function<void()> task;
    		std::atomic<node*> next;
    	};
    
    	std::atomic<node*> tail{ nullptr };
    
    	void push_task(std::function<TaskType> task) {
    		node* new_node = new node(task);
    		node* old_tail = tail.exchange(new_node, std::memory_order_relaxed);
    
    		if(old_tail == nullptr || old_tail->next.exchange(new_node, std::memory_order_consume) == old_tail) {
    			delete old_tail;
    
    			node* current = new_node;
    			for(;;) {
    				current->task();
    				node* expected = current->next.exchange(current, std::memory_order_release);
    				if(expected == nullptr) return;
    				else {
    					delete current;
    					current = expected;
    				}
    			}
    		}
    	}
    
    	~task_list() {
    		delete tail.load();
    	}
    };
    
    int main() {
    	task_list<void()> list;
    
    	std::vector<std::thread> threads;
    	std::string hello = "Hello, World!\n";
    
    	auto task = [&list, &hello] {
    		std::transform(std::begin(hello), std::end(hello), std::ostream_iterator<char>(std::cout), [](auto c) {
    			std::this_thread::yield();
    			std::this_thread::yield();
    			return c;
    		});
    	};
    
    	std::cout << "Ugly:\n";
    	for(int i = 0; i < 10; ++i) {
    		threads.emplace_back([&list, task] {
    			task();
    		});
    	}
    
    	for(auto& thread : threads)
    		if(thread.joinable()) thread.join();
    	threads.clear();
    
    	std::cout << "\nNice:\n";
    	for(int i = 0; i < 10; ++i) {
    		threads.emplace_back([&list, task] {
    			list.push_task(task);
    		});
    	}
    
    	for(auto& thread : threads)
    		if(thread.joinable()) thread.join();
    }
    

    Indem der Task in die Liste gepusht wird, wird er synchronisiert. Der Algorithmus ist der gleiche. Nur wie kann ich wissen, ob er stimmt?



  • Ich verstehe die Motivation ehrlich gesagt nicht ganz.
    Normalerweise braucht man nachdem man eine Funktion aufgerufen hat das Ergebnis des Aufrufs (was u.U. auch nur der Umstand sein kann dass die Nebeneffekte des Funktionsaufrufs jetzt sichtbar sind).
    Bei deiner Variante geht das nicht.

    Und ... was spricht jetzt wirklich gegen eine Mutex? "Mag ich nicht" ist doch wohl kein guter Grund.

    Aber ganz abgesehen davon: versuch erstmal ein Beispiel zu basteln wo man diese Funktionalität sinnvoll anwenden könnte.

    ps: Die Implementierung der push_task Funktion ist für mich total wirr.



  • Hi! Danke, dass du dir die Zeit nimmst.

    hustbaer schrieb:

    Ich verstehe die Motivation ehrlich gesagt nicht ganz.
    Normalerweise braucht man nachdem man eine Funktion aufgerufen hat das Ergebnis des Aufrufs (was u.U. auch nur der Umstand sein kann dass die Nebeneffekte des Funktionsaufrufs jetzt sichtbar sind).
    Bei deiner Variante geht das nicht.

    Allerdings. Wenn der Algorithmus vom Ergebnis des Aufrufs (oder seiner Nebeneffekte) abhängen würde, müsste man wieder eine Mutex benutzen, da der Code dann eh nichts sinnvolles machen kann, solange er die Funktion nicht aufrufen darf.

    hustbaer schrieb:

    Und ... was spricht jetzt wirklich gegen eine Mutex? "Mag ich nicht" ist doch wohl kein guter Grund.

    Angenommen, der Code nach dem Funktionsaufruf würde nicht vom Ergebnis abhängen. In dem Moment wäre es schade, sich in den Mutex-Lock zu begeben, obwohl man eigentlich nur will, dass die Funktion überhaupt mal synchronisiert aufgerufen wird und man etwas anderes sinnvolles tun könnte.
    Der primäre Anwendungsfall, wofür ich das machen will, ist für WinSock (mit seiner verbauten IOCP-API). Die zu synchronisierenden Funktionen sind z.B. WSASend, WSARecv (also Funktionen, die asynchrone Operationen initiieren). Laut Reference sind diese tatsächlich nicht threadsafe. Jetzt kann ich bei feinster Granulierung höchstens jedem Socket einen Mutex verpassen (wobei auf meinem System ein Mutex alleine schon 10 mal größer als ein Socket ist) und die Aufrufe pro Socket synchronisieren.
    Ich will es aus purer Langeweile aber mal probieren, das ganze lockfrei hinzubekommen.

    hustbaer schrieb:

    ps: Die Implementierung der push_task Funktion ist für mich total wirr.

    Hm, irgendwie schaffe ich es nicht, die Idee zu kommunizieren.
    Dann mal grafisch:

    |
      |
      V
    +---+
    |   |<-+
    | A |  |
    |   |--+
    +---+
    

    Das bedeuetet: Ein Node (namens A) mit einem Next-Pointer, der auf sich selber zeigt, wobei auch der Tail-Pointer (von oben) auf den Node zeigt
    Ausgangssituation:

    |
      |
      V
    +---+
    |   |<-+
    | B |  |
    |   |--+
    +---+
    

    Jetzt mache ich (abgekürzte) Diagramme für alle atomaren Schritte:
    1.

    |
                      V
           +---+    +---+
           |   |<-+ |   |
    ... -->| B |  | | C |
           |   |--+ |   |
           +---+    +---+
    

    Das ist Zeile 26, C wird der neue Tail. Dabei merkt er sich den alten Tail innerhalb von old_tail . Hier besteht eine Race Condition zwischen den Threads, aber das ist nicht schlimm, da sich jeder Thread den alten Tail merkt und nichts verloren geht.
    2.

    |
                       V
            +---+    +---+
            |   |    |   |
    ... --->| B |--->| C |
            |   |    |   |
            +---+    +---+
    

    Das ist 28. Der Next-Pointer des Vorgängers wird umgebogen. Durch das exchange in Schritt 1 ist gewährleistet, dass nur 1 Thread das tun kann, also keine Race Condition. Da der Next-Pointer von B vorher auf sich selbst zeigte (oder der Tail selbst nullptr war), ist C nun der autorisierte "Funktionsaufrufer" (er hat quasi den Mutex akquiriert). Wiederrum wegen der obigen Begründung kann nur 1 Thread gleichzeitig autorisiert sein.
    3.
    Da der Thread jetzt autorisiert ist, kann er sicher den Task ausführen (Zeile 33). Andere Threads könnten in der Zwischenzeit auch Elemente an die Liste gepusht haben. Keiner von ihnen kann autorisiert sein, die Tasks zu starten, weshalb sie alle nicht in den äußeren if-Zweig von push_task gelangen. Wir merken anhand des Next-Pointers von C, der nicht länger auf 0 zeigen könnte, das neue Operationen anstehen.

    |
                                V
            +---+    +---+    +---+
            |   |    |   |    |   |
    ... --->| B |--->| C |--->| D |
            |   |    |   |    |   |
            +---+    +---+    +---+
    

    Wir möchten "den Mutex" nun wieder freigeben, indem wir C auf sich selber zeigen lassen (Zeile 34).

    |
                                V
            +---+    +---+    +---+
            |   |    |   |<-+ |   |
    ... --->| B |--->| C |  | | D |
            |   |    |   |--+ |   |
            +---+    +---+    +---+
    

    5. Sofern der Next-Zeiger von C vorher auf 0 stand, ist kein neuer Node dazugekommen (d.h. alle anderen potentiell pushenden Threads befinden sich vor Zeile 28, welche mit Zeile 34 in synchronizes-with-Beziehung steht). Dann kann die Funktion verlassen werden. Ansonsten muss (wie im Diagramm gezeigt) auch der Task D ausgeführt werden (Schleifenablauf wiederholt sich). Das wird nun so lange gemacht, bis der Thread einen Next-Zeiger auf den selben Node zeigen lässt, der vorher auf 0 und nicht auf einen anderen Node zeigte. Da nur 1 Thread autorisiert ist, wird auch nur 1 Thread durch diese Schleife gehen und versuchen, den Next-Zeiger zu manipulieren. Alle anderen Threads belassen ihn bei nullptr .
    Bei dieser Gelegenheit löscht er die ganzen alten anderen Nodes vor ihm, auf die nun niemand außer er selbst zugreifen könnte (Zeile 37).

    |
                                V
            +---+    +---+    +---+
            |   |    |   |<-+ |   |<-+
    ... --->| B |--->| C |  | | D |  |
            |   |    |   |--+ |   |--+
            +---+    +---+    +---+
            (del)    (del)
    

    Ich hoffe, jetzt ist die Idee etwas klarer. Gibt's einen Haken und wenn ja, wo?


Anmelden zum Antworten