multithreaded lockfree job queue mit abbrechbaren Jobs



  • Hi,

    ich möchte eine multithreaded job queue aufsetzen, aus denen sich ein einzelner Worker-Thread bedient. Der holt sich immer einen Job auf einmal, arbeitet ihn ab und arbeitet dann mit dem nächsten.

    Meine Idee war einfach eine lockfree_queue zu verwenden. Der Hauptthread kann dann Jobs pushen, die einfach aus einem function pointer bestehen. Der Workerthrad nimmt sich Jobs heraus und arbeitet sie dann ab.

    Was ich aber zusätzlich möchte, ist die Möglichkeit Jobs abzubrechen, ob sie nun ge-queue-ed sind oder bereits in Abarbeitung sind. Wenn sie bereits laufen, möchte ich das einfach über ein Flag steuern, welches die Funktion als Parameter hat. Die Funktion prüft einfach zwischendurch, ob das Flag true (oder false, je nachdem) ist und bricht dann die Berechnung frühzeitig ab.

    Wenn der Job gequeued und abzubrechen ist, dann reicht es ja, ihn zu ignorieren, wenn er an der Reihe ist.

    Um überhaupt ein Handle zu haben, dachte ich mir, dass ich beim Hinzufügen eines Handles einfach eine JobID zurückgebe und ich damit bei der Verwaltungsklasse meiner Threads den Job über die ID aborten kann.

    Mein Problem ist jetzt: Wie organisiere ich die Verwaltung der abzubrechenden Jobs? Oder was ist best practice? Ohne Abbruch komme ich lockfree prima zurecht. Aber sagen wir, ich starte Jobs 10, 11 und 12. Jetzt läuft Job 10 erstmal und ich möchte inzwischen Job 12 und Job 11 abbrechen. Zum Merken dessen brauche ich ja wieder eine Datenstruktur. Aber zumindest boost bietet nur lockfree_queue und lockfree_stack an, die mir beide nicht helfen.

    Ich könnte zur Sammlung der abzubrechenden Job-IDs natürlich irgendetwas anderes (ein set z.B.) nutzen, doch dann bräuchte ich da ein Mutex, da ich ja gleichzeitig mein Set bei einem neuen Job durchsuchen müsste, aber eben auch reinschreibe. Wenn ich aber ein Mutex nutze, verliere ich ja ggf. die Vorteile meiner lockfree Queue.

    Gibt es eine best practice als Ausweg aus meiner Misere? Oder habt ihr sonst Ideen, wie ich das löse?

    PS: Hier etwas "pseudoisch" Code, wie ich mir die Umgebung vorgestellt habe, falls meine Beschreibung unzureichend war.

    struct Job
    {
    	int id;
    	std::function<void(bool& /* abortFlag */)> func;
    	bool abort;
    };
    
    class JobSystem
    {
    public:
    	JobSystem() : currentJob(nullptr) {}
    
    	// add job and get job id
    	int AddJob(std::function<void(bool&)> func)
    	{
    		jobQueue.push(new Job{ newJobId, std::move(func), false });
    
    		++newJobId;
    	}
    
    	// cancels job identified by id
    	void AbortJob(int id)
    	{
    		if (currentJob->id == id)
    			currentJob->abort = true;
    		else
    			jobsToAbort.push(id); // nah, jobsToAbort would need to be threadsafe, too
    	}
    
    private:
    	static int newJobId = 1;
    
    	// thread function, only one worker
    	void worker()
    	{
    		while (true)
    		{
    			while (!jobQueue.pop(currentJob));
    
    			// check thread-safe against jobsToAbort
    
    			currentJob->func(currentJob->abort);
    		}
    	}
    
    	boost::lockfree::queue<Job*> jobQueue;
    
    	Job* currentJob;
    
    	std::set<int> jobsToAbort; // set does not work, obviously...
    };
    

    Vermutlich ist noch anzumerken, dass worker im Workerthread liegen wird und die anderen Funktionen vom Hauptthread aus aufgerufen werden.



  • Was für eine Funktion hat die boost::lockfree::queue ?



  • Nach grobem Überfliegen habe ich den Eindruck, dass deine Job-ID mehr Probleme schafft als sie löst,
    da du jetzt zwei Datenstrukturen hast die synchronisiert werden müssen.

    Warum nicht direkt den Job* nehmen, um den Job aus einem anderen Thread heraus abzubrechen?

    Ohne Anspruch auf Vollständigkeit und Korrektheit könnte das z.B. so oder ähnlich aussehen:

    struct Job
    {
        Job(std::function<void(bool&)> func)
        : func{ func }, abort{ false }
        {
        }
    
        std::function<void(std::atomic<bool>& /* abortFlag */)> func;
        std::atomic<bool> abort;
    };
    
    class JobSystem
    {
        ...
    
        std::shared_ptr<Job> AddJob(std::function<void(std::atomic<bool>&)> func)
        {
            auto job = std::make_shared<Job>{ std::move(func) };
            jobQueue.push(new std::shared_ptr<Job>{ job });
            return job;
        }
    
        ...
    
        void worker()
        {
            while (true)
            {
                while (!jobQueue.pop(currentJob));
                (*currentJob)->func((*currentJob)->abort);
                delete currentJob;
            }
        }
    
        boost::lockfree::queue<std::shared_ptr<Job>*> jobQueue;
    }
    
    ...
    
    void jobFunction(std::atomic<bool>& abort)
    {
        while (have_work && !abort)
            do_some_work();
    }
    
    ...
    
    int main()
    {
        JobSystem js;
    
        auto job = js.addJob(&jobFunction);
    
        ...
    
        job->abort = true;
    
        ...
    }
    

    Und ja, ich weiss. Ein " new std::shared_ptr " sieht wirklich eklig aus, aber leider kommt boost::lockfree::queue nur mit Pointern zurecht
    und ich sehe da auf Anhieb keine sinnvolle Alternative, ohne eine weitere Datenstruktur zu verwenden, welche die Shared Pointer hält.

    Der Sinn des Shared Pointer ist hier übrigens das Job-Objekt so lange am Leben zu halten wie darauf Zugriffe stattfinden können:
    Entweder über die Job-Funktion, der ja eine Referenz auf den abort -Member des Jobs übergeben wird, oder aus dem Hauptthread heraus,
    wo der abort -Member des Jobs geschrieben werden könnte, obwohl der Job bereits abgearbeitet und entfernt wurde.

    Man beachte auch dass abort jetzt ein std::atomic ist, da ansonsten das Abbrechen eines Jobs aus einem anderen Thread ein data race und damit UB wäre.

    Das ist vielleicht nicht der beste Ansatz das Problem zu lösen, aber wie ich denke dennoch eine sinnvolle Weiterwentwicklung deines Codes.

    Gruss,
    Finnegan



  • Danke für deine Lösung!

    Zwei Fragen:

    1. Wenn der Workerthread die Abort-Veränderung zu spät sieht, dann läuft er eben noch eine Iteration weiter, bevor er abbricht. Das wäre doch nicht schlimm. Muss ich ernsthaft noch etwas anderes befürchten? (nicht hinterfragend, sondern verständnishalber; ich steige dann auf jeden Fall auf [c]atomic<bool>[/c] um)
      Danke, durch die Anmerkung habe ich nochmal etwas gesucht und hier ein paar wunderbare Erklärungen gefunden: http://stackoverflow.com/questions/16111663/do-i-have-to-use-atomicbool-for-exit-bool-variable

    2. Ich habe inzwischen eine neue Lösung ausgearbeitet. Nachteil ist, dass ich Jobs nicht vor Programmende zerstöre, aber das ist in meinem realen Anwendungsfall kein Problem. Dadurch erspare ich mir aber new und shared_ptr . atomic habe ich nach deiner Anmerkung hinzugefügt. Das sieht jetzt so aus:

    struct Job
    {
    	using JobFunction = std::function < void(const boost::atomic<bool>& /* cancel */) > ;
    
    	Job(int id, JobFunction&& func, bool abort) : id(id), func(std::move(func)), abort(abort) {}
    
    	// only for comparison
    	Job(int id) : id(id) {}
    
    	bool operator<(const Job& j) const {return id < j.id;}
    
    	Job(Job&& j) : id(j.id), func(std::move(j.func)), abort(j.abort.load()) {} // msvc13 does not like = default
    	Job& operator=(Job&& j) // msvc13 does not like = default
    	{
    		id = j.id;
    		func = std::move(j.func);
    		abort = j.abort.load();
    	}
    
    	Job(const Job&) = delete;
    	Job& operator=(const Job&) = delete;
    
    	int id;
    	JobFunction func;
    	boost::atomic<bool> abort; // does not change sort order
    };
    

    nebst

    class JobSystem
    {
    public:
    	static const int CAPACITY = 1000;
    
    	JobSystem() : thread(boost::bind(&JobSystem::worker, this)), jobQueue(CAPACITY), run(true)
    	{
    	}
    
    	~JobSystem()
    	{
    		run = false;
    		thread.join();
    		cout << "returned from join" << std::endl;
    	}
    
    	// not copyable, not assignable
    	JobSystem(const JobSystem&) = delete;
    	// not copyable, not assignable
    	JobSystem& operator=(const JobSystem&) = delete;
    
    	// add job and get job id
    	int AddJob(Job::JobFunction&& func)
    	{
    		Job j(newJobId, std::move(func), false);
    
    		jobs.push_back(std::move(j));
    
    		jobQueue.push(&jobs.back());
    
    		++newJobId;
    
    		return newJobId - 1;
    	}
    
    	// cancels job identified by id
    	void AbortJob(int id)
    	{
    		auto it = std::lower_bound(jobs.begin(), jobs.end(), Job(id), [](const Job& lhs, const Job& rhs){return lhs.id < rhs.id;});
    		if (it != jobs.end())
    		{
    
    			std::cout << "Job " << id << '/' << it->id << " aborted..." << std::endl;
    			it->abort = true;
    		}
    	}
    
    private:
    	static int newJobId;
    
    	// thread function, only one worker
    	void worker()
    	{
    		while (run)
    		{
    			const Job* job;
    
    			while (jobQueue.pop(job) && run)
    			{
    				if (!job->abort)
    					job->func(job->abort);
    			}
    		}
    
    		std::cout << "worked through" << std::endl;
    	}
    
    	boost::lockfree::spsc_queue<const Job*> jobQueue;
    	boost::container::stable_vector<Job> jobs;
    	boost::thread thread;
    	boost::atomic<bool> run;
    };
    
    int JobSystem::newJobId = 1;
    

    Kompiliert und funktioniert in meinen ersten Tests auch wunderbar.

    Ich synchronisiere den stable_vector hier natürlich nicht, aber da sehe ich auch keine race condition, da nicht direkt davon gelesen wird. Gibt es an der Lösung etwas auszusetzen? (außer dass der Speicher "zumüllt", aber das halte ich wie gesagt ggü. den sonst höheren Kosten durch shared_ptr und Konsorten für das geringere Übel, die Jobs sind ja eher klein).

    Edit: boost::atomic<bool> ist jetzt konsequent so eingesetzt, dass es kompilierbar ist.



  • Wenn Deine Jobs so lange laufen, dass Du darüber nachdenken musst, sie abbrechen zu wollen: Warum um alles in der Welt möchtest Du dann eine lock free queue verwenden?

    Ansonsten: Bevor Du irgend welche IDs generierst, mit denen Du dann das Flag zum Abbrechen ansprichst, nimm doch einfach einen shared_ptr auf einen bool und häng den an den Job und behalte eine Kopie des shared_ptr dort, wo Du abbrechen möchtest.



  • Es kann sehr lange dauern, es kann aber auch sehr schnell gehen und es können auch viele kleine Jobs nacheinander laufen. Die Jobs sind sehr heterogen. Abbruch vieler Jobs und Neustart der nächsten muss auf alle Fälle maximal flüssig funktionieren, da all das in einer Realtime-Anwendung laufen soll.

    Was ist der Nachteil der IDs? shared_ptr wirken für mich gerade wie der größere Overhead. Und falls ja, wäre thread-safety überhaupt schon geregelt? Bräuchte ich nicht eher einen shared_ptr<atomic<bool>> ?



  • Naja, die Reservierung der IDs müsstest Du ja auch über alle threads synchronisieren um dann damit letztendlich wieder "nur" auf den passenden bool (atomic<bool>) zu kommen.



  • AddJob und AbortJob sind nicht thread-safe.
    Und dass die Jobs nie gelöscht werden würde mich persönlich schon Sorgen machen.
    Bzw. wenn du so wenige Jobs/Sekunde brauchst, dass es Wurst ist, dann sollte auch der Overhead einer Mutex Wurst sein.



  • hustbaer:
    Wieso sind sie das nicht, könntest du das ausführen? Es gibt nur einen Thread, der Jobs hinzufügt, was vermutlich eine notwendige Ergänzung ist.

    Torsten:
    Wo müsste ich denn die Reservierung der IDs denn synchronisieren, was ich nicht schon habe, wenn nur ein Thread Jobs posten kann?



  • Eisflamme schrieb:

    Wo müsste ich denn die Reservierung der IDs denn synchronisieren, was ich nicht schon habe, wenn nur ein Thread Jobs posten kann?

    Wenn Du wirklich nur einen thread hast, der Jobs erzeugt und nur einen hast, der Jobs konsumiert und wirklich Jobs nicht löschst, dann brauchst Du natürlich keine Synchronisation.

    Aber: dann brauchst Du auch keine IDs. Die ID ergibt sich dann ganz einfach als `jobs.size()`. Dann brauchst Du aber auch keine lineare Suche über eine kontinuierlich wachsende Liste (dann reicht auch `jobs[id]`). Einfach eine Referenz (pointer) auf den Job (oder eben auf das Flag) zu behalten würde auch reichen. Und wenn das tatsächlich so funktioniert, dann brauchst Du wahrscheinlich auch überhaupt keine 2 threads.



  • Es können aber doch mehrere Jobs gequeued sein, wer sagt denn, dass ich immer nur den letzten abbrechen möchte? Ich finde das aus Nutzersicht komfortabel. Aber stimmt schon, ein Zeiger auf den Job ist noch besser!

    Dass ich keine zwei Threads brauche, verstehe ich nicht. Ich brauche ja dennoch eine Jobqueue, die den Hauptthread nicht blockiert. Und SPSC ist doch häufig in Verwendung, da gibt es doch stets natürlich nur einen Producer und Consumer (wie der Name ja sagt) und das ist trotzdem im Threadumfeld aktiv. Klingt für mich etwas so, als würdest du dem die Berechtigung absprechen. Korrigiere mich bitte.



  • Eisflamme schrieb:

    hustbaer:
    Wieso sind sie das nicht, könntest du das ausführen? Es gibt nur einen Thread, der Jobs hinzufügt, was vermutlich eine notwendige Ergänzung ist.

    Ja, ich meinte wenn es mehrere Producer gibt. Dass es nur einen gibt war ja nicht klar (du hattest nur geschrieben dass es nur einen Worker gibt).

    Bzw. nicht threads-safe bleiben sie trotzdem, nur so lange nur ein Thread in die Klasse rein-ruft, müssen sie es natürlich auch nicht sein.



  • Tut mir Leid, da hast du recht. Das einzige Indiz war, dass ich die spsc_queue von boost verwende, im Text hatte ich es ganz vergessen. 😞

    Fällt denn sonst ein Leak, ein Race o.ä. auf für mein SPSC-Setting? Bei multithreading checke ich lieber doppelt und dreifach.



  • Eisflamme schrieb:

    Fällt denn sonst ein Leak, ein Race o.ä. auf für mein SPSC-Setting? Bei multithreading checke ich lieber doppelt und dreifach.

    Ja natürlich. Du löschst nie Elemente aus `jobs`. Du must mal Deinen ungewöhnlichen use-case beschreiben. Auf der einen Seite muss es eine lock-free queue sein, weil ganz viele kleine Jobs durch die Queue gehen. Auf der anderen Seite können die Jobs aber auch sehr lang sein, so dass sie abbrechbar sein müssen. Dass ganze läuft aber nur kurz, sodass das Gesamtmenge der Jobs nicht groß wird.

    Ich bin gespannt 😉



  • Da sehe ich weder Leak noch Race, ein Job ist ja winzig.

    Ist halt eine User-Application und der User ändert häufig Parameter. Bei einer Parameteränderung soll automatisch eine Berechnung angestoßen werden (und alte abgebrochen), sodass man stets die aktuellen Ergebniszahlen oder ein Ladesymbol sieht. Je nach Parametern kann eine Berechnung in 100-200 ms fertig sein (das könnte für lockfree natürlich immer noch kein Anreiz sein, da lasse ich mich gerne aufklären).

    Oder der User ist mit seinen Parametern eben zufrieden, dann wartet er vielleicht auch bei komplexeren Settings eine längere Zeit auf die Ergebnisse.

    Lockfree muss es prinzipiell nicht sein. Ich wollte nur die Performance optimieren. Die schlechte Lösung, die ich einmal hatte, war, dass für jeden Job ein neuer Thread gespawned wurde und da hat man bei mehreren Abbrüchen dann schon deutlich gemerkt, dass die gesamte Applikation sehr, sehr langsam wurde. Aber mit einem Workerthread oder Threadpool, der über Mutex und CV synchronisiert ist, wäre das wohl auch schnell.

    Jedenfalls wollte ich die lockfree-Variante als eine Möglichkeit haben und damit gegen eine Variante mit Mutexes benchmarken - nur ist letztere Variante eben leichter zu implementieren, sodass das hier nicht Gegenstand des Threads ist.

    Während der Zeit, die ein Benutzer das Programm nutzt, werde ich jedenfalls niemals so viele Jobs anhäufen, dass es Speicherprobleme gäbe, da ein Job selbst ja nur wenig Speicher benötigt. (Mittlerweile stört mich das jedoch auch, sodass ich hier ohnehin umstrukturieren werde, aber dennoch interessiert mich, ob das hier schon mal stabil laufen würde - mit meinen Annahmen)



  • Wenn ein Job immer den evtl. vorherigen abbricht, dann ist das Interface doch aber ein ganz anderes. Dann hast Du eigentlich noch nicht mal eine Queue. Ein Mutex, eine Condition Variables, ein thread, zwei Jobs und ein atomic< bool > sollten dann reichen, um das zu implementieren.



  • Ja, stimmt, nur habe ich verschiedene Arten von Jobs und davon können schon mehrere in der Queue sein, nur bricht jeder Jobtyp den vorherigen ab. Und wie gesagt, ich bin an einer möglichst lockfree Lösung interessiert (mit CV, Mutex ist es mMn ja eher einfach).



  • Das würde dann ggf. für mehrere "Queues" sprechen.

    Warum um alles in der Welt möchtest Du eine lock free-Lösung? Du hast doch überhaupt keine contention auf der queue. Lock-free macht nur dann Sinn, wenn Du ganz viele konkurrierende Zugriffe auf etwas hast oder eine queue hast, die nie leer sein wird. In Deinem Fall verbrennt der Workerthread sinnfrei CPU beim Warten auf der queue. Dass mag die Latenz um einige nano-Sekunden verringern, nervt den Anwender aber, wenn andauernd der Lüfter des Rechners läuft.



  • Na ja, nach der ganzen (sehr aufschluss- und lehrreichen, übrigens, danke schon mal :)) Diskussion hier beschränkt sich die Frage nach einer solchen lockfree Lösung fast nur noch auf persönliches Interesse. Wie gesagt würde ich gerne benchmarken und dadurch sehen, was besser passt. Mittlerweile erledigt sich die Frage jedoch vermutlich fast von selbst.

    Wieso empfiehlst du denn mehrere Queues für mehrere Jobtypen, wenn du eben noch von zwei Threads (ganz ohne Queue) für einen Jobtyp sprachst? Oder meinst du das damit?



  • Ich schrieb "Queue" in Anführungszeichen, damit ich für dieses Konstrukt, dass eben keine Queue ist keinen Namen finden muss (Queue != "Queue").

    Wenn Du relativ wenig dieser Jobtypen hast, und diese unabhängig von einander laufen können, dann könntest Du eine relativ einfache "Queue" (oder job_executer oder single_job_queue) implementieren und diese dann halt für jeden Typen instanziieren.


Anmelden zum Antworten