Threadpool mit abbrechbaren Jobs und then-Funktion



  • Hi,

    ich habe basierend auf boost::asio einen einfachen Threadpool aufgebaut, bei dem Jobs abbrechbar sein sollen. Basierend auf einer Empfehlung von Finnegan ( https://www.c-plusplus.net/forum/p2506141#2506141 ) nutze ich dazu ein thread-safe abort-Flag, das ich über shared_ptr<atomic<bool>> implementiere.

    Eine zusätzliche Anforderung an meinen Threadpool sollte jedoch sein, dass beim Fertigstellen eines Jobs eine then-Funktion aufgerufen wird. Diese erhält als Parameter ein Handle (weil dieselbe then-Funktion mit mehreren Jobs aufgerufen werden kann und ich über das Handle auch als Identifier dient) und den Rückgabewert der eigentlichen Job-Funktion. Zurzeit muss es ein Result geben und man muss eine then-Funktion angeben, das ist für mich aber okay.

    Wenn ich mehrere Jobs hinzufüge, welche dieselbe then-Funktion nutzen, kann ich über eine Methode des Handles vergleichen, ob die übereinstimmen. Das mache ich, indem ich prüfe, ob die Zeiger übereinstimmen. Sollte der Job bereits aborted worden sein, returned diese Methode ebenfalls false, da ich das Ergebnis in dem Fall ignoriere.

    Hier ist der (kompilierbare Code):

    #include <boost/asio.hpp>
    #include <boost/thread.hpp>
    #include <memory>
    #include <atomic>
    
    class Handle
    {
    public:
    	using SharedPtrBool = std::shared_ptr<std::atomic<bool>>;
    
    	Handle() : abort(new std::atomic<bool>(false)) {}
    
    	// checks whether handles are equal and whether both are not aborted
    	bool EqualAndUnaborted(const Handle& rhs) const
    	{
    		// any abort flag equal to true?
    		if (*abort)
    			return false;
    
    		// do pointers equal?
    		return abort.get() == rhs.abort.get();
    	}
    
    	// aborts thread
    	void Abort()
    	{
    		*abort = true;
    	}
    
    	// get identifier
    	int GetIdentifier() const
    	{
    		static_assert(sizeof(int) == sizeof(decltype(abort.get())), "identifier type (int) size is not equal to pointer size");
    		return reinterpret_cast<int>(abort.get());
    	}
    
    private:
    	SharedPtrBool abort;
    
    	friend class ThreadPool;
    };
    
    class ThreadPool
    {
    public:
    	using SharedPtrBool = Handle::SharedPtrBool;
    
    	ThreadPool(int threads)
    		: work(ioService)
    	{
    		for (int n = 0; n < threads; ++n)
    			threadPool.create_thread(boost::bind(&boost::asio::io_service::run, &ioService));
    	}
    
    	~ThreadPool()
    	{
    		ioService.stop();
    		threadPool.join_all();
    	}
    
    	template<typename T, typename U>
    	Handle AddJob(T func, U then)
    	{
    		std::cout << "Job added" << std::endl;
    		Handle h;
    		ioService.post([h, func, then](){
    			then(h, func(h.abort));
    		});
    
    		return h;
    	}
    
    	ThreadPool(const ThreadPool&) = delete;
    	ThreadPool& operator=(const ThreadPool&) = delete;
    	ThreadPool(ThreadPool&&) = delete;
    	ThreadPool& operator=(ThreadPool&&) = delete;
    
    private:
    	boost::asio::io_service ioService;
    	boost::thread_group threadPool;
    	boost::asio::io_service::work work;
    };
    

    Folgendes Beispielprogramm funktioniert auch, wenn man es direkt unter obigen Code hängt:

    Handle h1, h2, h3;
    
    void done(Handle h, int r)
    {
    	if (h.EqualAndUnaborted(h1))
    	{
    		std::cout << "First with " << r << std::endl;
    	}
    	else if (h.EqualAndUnaborted(h2))
    	{
    		std::cout << "Second with " << r << std::endl;
    	}
    	else if (h.EqualAndUnaborted(h3))
    	{
    		std::cout << "Third with " << r << std::endl;
    	}
    }
    
    int main()
    {
    	ThreadPool pool(4);
    
    	h1 = pool.AddJob([](ThreadPool::SharedPtrBool abort){
    		int m = 0;
    		for (int n = 0; n < 999999999 && !*abort; ++n)
    			if (n % 1500 == 0)
    				++m;
    		return m;
    	}, done);
    
    	h2 = pool.AddJob([](ThreadPool::SharedPtrBool abort){
    		int m = 0;
    		for (int n = 0; n < 9999999 && !*abort; ++n)
    			if (n % 2000 == 0)
    				++m;
    		return m;
    	}, done);
    
    	h3 = pool.AddJob([](ThreadPool::SharedPtrBool abort){
    		int m = 0;
    		for (int n = 0; n < 9999999 && !*abort; ++n)
    			if (n % 1900 == 0)
    				++m;
    		return m;
    	}, done);
    
    	boost::this_thread::sleep_for(boost::chrono::milliseconds(1500));
    
    	h1.Abort();
    
    	std::cout << "aborted" << std::endl;
    
    	boost::this_thread::sleep_for(boost::chrono::milliseconds(1500));
    }
    

    Meine Frage ist jetzt: Habe ich irgendwas übersehen, was mir die Threadsafety kaputt machen könnte oder zu anderen Problemen führen kann?



  • ist ein shared_ptr<atomic<bool>> nicht overkill?

    atomic<bool> ist doch schon dazu gedacht geteilt zu werden, warum noch mehrere atomic int counter drumrum machen?



  • Das eine ersetzt das andere ja nicht.

    Denn ohne shared_ptr könnte ich ja keine Handles kopieren. Aber an wessen Scope könnte ich denn dann den bool binden? Wenn der Job bestimmt, wann der atomic<bool> stirbt, kann es sein, dass ich bei einem Abort aus dem Hauptthread ein bool auf false setze, welches nicht mehr existiert. Und derjenige, der den Job hinzufügt, hat ja keine Verpflichtung das Handle bis in alle Ewigkeit zu behalten. Der then-Slot lebt im Thread des Threadpools, nicht im Hauptthread. Ich bräuchte also einen zusätzlichen Mutex ohne shared_ptr.



  • Eisflamme schrieb:

    Das eine ersetzt das andere ja nicht.

    Denn ohne shared_ptr könnte ich ja keine Handles kopieren. Aber an wessen Scope könnte ich denn dann den bool binden? Wenn der Job bestimmt, wann der atomic<bool> stirbt, kann es sein, dass ich bei einem Abort aus dem Hauptthread ein bool auf false setze, welches nicht mehr existiert. Und derjenige, der den Job hinzufügt, hat ja keine Verpflichtung das Handle bis in alle Ewigkeit zu behalten. Der then-Slot lebt im Thread des Threadpools, nicht im Hauptthread. Ich bräuchte also einen zusätzlichen Mutex ohne shared_ptr.

    wenn du meinst einenen atomic boolean mit einem Mutex beschützen zu müssen, dann OK.



  • Hast meinen Post nicht verstanden.

    Falls jemand meint, der Code sei so fehlerfrei, freue ich mich über diese Art von Feedback natürlich ebenfalls.


Anmelden zum Antworten