std::mutex mit Priorität?



  • Ich brauche eigentlich nur 2 Level: LOW und HIGH. Wobei es immer nur einen HIGH thread geben wird und wenige LOW (aktuell einen, später eventuell 2 bis 3). Von denen ist die Priorität dann aber egal, nur mein einer HIGH thread soll Vorrang haben.



  • Bei nur 2 Threads insgesamt reicht ein einfacher Mutex.
    Ab 3 Low-priority-Threads würde ich die verlinkte 3-Thread-Lösung nehmen. Die funktioniert (spiel halt mal alle Fälle durch) und benötigt garantiert konstante Zeit.



  • *3-Mutex-Lösung


  • Mod

    Eine allgemeine Lösung kann mittels einer priority_queue geschrieben werden. Statt vector sollte man boost::container::static_vector als den von queue verwendeten Container festlegen, falls möglich.

    Ein grobes Beispiel:

    class priority_mutex : std::mutex
    {
    	std::priority_queue<std::size_t> queue;
    	std::mutex qmutex;
    
    public:
    
    	using std::mutex::native_handle_type;
    	using std::mutex::native_handle;
    	using std::mutex::unlock;
    	using std::mutex::try_lock;
    
    	void lock(std::size_t p) {
    		if (!try_lock())
    		{
    			qmutex.lock();
    			queue.push(p);
    			qmutex.unlock();
    
    			while (queue.top() != p || !try_lock())
    				std::this_thread::sleep_for(std::chrono::milliseconds(5));
    
    			qmutex.lock();
    			queue.pop();
    			qmutex.unlock();
    		}
    	}
    };
    

    Demo gibt's hier.



  • Arcoth schrieb:

    while (queue.top() != p || !try_lock())
                    std::this_thread::sleep_for(std::chrono::milliseconds(5));
    

    Bitte kein Busy-Waiting. Davon abgesehen skaliert es nicht. Das ist ein gutes Beispiel, wie man es nicht machen sollte.



  • Hast du einen besseren Vorschlag?


  • Mod

    logga schrieb:

    Arcoth schrieb:

    while (queue.top() != p || !try_lock())
                    std::this_thread::sleep_for(std::chrono::milliseconds(5));
    

    Bitte kein Busy-Waiting. Davon abgesehen skaliert es nicht. Das ist ein gutes Beispiel, wie man es nicht machen sollte.

    Jo, ist mir auch klar geworden. Ich schreibs um.



  • Mein Vorschlag:

    class two_priority_mutex {
      std::mutex low_priority_mutex, next_access_mutex, data_mutex;
    public:
      void lock() {
        std::unique_lock<std::mutex> low(low_priority_mutex);
        std::lock_guard<std::mutex> guard(next_access_mutex);
        data_mutex.lock();
        low.release();
      }
      void unlock() noexcept {
        data_mutex.unlock();
        low_priority_mutex.unlock();
      }
      void high_priority_lock() {
          std::lock_guard<std::mutex> guard(next_access_mutex);
          data_mutex.lock();
      }
      void high_priority_unlock() noexcept {
          data_mutex.unlock();
      }
      class high {
        two_priority_mutex *me;
      public:
        high(two_priority_mutex *me) : me(me) {}
        void lock() { me->high_priority_lock(); }
        void unlock() noexcept { me->high_priority_unlock(); }
      };
      high high_priority_locker() {
        return high(this);
      }
    };
    
    two_priority_mutex m;
    
    // High-Priority-Thread
    {
      auto h = m.high_priority_locker();
      std::lock_guard<two_priority_mutex::high> g(h);
      ...
    }
    
    // Low-Priority-Threads
    {
      std::lock_guard<two_priority_mutex> g(m);
      ...
    }
    

  • Mod

    Ich hab es mit einer cv umgeschrieben:

    class priority_mutex : std::mutex
    {
    	std::priority_queue<std::size_t> queue;
    	std::mutex qmutex;
    	std::condition_variable cv;
    
    public:
    
    	using std::mutex::native_handle_type;
    	using std::mutex::native_handle;
    	using std::mutex::try_lock;
    
    	void lock(std::size_t p) {
    		if (!try_lock())
    		{
    			std::unique_lock<std::mutex> ulock(qmutex);
    			queue.push(p);
    
    			cv.wait(ulock, [this, p] {return queue.top() == p && try_lock();});
    
    			queue.pop();
    			qmutex.unlock();
    			cv.notify_all();
    		}
    	}
    
    	void unlock() {
    		std::mutex::unlock();
    		cv.notify_all();
    	}
    };
    

    Demo. Kann aber natürlich wieder Unsinn sein 😃



  • Wow, vielen Dank auf jeden Fall euch beiden, werde ich mir gleich mal genauer anschauen 👍



  • Arcoth schrieb:

    Ich hab es mit einer cv umgeschrieben:

    Auf den ersten Blick finde ich das eine schicke Lösung 😃
    Allerdings behagt mir dieser Prioritäts-Mutex trotzdem irgendwie nicht so ganz:
    Ich will nicht ausschließen, dass er irgendwo mal Sinn macht, aber mein Bauchgefühl sagt mir, dass die Prioritätswarteschlange überhaupt erst dann ihre
    Wirkung entfaltet, wenn man eigentlich ein anderes Problem hat: Zu viel Contention auf der Ressource, die von dem Mutex geschützt wird.

    Das mag jetzt eine Binsenweisheit sein, und das Problem ist oft nicht einfach zu lösen.
    Eine Technink als Anregung: bei keineren bis mittelgroßen Objekten mit viel Contention hat sich bei mir des öfteren z.B. ein COW-Ansatz bewährt.
    Trotz der zusätzlichen Kopie hat sich dabei nicht selten der Gesamtdurchsatz deutlich erhöht, da sich die Threads wesentlich weniger auf gegenseitig auf den Füßen stehen:

    auto shared = std::make_shared<const Resource>();
    
    thread1:
        auto res = std::atomic_load(&shared);
        res->readOnly();
    
    thread2:
        auto res = std::atomic_load(&shared);
        auto mut = std::make_shared<Resource>(*res));
        mut->mutate();
        std::atomic_store(&shared, std::static_pointer_cast<const Resource>(mut));
    

    Das ist natürlich nur eine mögliche Variante, wie man auf den Mutex verzichten kann, und sie ist natürlich nicht immer sinnvoll.
    Gut möglich dass der Prioritäts-Mutex schon eine der besseren Lösungen für dein spezielles Problem ist.

    Finnegan



  • Arcoth schrieb:

    Demo. Kann aber natürlich wieder Unsinn sein 😃

    Mein Hauptkritikpunkt ist immer noch da. Wenn du 1000 Threads am Warten hast, musst du jedesmal immer alle aufwecken. Bei n Locks hat jeder Thread O(n) Overhead, total macht das O(n2) Laufzeit.

    Hier meine Umsetzung mit O(1) Overhead und O(n log n) Laufzeit (optimal, da die Priority-Queue so lange braucht):

    template <typename T>
    class priority_mutex : std::mutex {
      struct entry {
        entry(T value, std::condition_variable *cv) : value(std::move(value)), cv(cv) {}
        T value;
        std::condition_variable *cv;
        friend bool operator<(entry const& a, entry const& b) { return a.value < b.value; }
      };
      std::priority_queue<entry> queue;
      std::mutex queue_mutex;
      std::mutex work_mutex;
    
    public: 
      void lock(T p) {
        std::unique_lock<std::mutex> guard(queue_mutex);
        std::condition_variable cv;
        queue.emplace(p, &cv);
        std::unique_lock<std::mutex> worker(work_mutex, std::defer_lock);
        cv.wait(guard, [&]{ return p == queue.top().value && worker.try_lock(); });
        queue.pop();
        worker.release();
      }
      void unlock() noexcept {
        work_mutex.unlock();
        std::lock_guard<std::mutex> guard(queue_mutex);
        if (!queue.empty())
          queue.top().cv->notify_one();
      }
    };
    

    Der am höchsten prioritäre Thread wird gezielt aufgeweckt. Es kann zwar passieren, dass zwischen unlock() und dem Aufwecken der condition variable ein neues Top-Element eingefügt wird, aber das startet dann sofort.



  • @logga
    Deine Lösung hat dafür einen relativ grossen fixen Overhead, nämlich das Erzeugen + Löschen der lokalen condition_variable .
    (Wie gross der Overhead wirklich ist hängt natürlich von der Implementierung der Standard-Library ab, aber ich gehe davon aus dass es Implementierungen gibt wo der Overhead einigermassen gross ist.)

    Ist im Prinzip das selbe Problem das man auch hat wenn man versucht condition_variable mit nur Win32 API zu implementieren (mit Windows XP, bzw. halt mit Windows Versionen die noch keine "native" Condition-Variablen haben). Da braucht man statt der per-Thread condition_variable einen per-Thread EVENT. Wenn man den ständig neu erzeugt und wieder freigibt ist das auch ziemlich langsam.

    Lösung: ein thread-static EVENT der erzeugt wird wenn er das erste mal gebraucht wird, und erst freigegeben wenn der Thread terminiert.
    Analog könnte man hier eine thread-static condition_variable verwenden.
    (Genaugenommen würde auch hier ein thread-static EVENT reichen, nur das ist dann natürlich wieder nicht plattformübergreifend.)

    Bzw. wenn man wirklich nur wenige Prioritätsklassen braucht, und innerhalb der Prioritätsklassen kein FIFO Verhalten, dann sollte es auch mit einer condition_variable pro Prioritätsklasse gehen. Dann muss man sich nicht mit thread-static rumschlagen.

    ps: Wozu die work_mutex in deinem Code gut ist verstehe ich auch nicht ganz. Eine einzige Mutex die die Queue schützt ist vollkommen ausreichend:
    return p == queue.top().value && worker.try_lock();
    =>
    return queue.top().cv == &cv
    EDIT: OK, nein, so einfach ist es dann doch nicht. Weil eben ein zweites lock() zwischen unlock() und dem Re-lock der internen Mutex im wait() reinfahren kann. /EDIT

    ps2: Wenn man davon ausgeht dass es spurious Wakeups geben kann, dann ist dein Code sogar falsch 😉
    Weil er nicht prüft ob der aktuelle Thread wirklich top ist. Wenn ein Thread mit der selben Priorität wie der top Thread fälschlicherweise gleichzeitig mit dem top Thread aufgeweckt wird, dann könnte der falsche Thread bei try_lock() Erfolg haben. Danach wäre dann ein Eintrag mit einem Zeiger auf eine bereits zerstörte CV in der Queue, und beim nächsten unlock würde es knallen.

    Und wenn man davon ausgeht dass es keine spurious Wakeups geben kann, dann ist die ganze Bedingung im wait überflüssig.



  • hustbaer schrieb:

    @logga
    Deine Lösung hat dafür einen relativ grossen fixen Overhead, nämlich das Erzeugen + Löschen der lokalen condition_variable .
    (Wie gross der Overhead wirklich ist hängt natürlich von der Implementierung der Standard-Library ab, aber ich gehe davon aus dass es Implementierungen gibt wo der Overhead einigermassen gross ist.)

    Dass es einen riesigen Overhead hat, ist mir klar. Man muss sich halt entscheiden zwischen dem algorithmisch suboptimalen Code von Arcoth oder meinem mit einer grossen Konstante. Was für den jeweiligen Einsatzzweck besser ist, muss man selber rausfinden.

    C++11 gibt einem halt wenig Mittel um so etwas performant zu implementieren. Deine Idee mit der per-Thread condition_variable macht es etwas besser und mit plattformabhängigen Konstrukten kriegt man das noch besser hin. Das war aber nicht mein Ziel.

    ps2: Wenn man davon ausgeht dass es spurious Wakeups geben kann, dann ist dein Code sogar falsch 😉
    Weil er nicht prüft ob der aktuelle Thread wirklich top ist. Wenn ein Thread mit der selben Priorität wie der top Thread fälschlicherweise gleichzeitig mit dem top Thread aufgeweckt wird, dann könnte der falsche Thread bei try_lock() Erfolg haben. Danach wäre dann ein Eintrag mit einem Zeiger auf eine bereits zerstörte CV in der Queue, und beim nächsten unlock würde es knallen.

    Ja, der Vergleich auf top sollte ein Vergleich auf &cv sein. Der Code ist sogar ohne spurious Wakeups falsch:
    lock(2), lock(1a), unlock(2) wird vor dem notify unterbrochen, lock(1b) (poppt fälschlicherweise 1a weil 1a==1b), notify von unlock(2) => Zugriff auf das nicht mehr vorhandene cv von 1b!



  • Ich fände als Erweiterung der Standard Library praktisch: ein Teil das sich wie eine Condition-Variable verhält, allerdings gebunden an einen Thread (=also dass immer nur der "Besitzer-Thread" darauf warten kann - demendsprechend gibt es dann auch kein notify_all ).
    Und jeder Thread sollte so ein Ding dann per Default haben (bzw. darf es auch gerne beim 1. Aufruf der Accessor-Funktion erzeugt werden).
    Weiters sollte dieses Ding einen "head" für eine intrusive double linked list haben. (EDIT: OK, das wäre nicht wirklich nötig, die drei Zeiger könnte man auch genau so gut vom Stack nehmen, und die eine zusätzliche Indirektion die man dadurch bekommt kann man vermutlich verkraften. /EDIT)

    So ein Ding liesse sich einerseits verwenden um damit diverse Standard-Library Klassen zu implementieren (z.B. std::condition_variable auf Plattformen wo es keine native CVs gibt). Und andrerseits natürlich auch um eigene synchronization primitives zu implementieren wie die hier diskutierte priority_mutex .
    Eine einfache Mutex mit FIFO Garantie wäre damit z.B. auch in ein paar wenigen Zeilen relativ performant implementierbar.

    Bzw. würde damit überall dort wo man mit dieser eingeschränkten Variante auskommt der u.U. deutlich höhere Overhead einer std::condition_variable entfallen.

    Ich habe vor vielen Jahren mal angefangen eine eigene Threading-Library zu implementieren (ist dann natürlich nie fertig geworden :D). Dort war genau so ein Ding enthalten, eben weil ich es für die Implementierung der CV benötigt habe.


Anmelden zum Antworten