BlockingQueue in C++ : Implementierungsprobleme



  • Schönen Abend!
    Ich implementiere gerade eine BlockingQueue, die ich als Basis für einen ThreadPool verwenden möchte. Aber zunächst der Quellcode:

    template<typename ValueT, typename QueueT = std::queue<ValueT>>
    	class BlockingQueue : boost::noncopyable {
    	private:
    		QueueT queue;
    		std::mutex mutex;
    		std::condition_variable insertCondition;
    		std::condition_variable eraseCondition;
    
    	public:
    		~BlockingQueue() {
    			insertCondition.notify_all();
    			eraseCondition.notify_all();
    		}
    
    		void enqueue(ValueT const& value) {
    			{
    				std::lock_guard<std::mutex> guard(mutex);
    				queue.push(value);
    			}
    			insertCondition.notify_one();
    		}
    
    		void emplace(ValueT&& value) {
    			{
    				std::lock_guard<std::mutex> guard(mutex);
    				queue.emplace(std::move(value));
    			}
    			insertCondition.notify_one();
    		}
    
    		boost::optional<ValueT> dequeue() {
    			boost::optional<ValueT> result;
    			{
    				std::unique_lock<std::mutex> lock(mutex);
    				insertCondition.wait(lock);
    
    				if (queue.empty()) {
    					return boost::none;
    				}
    
    				boost::optional<ValueT> result = std::move(queue.front());
    				queue.pop();
    			}
    			eraseCondition.notify_one();
    			return result;
    		}
    
    		void waitUntilIsEmpty() {
    			std::unique_lock<std::mutex> lock(mutex);
    			if (queue.empty()) {
    				return;
    			}
    			eraseCondition.wait(lock, [&]() { return queue.empty(); });
    		}
    
    		void shutdown() {
    			{
    				std::lock_guard<std::mutex> guard(mutex);
    				QueueT empty;
    				queue.swap(empty);
    			}
    			insertCondition.notify_all();
    		}
    	};
    

    Jetzt habe ich folgende Anliegen:

    1. Die Queue wird befüllt und Threads ackern sofort daran, sie zu leeren. Nachdem ich die Threads erzeugt habe, möchte ich warten, bis alle Tasks in der Queue abgearbeitet sind. Allerdings funktioniert meine Implementierung von waitUntilIsEmpty() nicht, wo ist mein Denkfehler? Die Funktion terminiert nie.

    2. Fällt euch ein Fehler in shutdown() auf? Ich möchte, dass alle Threads keine weiteren Aufgaben mehr abarbeiten können, wenn der ThreadPool zerstört wird, deshalb möchte ich in shutdown() die Queue leeren, damit kein Thread mehr neue Aufgaben bekommt.

    3. Sonstige Fehler, die ins Auge springen?

    Vielen Dank & schöne Grüße



  • Statt eraseCondition.notify_one(); gehört da notify_all hin. Daran liegt's vermutlich nicht (es sei denn du testest mit > 1 Thread in waitUntilIsEmpty ), muss aber trotzdem notify_all hin.

    Dann fehlt in shutdown ein eraseCondition.notify_all(); . Du willst ja nicht nur dequeue -Aufrufer aufwecken sondern auch waitUntilIsEmpty -Aufrufer. Bzw. du musst, weil die waitUntilIsEmpty -Aufrufer sonst nie mehr geweckt würden. Die Queue ist dann ja leer, damit wird nix mehr rausgenommen, dadurch ruft auch kein anderer mehr eraseCondition.notify_all(); auf.

    Und in dequeue das insertCondition.wait(lock); ohne vorherigen Test ist Quatsch.
    Das Prinzip ist immer:

    1. Mutex locken
    2. while (!condition) wait();
      D.h. auch KEIN wait() wenn schon "condition". Sonst kannst du nämlich u.U. lange (ewig) warten.

    In waitUntilIsEmpty hast du es ja z.B. auch korrekt gemacht.

    Und dann kannst du noch das if (queue.empty()) return; in waitUntilIsEmpty wegmachen. Das ist überflüssig. wait(lock, predicate) überprüft das predicate sowieso BEVOR wait(lock) aufgerufen wird.



  • Vielen Dank. 🙂

    Hier ist die fertige Version, bei der ich auch mit exzessivem Threadgeballere keine Probleme finden konnte, falls es jemanden interessiert:

    #pragma once
    
    // C++ Standard Library
    #include <queue>
    #include <mutex>
    #include <condition_variable>
    #include <exception>
    
    // Boost Libraries:
    #include <boost/optional.hpp>
    #include <boost/noncopyable.hpp>
    
    namespace threading {
    
    	template<typename ValueT, typename QueueT = std::queue<ValueT>>
    	class BlockingQueue : boost::noncopyable {
    	private:
    		QueueT queue;
    		std::mutex queueMutex;
    		std::condition_variable insertCondition;
    		std::condition_variable eraseCondition;
    		bool isShutDown;
    
    	public:
    		BlockingQueue()
    			: queue(), queueMutex(), insertCondition(), eraseCondition(), isShutDown(false)
    		{ }
    
    		~BlockingQueue() {
    			insertCondition.notify_all();
    			eraseCondition.notify_all();
    		}
    
    		void enqueue(ValueT const& value) {
    			{
    				std::lock_guard<std::mutex> guard(queueMutex);
    				if (isShutDown) {
    					throw std::logic_error("BlockingQueue::enqueue : Queue is shut down");
    				}
    				queue.push(value);
    			}
    			insertCondition.notify_one();
    		}
    
    		void emplace(ValueT&& value) {
    			{
    				std::lock_guard<std::mutex> guard(queueMutex);
    				if (isShutDown) {
    					throw std::logic_error("BlockingQueue::emplace : Queue is shut down");
    				}
    				queue.emplace(std::move(value));
    			}
    			insertCondition.notify_one();
    		}
    
    		boost::optional<ValueT> dequeue() {
    			boost::optional<ValueT> result;
    			{
    				std::unique_lock<std::mutex> lock(queueMutex);
    				if (isShutDown) {
    					return boost::none;
    				}
    
    				insertCondition.wait(lock, [&]() { return !queue.empty() || isShutDown; });
    				if (queue.empty()) {
    					return boost::none;
    				}
    
    				result = std::move(queue.front());
    				queue.pop();
    			}
    			eraseCondition.notify_all();
    			return result;
    		}
    
    		void waitUntilIsEmpty() {
    			std::unique_lock<std::mutex> lock(queueMutex);
    			eraseCondition.wait(lock, [&]() { return queue.empty(); });
    		}
    
    		void shutdown() {
    			{
    				std::lock_guard<std::mutex> guard(queueMutex);
    				QueueT empty;
    				queue.swap(empty);
    				isShutDown = true;
    			}
    			insertCondition.notify_all();
    			eraseCondition.notify_all();
    		}
    	};
    }
    


  • Bitte 🙂
    Sieht nicht ganz schlecht aus.

    Eine Sache bloss...
    Und zwar der Destruktor.
    Der ist weder nötig noch wäre hinreichend um irgend ein Problem zu lösen.

    Also entweder du hast ein Problem, dann retten dich die beiden notify_all auch nicht. Oder du hast kein Problem, dann sind sie nicht nötig.

    Erklärung: Du darfst das Objekt (=deine Queue) sowieso erst zerstören, wenn es nicht mehr verwendet wird.

    Fall 1: Das Objekt wird zu dem Zeitpunkt wo der Destruktor aufgerufen wird noch verwendet. Dann reichen die beiden notify_all nicht um sicherzustellen dass es nicht mehr verwendet wird.

    Angenommen es ist noch ein Thread in enqueue oder dequeue während der Destruktor aufgerufen wird. Dann machst du zwar notify_all um den zu wecken, was oberflächlich betrachtet vielleicht nach einer guten Idee aussieht. Allerdings hast du keine Kontrolle darüber wann der Thread geweckt wird und wann er die enqueue bzw. dequeue Funktion verlassen hat.
    Das ist der einfach zu erklärende Teil.

    Das Problem ist aber noch ... (suche passendes Wort) ... viel "allgemeiner". Denn wenn du nicht durch externe (=nicht in deinem Objekt verankerte) Mechanismen sicherstellen kannst dass das Objekt nicht mehr verwendet wird (und auch später nicht mehr verwendet werden kann), dann kannst du im Destruktor *nichts* dagegen tun. Also egal was du da an Code reinschreibst, es kann dich nicht retten.
    (OK, es lassen sich Beispiele konstruieren wo man im Destruktor schon was dagegen tun kann. Allerdings nur wenn das verwendete Objekt über seine Verwender bescheid weiss. Was bei einer Sache wie einer Queue sicher nicht "richtig" ist. Die Queue sollte nicht wissen müssen wer sie alles verwendet. Das wäre total "verkehrt herum".)

    Fall 2: Das Objekt wird zu dem Zeitpunkt wo der Destruktor aufgerufen wird nicht mehr verwendet. Dann sind die beiden notify_all schlichtweg für nichts, weil sie per Definition keinen mehr wecken können.


Log in to reply