BlockingQueue in C++ : Implementierungsprobleme
-
Schönen Abend!
Ich implementiere gerade eine BlockingQueue, die ich als Basis für einen ThreadPool verwenden möchte. Aber zunächst der Quellcode:template<typename ValueT, typename QueueT = std::queue<ValueT>> class BlockingQueue : boost::noncopyable { private: QueueT queue; std::mutex mutex; std::condition_variable insertCondition; std::condition_variable eraseCondition; public: ~BlockingQueue() { insertCondition.notify_all(); eraseCondition.notify_all(); } void enqueue(ValueT const& value) { { std::lock_guard<std::mutex> guard(mutex); queue.push(value); } insertCondition.notify_one(); } void emplace(ValueT&& value) { { std::lock_guard<std::mutex> guard(mutex); queue.emplace(std::move(value)); } insertCondition.notify_one(); } boost::optional<ValueT> dequeue() { boost::optional<ValueT> result; { std::unique_lock<std::mutex> lock(mutex); insertCondition.wait(lock); if (queue.empty()) { return boost::none; } boost::optional<ValueT> result = std::move(queue.front()); queue.pop(); } eraseCondition.notify_one(); return result; } void waitUntilIsEmpty() { std::unique_lock<std::mutex> lock(mutex); if (queue.empty()) { return; } eraseCondition.wait(lock, [&]() { return queue.empty(); }); } void shutdown() { { std::lock_guard<std::mutex> guard(mutex); QueueT empty; queue.swap(empty); } insertCondition.notify_all(); } };
Jetzt habe ich folgende Anliegen:
1. Die Queue wird befüllt und Threads ackern sofort daran, sie zu leeren. Nachdem ich die Threads erzeugt habe, möchte ich warten, bis alle Tasks in der Queue abgearbeitet sind. Allerdings funktioniert meine Implementierung von waitUntilIsEmpty() nicht, wo ist mein Denkfehler? Die Funktion terminiert nie.
2. Fällt euch ein Fehler in shutdown() auf? Ich möchte, dass alle Threads keine weiteren Aufgaben mehr abarbeiten können, wenn der ThreadPool zerstört wird, deshalb möchte ich in shutdown() die Queue leeren, damit kein Thread mehr neue Aufgaben bekommt.
3. Sonstige Fehler, die ins Auge springen?
Vielen Dank & schöne Grüße
-
Statt
eraseCondition.notify_one();
gehört danotify_all
hin. Daran liegt's vermutlich nicht (es sei denn du testest mit > 1 Thread inwaitUntilIsEmpty
), muss aber trotzdemnotify_all
hin.Dann fehlt in
shutdown
eineraseCondition.notify_all();
. Du willst ja nicht nurdequeue
-Aufrufer aufwecken sondern auchwaitUntilIsEmpty
-Aufrufer. Bzw. du musst, weil diewaitUntilIsEmpty
-Aufrufer sonst nie mehr geweckt würden. Die Queue ist dann ja leer, damit wird nix mehr rausgenommen, dadurch ruft auch kein anderer mehreraseCondition.notify_all();
auf.Und in
dequeue
dasinsertCondition.wait(lock);
ohne vorherigen Test ist Quatsch.
Das Prinzip ist immer:- Mutex locken
while (!condition) wait();
D.h. auch KEINwait()
wenn schon "condition". Sonst kannst du nämlich u.U. lange (ewig) warten.
In
waitUntilIsEmpty
hast du es ja z.B. auch korrekt gemacht.Und dann kannst du noch das
if (queue.empty()) return;
inwaitUntilIsEmpty
wegmachen. Das ist überflüssig.wait(lock, predicate)
überprüft daspredicate
sowieso BEVORwait(lock)
aufgerufen wird.
-
Vielen Dank.
Hier ist die fertige Version, bei der ich auch mit exzessivem Threadgeballere keine Probleme finden konnte, falls es jemanden interessiert:
#pragma once // C++ Standard Library #include <queue> #include <mutex> #include <condition_variable> #include <exception> // Boost Libraries: #include <boost/optional.hpp> #include <boost/noncopyable.hpp> namespace threading { template<typename ValueT, typename QueueT = std::queue<ValueT>> class BlockingQueue : boost::noncopyable { private: QueueT queue; std::mutex queueMutex; std::condition_variable insertCondition; std::condition_variable eraseCondition; bool isShutDown; public: BlockingQueue() : queue(), queueMutex(), insertCondition(), eraseCondition(), isShutDown(false) { } ~BlockingQueue() { insertCondition.notify_all(); eraseCondition.notify_all(); } void enqueue(ValueT const& value) { { std::lock_guard<std::mutex> guard(queueMutex); if (isShutDown) { throw std::logic_error("BlockingQueue::enqueue : Queue is shut down"); } queue.push(value); } insertCondition.notify_one(); } void emplace(ValueT&& value) { { std::lock_guard<std::mutex> guard(queueMutex); if (isShutDown) { throw std::logic_error("BlockingQueue::emplace : Queue is shut down"); } queue.emplace(std::move(value)); } insertCondition.notify_one(); } boost::optional<ValueT> dequeue() { boost::optional<ValueT> result; { std::unique_lock<std::mutex> lock(queueMutex); if (isShutDown) { return boost::none; } insertCondition.wait(lock, [&]() { return !queue.empty() || isShutDown; }); if (queue.empty()) { return boost::none; } result = std::move(queue.front()); queue.pop(); } eraseCondition.notify_all(); return result; } void waitUntilIsEmpty() { std::unique_lock<std::mutex> lock(queueMutex); eraseCondition.wait(lock, [&]() { return queue.empty(); }); } void shutdown() { { std::lock_guard<std::mutex> guard(queueMutex); QueueT empty; queue.swap(empty); isShutDown = true; } insertCondition.notify_all(); eraseCondition.notify_all(); } }; }
-
Bitte
Sieht nicht ganz schlecht aus.Eine Sache bloss...
Und zwar der Destruktor.
Der ist weder nötig noch wäre hinreichend um irgend ein Problem zu lösen.Also entweder du hast ein Problem, dann retten dich die beiden
notify_all
auch nicht. Oder du hast kein Problem, dann sind sie nicht nötig.Erklärung: Du darfst das Objekt (=deine Queue) sowieso erst zerstören, wenn es nicht mehr verwendet wird.
Fall 1: Das Objekt wird zu dem Zeitpunkt wo der Destruktor aufgerufen wird noch verwendet. Dann reichen die beiden
notify_all
nicht um sicherzustellen dass es nicht mehr verwendet wird.Angenommen es ist noch ein Thread in
enqueue
oderdequeue
während der Destruktor aufgerufen wird. Dann machst du zwarnotify_all
um den zu wecken, was oberflächlich betrachtet vielleicht nach einer guten Idee aussieht. Allerdings hast du keine Kontrolle darüber wann der Thread geweckt wird und wann er dieenqueue
bzw.dequeue
Funktion verlassen hat.
Das ist der einfach zu erklärende Teil.Das Problem ist aber noch ... (suche passendes Wort) ... viel "allgemeiner". Denn wenn du nicht durch externe (=nicht in deinem Objekt verankerte) Mechanismen sicherstellen kannst dass das Objekt nicht mehr verwendet wird (und auch später nicht mehr verwendet werden kann), dann kannst du im Destruktor *nichts* dagegen tun. Also egal was du da an Code reinschreibst, es kann dich nicht retten.
(OK, es lassen sich Beispiele konstruieren wo man im Destruktor schon was dagegen tun kann. Allerdings nur wenn das verwendete Objekt über seine Verwender bescheid weiss. Was bei einer Sache wie einer Queue sicher nicht "richtig" ist. Die Queue sollte nicht wissen müssen wer sie alles verwendet. Das wäre total "verkehrt herum".)Fall 2: Das Objekt wird zu dem Zeitpunkt wo der Destruktor aufgerufen wird nicht mehr verwendet. Dann sind die beiden
notify_all
schlichtweg für nichts, weil sie per Definition keinen mehr wecken können.