std::mutex mit Priorität?
-
Hallo,
gibt es in der Standard library (oder evtl. in boost) eine Möglichkeit einen Mutex mit einer bestimmten Priorität zu locken, also so dass ein thread mit hoher Priorität immer beim warten Vorrang hat?
Hab bei Google eine Lösung gefunden die 3 verschiedene Mutex verwendet, ist das eine gute Lösung (vor allem auch eine die 100% sicher ist)? Ich frag auch weil die Antwort da schon über 3 Jahre alt ist, gibt es da was "besseres" oder ist das der empfohlene Weg?
Am liebsten wäre mir etwas wie
my_mutex.lock(priority::HIGH)
oder so
-
Wie du selber rausgefunden hast, ist das nicht so einfach. Daher die obligatorische Frage: Was brauchst du? Insbesondere wie viele Prioritätslevel, ob stufenlos (double), einige wenige (<10) oder nur 3 oder 2.
-
Ich brauche eigentlich nur 2 Level: LOW und HIGH. Wobei es immer nur einen HIGH thread geben wird und wenige LOW (aktuell einen, später eventuell 2 bis 3). Von denen ist die Priorität dann aber egal, nur mein einer HIGH thread soll Vorrang haben.
-
Bei nur 2 Threads insgesamt reicht ein einfacher Mutex.
Ab 3 Low-priority-Threads würde ich die verlinkte 3-Thread-Lösung nehmen. Die funktioniert (spiel halt mal alle Fälle durch) und benötigt garantiert konstante Zeit.
-
*3-Mutex-Lösung
-
Eine allgemeine Lösung kann mittels einer
priority_queue
geschrieben werden. Stattvector
sollte manboost::container::static_vector
als den vonqueue
verwendeten Container festlegen, falls möglich.Ein grobes Beispiel:
class priority_mutex : std::mutex { std::priority_queue<std::size_t> queue; std::mutex qmutex; public: using std::mutex::native_handle_type; using std::mutex::native_handle; using std::mutex::unlock; using std::mutex::try_lock; void lock(std::size_t p) { if (!try_lock()) { qmutex.lock(); queue.push(p); qmutex.unlock(); while (queue.top() != p || !try_lock()) std::this_thread::sleep_for(std::chrono::milliseconds(5)); qmutex.lock(); queue.pop(); qmutex.unlock(); } } };
Demo gibt's hier.
-
Arcoth schrieb:
while (queue.top() != p || !try_lock()) std::this_thread::sleep_for(std::chrono::milliseconds(5));
Bitte kein Busy-Waiting. Davon abgesehen skaliert es nicht. Das ist ein gutes Beispiel, wie man es nicht machen sollte.
-
Hast du einen besseren Vorschlag?
-
logga schrieb:
Arcoth schrieb:
while (queue.top() != p || !try_lock()) std::this_thread::sleep_for(std::chrono::milliseconds(5));
Bitte kein Busy-Waiting. Davon abgesehen skaliert es nicht. Das ist ein gutes Beispiel, wie man es nicht machen sollte.
Jo, ist mir auch klar geworden. Ich schreibs um.
-
class two_priority_mutex { std::mutex low_priority_mutex, next_access_mutex, data_mutex; public: void lock() { std::unique_lock<std::mutex> low(low_priority_mutex); std::lock_guard<std::mutex> guard(next_access_mutex); data_mutex.lock(); low.release(); } void unlock() noexcept { data_mutex.unlock(); low_priority_mutex.unlock(); } void high_priority_lock() { std::lock_guard<std::mutex> guard(next_access_mutex); data_mutex.lock(); } void high_priority_unlock() noexcept { data_mutex.unlock(); } class high { two_priority_mutex *me; public: high(two_priority_mutex *me) : me(me) {} void lock() { me->high_priority_lock(); } void unlock() noexcept { me->high_priority_unlock(); } }; high high_priority_locker() { return high(this); } }; two_priority_mutex m; // High-Priority-Thread { auto h = m.high_priority_locker(); std::lock_guard<two_priority_mutex::high> g(h); ... } // Low-Priority-Threads { std::lock_guard<two_priority_mutex> g(m); ... }
-
Ich hab es mit einer cv umgeschrieben:
class priority_mutex : std::mutex { std::priority_queue<std::size_t> queue; std::mutex qmutex; std::condition_variable cv; public: using std::mutex::native_handle_type; using std::mutex::native_handle; using std::mutex::try_lock; void lock(std::size_t p) { if (!try_lock()) { std::unique_lock<std::mutex> ulock(qmutex); queue.push(p); cv.wait(ulock, [this, p] {return queue.top() == p && try_lock();}); queue.pop(); qmutex.unlock(); cv.notify_all(); } } void unlock() { std::mutex::unlock(); cv.notify_all(); } };
Demo. Kann aber natürlich wieder Unsinn sein
-
Wow, vielen Dank auf jeden Fall euch beiden, werde ich mir gleich mal genauer anschauen
-
Arcoth schrieb:
Ich hab es mit einer cv umgeschrieben:
Auf den ersten Blick finde ich das eine schicke Lösung
Allerdings behagt mir dieser Prioritäts-Mutex trotzdem irgendwie nicht so ganz:
Ich will nicht ausschließen, dass er irgendwo mal Sinn macht, aber mein Bauchgefühl sagt mir, dass die Prioritätswarteschlange überhaupt erst dann ihre
Wirkung entfaltet, wenn man eigentlich ein anderes Problem hat: Zu viel Contention auf der Ressource, die von dem Mutex geschützt wird.Das mag jetzt eine Binsenweisheit sein, und das Problem ist oft nicht einfach zu lösen.
Eine Technink als Anregung: bei keineren bis mittelgroßen Objekten mit viel Contention hat sich bei mir des öfteren z.B. ein COW-Ansatz bewährt.
Trotz der zusätzlichen Kopie hat sich dabei nicht selten der Gesamtdurchsatz deutlich erhöht, da sich die Threads wesentlich weniger auf gegenseitig auf den Füßen stehen:auto shared = std::make_shared<const Resource>(); thread1: auto res = std::atomic_load(&shared); res->readOnly(); thread2: auto res = std::atomic_load(&shared); auto mut = std::make_shared<Resource>(*res)); mut->mutate(); std::atomic_store(&shared, std::static_pointer_cast<const Resource>(mut));
Das ist natürlich nur eine mögliche Variante, wie man auf den Mutex verzichten kann, und sie ist natürlich nicht immer sinnvoll.
Gut möglich dass der Prioritäts-Mutex schon eine der besseren Lösungen für dein spezielles Problem ist.Finnegan
-
Arcoth schrieb:
Demo. Kann aber natürlich wieder Unsinn sein
Mein Hauptkritikpunkt ist immer noch da. Wenn du 1000 Threads am Warten hast, musst du jedesmal immer alle aufwecken. Bei n Locks hat jeder Thread O(n) Overhead, total macht das O(n2) Laufzeit.
Hier meine Umsetzung mit O(1) Overhead und O(n log n) Laufzeit (optimal, da die Priority-Queue so lange braucht):
template <typename T> class priority_mutex : std::mutex { struct entry { entry(T value, std::condition_variable *cv) : value(std::move(value)), cv(cv) {} T value; std::condition_variable *cv; friend bool operator<(entry const& a, entry const& b) { return a.value < b.value; } }; std::priority_queue<entry> queue; std::mutex queue_mutex; std::mutex work_mutex; public: void lock(T p) { std::unique_lock<std::mutex> guard(queue_mutex); std::condition_variable cv; queue.emplace(p, &cv); std::unique_lock<std::mutex> worker(work_mutex, std::defer_lock); cv.wait(guard, [&]{ return p == queue.top().value && worker.try_lock(); }); queue.pop(); worker.release(); } void unlock() noexcept { work_mutex.unlock(); std::lock_guard<std::mutex> guard(queue_mutex); if (!queue.empty()) queue.top().cv->notify_one(); } };
Der am höchsten prioritäre Thread wird gezielt aufgeweckt. Es kann zwar passieren, dass zwischen unlock() und dem Aufwecken der condition variable ein neues Top-Element eingefügt wird, aber das startet dann sofort.
-
@logga
Deine Lösung hat dafür einen relativ grossen fixen Overhead, nämlich das Erzeugen + Löschen der lokalencondition_variable
.
(Wie gross der Overhead wirklich ist hängt natürlich von der Implementierung der Standard-Library ab, aber ich gehe davon aus dass es Implementierungen gibt wo der Overhead einigermassen gross ist.)Ist im Prinzip das selbe Problem das man auch hat wenn man versucht
condition_variable
mit nur Win32 API zu implementieren (mit Windows XP, bzw. halt mit Windows Versionen die noch keine "native" Condition-Variablen haben). Da braucht man statt der per-Threadcondition_variable
einen per-Thread EVENT. Wenn man den ständig neu erzeugt und wieder freigibt ist das auch ziemlich langsam.Lösung: ein thread-static EVENT der erzeugt wird wenn er das erste mal gebraucht wird, und erst freigegeben wenn der Thread terminiert.
Analog könnte man hier eine thread-staticcondition_variable
verwenden.
(Genaugenommen würde auch hier ein thread-static EVENT reichen, nur das ist dann natürlich wieder nicht plattformübergreifend.)Bzw. wenn man wirklich nur wenige Prioritätsklassen braucht, und innerhalb der Prioritätsklassen kein FIFO Verhalten, dann sollte es auch mit einer
condition_variable
pro Prioritätsklasse gehen. Dann muss man sich nicht mit thread-static rumschlagen.ps: Wozu die
work_mutex
in deinem Code gut ist verstehe ich auch nicht ganz. Eine einzige Mutex die die Queue schützt ist vollkommen ausreichend:
return p == queue.top().value && worker.try_lock();
=>
return queue.top().cv == &cv
EDIT: OK, nein, so einfach ist es dann doch nicht. Weil eben ein zweiteslock()
zwischenunlock()
und dem Re-lock der internen Mutex imwait()
reinfahren kann. /EDITps2: Wenn man davon ausgeht dass es spurious Wakeups geben kann, dann ist dein Code sogar falsch
Weil er nicht prüft ob der aktuelle Thread wirklichtop
ist. Wenn ein Thread mit der selben Priorität wie der top Thread fälschlicherweise gleichzeitig mit dem top Thread aufgeweckt wird, dann könnte der falsche Thread beitry_lock()
Erfolg haben. Danach wäre dann ein Eintrag mit einem Zeiger auf eine bereits zerstörte CV in der Queue, und beim nächstenunlock
würde es knallen.Und wenn man davon ausgeht dass es keine spurious Wakeups geben kann, dann ist die ganze Bedingung im
wait
überflüssig.
-
hustbaer schrieb:
@logga
Deine Lösung hat dafür einen relativ grossen fixen Overhead, nämlich das Erzeugen + Löschen der lokalencondition_variable
.
(Wie gross der Overhead wirklich ist hängt natürlich von der Implementierung der Standard-Library ab, aber ich gehe davon aus dass es Implementierungen gibt wo der Overhead einigermassen gross ist.)Dass es einen riesigen Overhead hat, ist mir klar. Man muss sich halt entscheiden zwischen dem algorithmisch suboptimalen Code von Arcoth oder meinem mit einer grossen Konstante. Was für den jeweiligen Einsatzzweck besser ist, muss man selber rausfinden.
C++11 gibt einem halt wenig Mittel um so etwas performant zu implementieren. Deine Idee mit der per-Thread
condition_variable
macht es etwas besser und mit plattformabhängigen Konstrukten kriegt man das noch besser hin. Das war aber nicht mein Ziel.ps2: Wenn man davon ausgeht dass es spurious Wakeups geben kann, dann ist dein Code sogar falsch
Weil er nicht prüft ob der aktuelle Thread wirklichtop
ist. Wenn ein Thread mit der selben Priorität wie der top Thread fälschlicherweise gleichzeitig mit dem top Thread aufgeweckt wird, dann könnte der falsche Thread beitry_lock()
Erfolg haben. Danach wäre dann ein Eintrag mit einem Zeiger auf eine bereits zerstörte CV in der Queue, und beim nächstenunlock
würde es knallen.Ja, der Vergleich auf top sollte ein Vergleich auf
&cv
sein. Der Code ist sogar ohne spurious Wakeups falsch:
lock(2), lock(1a), unlock(2) wird vor dem notify unterbrochen, lock(1b) (poppt fälschlicherweise 1a weil 1a==1b), notify von unlock(2) => Zugriff auf das nicht mehr vorhandene cv von 1b!
-
Ich fände als Erweiterung der Standard Library praktisch: ein Teil das sich wie eine Condition-Variable verhält, allerdings gebunden an einen Thread (=also dass immer nur der "Besitzer-Thread" darauf warten kann - demendsprechend gibt es dann auch kein
notify_all
).
Und jeder Thread sollte so ein Ding dann per Default haben (bzw. darf es auch gerne beim 1. Aufruf der Accessor-Funktion erzeugt werden).
Weiters sollte dieses Ding einen "head" für eine intrusive double linked list haben. (EDIT: OK, das wäre nicht wirklich nötig, die drei Zeiger könnte man auch genau so gut vom Stack nehmen, und die eine zusätzliche Indirektion die man dadurch bekommt kann man vermutlich verkraften. /EDIT)So ein Ding liesse sich einerseits verwenden um damit diverse Standard-Library Klassen zu implementieren (z.B.
std::condition_variable
auf Plattformen wo es keine native CVs gibt). Und andrerseits natürlich auch um eigene synchronization primitives zu implementieren wie die hier diskutiertepriority_mutex
.
Eine einfache Mutex mit FIFO Garantie wäre damit z.B. auch in ein paar wenigen Zeilen relativ performant implementierbar.Bzw. würde damit überall dort wo man mit dieser eingeschränkten Variante auskommt der u.U. deutlich höhere Overhead einer
std::condition_variable
entfallen.Ich habe vor vielen Jahren mal angefangen eine eigene Threading-Library zu implementieren (ist dann natürlich nie fertig geworden :D). Dort war genau so ein Ding enthalten, eben weil ich es für die Implementierung der CV benötigt habe.