Producer-Consumer-Problem
-
fabske schrieb:
Ehrlich gesagt hab ich nicht ganz verstanden was der Unterschied sein soll, ob man nun explizit lock aufruft oder so ein Objekt erstellt. Ich hab es nun mal geändert. Auch hab ich nun ein cond eingebaut.
Das release wird automatisch vom dtor aufgerufen (RAII).
-
fabske schrieb:
Kann mir jemand sagen ob ich mir da was falsch gedacht hab oder ob ich was falsch umgesetzt hab?
Du benötigst keinen selbstgeschriebenen Puffer, ein Queue oder eine Liste tut es auch.
Der Producer produziert fleißig Arbeit für seine Consumer, die möchte er den Consumern zugänglich machen und legt sie in einem Objekt (Puffer) ab welches von vielen verschiedenen Threads gesehen wird und deshalb der Zugriff synchronisiert erfolgen muss. Wenn der Producer also etwas in den Puffer, z.B. einen Queue oder eine Liste schreiben möchte holt er sich das Lock, schreibt hinein und benachrichtigt irgendeinen vermeintlich wartenden Consumer. Das locken sollte über das Mutex funktioneren und das benachrichten müsste über diese Condition mit notify_one() gehen.
Ein Consumer möchte sich ein Stückchen Arbeit abholen dazu muss er den Zugriff auf den Puffer locken, dann schaut er ob überhaupt was da ist und falls nichts da ist, gibt er das Lock frei und wartet auf ein Event (das notify) welches ihn fortfahren lässt. Das sollte mit .wait() auf der Condition funktionieren. Wenn er nicht warten muss, tut er seine Arbeit und verschwindet dann. Was auch immer "verschwinden" für dich bedeutet. (Wiederverwenden, Beenden etc.)
-
Vielen Dank TheTester für deine Erklärung! Ich denke ich habe das bereits verstanden und auch schon umgesetzt (siehe Kode).
Vielleicht hätte ich es der Deutlichkeit halber hinzuschreiben sollen. Bei meinem Problem soll der Produzent Objekte produzieren die an ALLE Konsumenten gehen. Also nicht einer kümmert sich darum, sondern jeder Konsument muss jedes Objekt vom Produzent erhalten. Deshalb hab ich mir einen Puffer geschrieben. Allerdings hab ich da Probleme mit dem thread_specific_ptr, bei dem ich hoffe dass mir jemand weiterhelfen kann?
-
Ich hab das Problem mit dem thread_specific_ptr nun mal auskommentiert bis mir jemand das beantworten kan. Hab gerade die cond getestet und gemerkt dass es nicht will. Der Puffer hat:
class Puffer { public: bool einfuegen(int*); int* entnehmen(list<int*>::iterator); int* entnehmen_wait(list<int*>::iterator); bool entsagen(list<int*>::iterator); bool print(); list<int*>::iterator anfang(void); boost::condition_variable_any NeueDaten; boost::thread_specific_ptr< list<int*>::iterator > Aktuell; private: list<int*> NeuRaus; list<int*> AltRein; boost::shared_mutex NeuRausMutex; boost::shared_mutex AltReinMutex; };
Ich hab eine zweite Funktion zum entnehmen erstellt, welche warten soll bis ein Element im Puffer ist:
int* Puffer::entnehmen_wait(list<int*>::iterator I) { boost::shared_lock<boost::shared_mutex> lock1(NeuRausMutex); while( I==NeuRaus.end() ) NeueDaten.wait(lock1); return *I; };
Was ich nicht verstehe ist warum man dem wait den lock mitübergeben muss!? Anscheinend damit es den lock aufhebt? Aber das könnte er doch generell machen wenn ich wait aufrufe. Oder gibt es noch einen anderen Grund?
Die Funktion zum Einfügen (mit recycling) ruft dann notify auf:bool Puffer::einfuegen(int* E) { boost::unique_lock<boost::shared_mutex> lock1(AltReinMutex); if( AltRein.empty() ) { boost::unique_lock<boost::shared_mutex> lock2(NeuRausMutex); NeuRaus.push_back(E); NeueDaten.notify_all(); } else { boost::unique_lock<boost::shared_mutex> lock2(NeuRausMutex); AltRein.front()=E; NeuRaus.push_back(AltRein.front()); AltRein.pop_front(); NeueDaten.notify_all(); } return true; };
Der ganze Kode ist hier: http://nopaste.debianforum.de/32703
Kompiliert und läuft, aber wartet eben nicht auf das notify
Leider finde ich keine gute Doku zu boost, immer nur Stückchen. Ich würde es gerne nachlesen aber es gibt nix zum Lesen
In dem Howto das ich benutze wird ein boost::mutex in diesem cond-Beispiel verwendet. Ich verwende aber wegen Lese und Schreibzugriff ein boost::shared_mutex. Ist das das Problem und wenn ja warum?Falls jemand mit sagen kann ob das mit dem thread_specific_ptr ein Denkfehler ist wäre ich auch sehr sehr dankbar! Brauche dingend Hilfe!
Vielen Dank, Fabian
-
Bei meinem Problem soll der Produzent Objekte produzieren die an ALLE Konsumenten gehen. Also nicht einer kümmert sich darum, sondern jeder Konsument muss jedes Objekt vom Produzent erhalten.
Dann sollte auch jeder Consumer auch seine eigene Queue haben, in der Auftragskopien, also die Objekte, abgelegt werden. Der Producer legt dann in 5 Faecher das Gleiche ab und nicht nur in eins.
-
Deine "entnehmen_wait" Funktion ist vollkommen falsch. Was soll der Iterator sein den du da übergibst?
(BTW: "netter" Mix aus Deutsch und Englisch. *würg*)Die Funktion muss eher so aussehen:
int* Puffer::entnehmen_wait() { boost::shared_lock<boost::shared_mutex> lock1(NeuRausMutex); while( NeuRaus.empty() ) NeueDaten.wait(lock1); return *NeueDaten.pop_front(); };
Und zu deiner Frage: den Lock musst du mitgeben, weil die Condition-Variable ja nicht weiss, welche Mutex verwendet wird, und daher freigegeben werden muss.
Und shared_mutex ist die "falsche" Klasse. Sollte zwar funktionieren, ist aber unnötig langsam. shared_mutex benötigst du, wenn du Zugriffe zwischen verschiedenen Prozessen synchronisieren willst.
-
Anscheinend versteht mich niemand
Ich habe einen Erzeuger der Elemente produziert die an ALLE Verbraucher geliefert werden. knivil hat natürlich recht, man kann auch jedem Verbraucher einen solchen Puffer verpassen und der Erzeuger liefert die Elemente dort ab. Bei vielen Verbrauchern hab ich dann aber das Problem, dass mein Erzeuger großteils mit dem verteilen von neuen Elementen beschäftigt ist anstatt mit dem produzieren. Oder ich mache einen Auslieferer-Thread der dann aber auch wieder einen Puffer braucht.
Aus diesem Grund hab ich mich entschieden beim Erzeuger einen intelligenten Puffer anzubringen, wo sich die Verbraucher die Elemente abholen. So hab ich nur einen Puffer und jeder hat ungefähr gleichviel zu tun (Erzeuger einmal einliefern, jeder Verbraucher einmal abholen).
Die Elemente sollen an JEDEN Verbraucher gehen, deshalb kann ich nicht einfach ein pop() machen! Sonst ist das Element weg und die anderen Verbraucher haben es nicht bekommen. Außerdem muss sich JEDER Konsument merken wo er sich im Puffer befindet, also welches Element er schon abgeholt und welches nicht. Deshalb gibt es zwei Funktionen:
entnehmen(Iterator) - entnimmt nicht wirklich was aus dem Puffer sondern liefert nur das FÜR DIESEN VERBRAUCHER neueste Element. Für andere Verbraucher ist ein anderes Element das momentan neueste.
entsagen(Iterator) - zählt am Element eine membervariable runter. Sobald diese auf 0 geht wieder das Element recyclt.
Verbraucher und Erzeuger werden natürlich von verschiedenen Threads betrieben. Aus diesem Grund brauche ich einen shared_mutex. Leider will das wait aber einfach nicht funktionieren.while( NeuRaus.empty() ) NeueDaten.wait(lock1);
und
while( I==NeuRaus.end() ) NeueDaten.wait(lock1);
ist genau das selbe. Wenn der übergebene Iterator auf das Ende der Liste zeigt ist sie demnach leer.
Kann mir irgendjemand sagen warum das wait() und notify() nicht funktionieren will? http://nopaste.debianforum.de/32703
-
fabske schrieb:
while( NeuRaus.empty() ) NeueDaten.wait(lock1);
und
while( I==NeuRaus.end() ) NeueDaten.wait(lock1);
ist genau das selbe. Wenn der übergebene Iterator auf das Ende der Liste zeigt ist sie demnach leer.
NEIN das ist NICHT das selbe!
Du hast da offensichtlich die Iteratoren noch nicht ganz verstanden.
Mit rohen Zeigern ginge das was du da machen willst durchaus (mit einigen Einschränkungen).
Mit std::list wird es aber NICHT funktionieren.p.S.: das mit shared_mutex vs. mutex war mein Fehler. Ich würde aber tendentiell auch keine shared_mutex hier verwenden -- oft ist der Overhead eines ReaderWriterLocks grösser, als das was man sich dadurch erspart. Plus mit einem ReaderWriterLock kann es schnell zu Writer-Starvation kommen (bzw. bei manchen Implementierungen zu Reader-Starvation).
-
hustbaer du hast Recht! Und ich auch
Wenn "mein" Iterator auf das Ende der Liste zeigt muss sie nicht leer sein, ganz klar, da hast du Recht. Aber die Abfrage ob die Liste leer ist bringt mir gar nix. Für einen Verbraucher kann sie leer sein, für den anderen nicht! Weil ich hab mehrere Verbraucher hab! Jeder hat seinen eigenen Iterator auf einen Punkt in der Liste und wandert damit weiter. Kommt er ans Ende ist FÜR IHN die Liste leer. So sieht es aus.
Wie ich nun debugged hab hatte ich einige Fehler drin (z.B. eben diesen Iterator vergessen hochzuzählen). Ob das wait nun geht oder nicht kann ich grad nicht sagen, weil ich momentan Probleme mit threads hab. Sobald die gelöst sind werde ich es nochmal testen.
-
Jeder hat seinen eigenen Iterator auf einen Punkt in der Liste und wandert damit weiter. Kommt er ans Ende ist FÜR IHN die Liste leer. So sieht es aus.
was du nicht zu verstehen scheinst, ist folgendes:
#include <list> #include <cassert> int main(int argc, char** argv) { std::list<int> list; list.push_back(1); std::list<int>::iterator it = list.begin(); // it zeigt auf den einser assert(it != list.end()); it++; // it zeigt auf end assert(it == list.end()); list.push_back(2); // it zeigt immer noch auf end assert(it == list.end()); // WICHTIG: dieses verhalten ist "üblich", aber AFAIK nicht im Standard definiert, // also auch nichts, worauf man sich verlassen dürfte return 0; }
-
Das ist das Problem hustbaer
Das ist diese "Abnormalität" die ich zu begreifen versucht hab. Solange der Puffer gefüllt ist und ich einen Iterator auf ein Element hab funktioniert alles gut. Sobald ich aber an die Grenzen der Liste komme oder sie gar leer wird, verhält sich mein Iterator nicht mehr wie gewünscht.
Was ich brauche ist ein Puffer bei dem der Iterator am Ende ankommen kann. Wenn man dann ein Element hinten einfügt soll diese nicht mehr auf das Ende sondern auf das eben hinten eingefügte (das letzte) Element zeigen. Kann ich das verhalten irgendwie beeinflussen? Gibt es einen anderen Container der das Verhalten an den Tag legt? Wohl nicht, weil es ja alles Iteratoren sind?
-
fabske schrieb:
Das ist diese "Abnormalität" die ich zu begreifen versucht hab.
Deine Vorstellung von Iteratoren ist "abnormal", nicht das beobachtete verhalten
Solange der Puffer gefüllt ist und ich einen Iterator auf ein Element hab funktioniert alles gut. Sobald ich aber an die Grenzen der Liste komme oder sie gar leer wird, verhält sich mein Iterator nicht mehr wie gewünscht.
Was ich brauche ist ein Puffer bei dem der Iterator am Ende ankommen kann. Wenn man dann ein Element hinten einfügt soll diese nicht mehr auf das Ende sondern auf das eben hinten eingefügte (das letzte) Element zeigen. Kann ich das verhalten irgendwie beeinflussen?
Nö, nicht wirklich. Ausser du schreibst deinen eigenen Container, der soetwas kann.
Ich muss auch sagen dass ich es ziemlich grässlich finde, hier mit rohen Iteratoren zu arbeiten. Anders gesagt: ich wäre garnie auf die Idee gekommen, das so machen zu wollen. Damit legst du die Innereien der Klasse bloss, und kannst die Implementierung nichtmehr sauber ändern.Gibt es einen anderen Container der das Verhalten an den Tag legt? Wohl nicht, weil es ja alles Iteratoren sind?
Ich kenne keinen. Und siehe oben: ich würde es sowieso nie so machen.
Versuch mal das ganze etwas mehr zu abstrahieren.
Du hast einen "Producer", und mehrere "Consumer". Die "Consumer" müssen sich ja wohl irgendwie beim "Producer" an- und abmelden. Sonst weiss der ja nie was er noch für irgendeinen "Consumer" aufheben soll.
Sogesehen stellt so eine Anmeldung eine Resource dar (vergleichbar mit z.B. einer Socket Verbindung zu einem Server der irgendwelche Date streamt oder was auch immer).
Diese Resource solltest du IMO explizit machen, und der Einfachheit halber gleich mit RAII behandeln.
Das Interface könnte inetwa so aussehen:template <class T> class Queue : private boost::noncopyable { public: class Subscription : private boost::noncopyable { public: bool HasCurrent() const; // true wenn die Subscription ein aktuelles Element "hat" void ReleaseCurrent() const; // gibt das aktuelle Element an den Queue zurück ("entsagen") T const& GetCurrent() const; // Zugriff auf das aktuelle Element - wirft std::logic_error() wenn kein aktuelles Element vorhanden void Advance(); // gibt das aktuelle Element zurück, und holt sich das nächste (wartet bis eins verfügbar ist) bool TryAdvance(); // falls ein weiteres Element sofort vergübar ist, aktuelles zurückgeben, neues holen, return = true. // falls kein weiteres Element sofort verfügbar ist, return = false, sonst keine Änderung (aktuelles Element wird nicht zurückgegeben) // evtl. noch: bool TimedAdvance(TimeoutType timeout); // dasselbe wie TryAdvance, nur dass bis zu "timeout" gewartet wird, bevor "false" zurückgegeben wird // eine Subscription ist initial erstmal "leer", d.h. der Client muss erstmal Advance() sagen, damit es ein aktuelles Element gibt // ... private: friend Queue<T>; Subscription(Queue<T> const& owner); // ... }; boost::shared_ptr<Subscription> Subscribe(); // "unsubscribe" ist implizit über das Löschen der Subscription, was wiederum automatisch von boost::shared_ptr erledigt wird, // sobald der letzte boost::shared_ptr auf die Subscription verschwindet void PushElement(T const& element); private: // ... };
Damit hast du das Ganze erstmal schön gekapselt, und die Queue ("Puffer") Klasse läuft jetzt nichtmehr mit aufgeschlitztem Bauch und raushängenden Eingeweiden rum.
Die Implementierung musst du natürlich immer noch irgendwie machen.
Dazu gibt es mehrere Möglichkeiten.
Spontan fällt mir mal folgendes ein:
Du verpasst der "Queue" einen Zähler, mit der du alle reingesteckten Elemente durchnummerierst. Die Subscriber merken sich anstelle des Iterators auf das nächste Element nur die Nummer des nächsten Elements.
Die Queue speichert alle Elemente die noch gebraucht werden in z.B. einer std::deque (schnelles einfügen und entfernen vorne und hinten, schneller zugriff auf das N.te Element).
Damit die Queue auch weiss welche Nummer jedes Element in der std::deque hat, merkt sie sich zusätzlich noch die Nummer des ersten Elements.
Zusätzlich merkt sich die Queue zu jedem Element in der std::deque noch, wie viele Subscriber es gibt, die dieses Element noch nicht zurückgegeben haben, damit die Elemente auch irgendwann wieder entfernt werden können. (Dazu verwendet sie eine kleine Hilfsstruktur, die einen Zähler und das eigentliche Element beinhaltet. Die std::deque ist also keine std::deque<T>, sondern eine std::deque<Hilfsstruktur>).
Damit es bei Überläufen des Zählers keine Probleme gibt, musst du natürlich überall aufpassen, wo du mit Zählerwerten rumhantierst.
Alternativ könntest du auch als Zähler einen 64 Bit Integer verwenden, und einfach beschliessen, dass das Programm nie so lange laufen wird, dass der Zähler jemals überläuft. Rechne es dir durch, ich glaube diese Annahme ist bei heutiger Hardware einigermassen sicher.
(Bei 4GHz und einem neuen Element pro Take wären das 4 Milliarden Sekunden, und 4 Milliarden Sekunden sind lange)
-
Erstmal vielen Dank hustbear für den langen und ausführlichen Beitrag! In der Tat gibt es zwei Möglichkeiten: Entweder die Verbraucher melden sich beim Erzeuger an und er verwaltet die Elemente, oder der Erzeuger weiß nichts von der Anzahl der Verbraucher.
Ich hab mich für letztes entschieden, weil ich zusätzlich drauf achten muss dass das ganze sehr sehr schnell funktioniert. Ich will den Prozessor so wenig wie möglich belasten und ich meine zu wissen dass RAII und andere tolle Konzepte doch etwas overhead mitbringen.
Die Objekte die in den Puffer wandern haben bereits eine Variable "int Benutzung" in der initial (bei Erstellung) die Anzahl der Verbraucher eingetragen wird. Wird ein solches Objekt aus dem Puffer entnommen ist der Verbraucher in der Verantwortung dies zu dekrementieren. Der letzte räumt es dann auf. So muss der Erzeuger nichts von den Verbrauchern wissen und ich spare mir eine Verwaltungsstruktur am Erzeuger. Die Idee die Elemente durchzunummerieren bzw. eine ID zu verwenden hatte ich auch schon. Damit würde ich mir das hässliche übergeben des Iterators ersparen, das stimmt. Und wenn ich merke dass mir das zu viel Probleme macht werde ich mir auch eine solche Lösung wohl überlegen müssen! Das heißt, dass der Puffer (bzw. der Erzeuger) sich merkt wer was schon abgeholt hat.An dieser Stelle kommen wir wieder auf den thread_specific_ptr! Wäre das nicht die ideale Lösung? Wenn ich im Puffer einen thread_specific_ptr als Iterator habe, der (weil thread_specific_ptr) für jeden Verbraucher individuell ist, immer auf das aktuelle Element zeigt? Damit müsste ich keine Iteratoren vom Verbraucher an den Erzeuger mehr übergeben und hätte implizit schon die Verwaltungsstruktur?
Auf deinen Beitrag hin ab ich mich von std::list verabschieden und auf std::deque gewechselt. Wie ein Test ergab habe ich damit keine Probleme mehr beim Übergeben der Iteratoren!
#include <deque> #include <iostream> std::deque<int> data; int gibElement(std::deque<int>::iterator it) { if( it==data.end() ) return -1; // Ende! else return *it; }; int main(int argc, char** argv) { std::deque<int>::iterator it = data.begin(); data.push_back(1); data.push_back(2); std::cout << gibElement(it) << std::endl; it++; std::cout << gibElement(it) << std::endl; it++; data.pop_front(); data.pop_front(); data.push_back(3); data.push_back(4); std::cout << gibElement(it) << std::endl; it++; std::cout << gibElement(it) << std::endl; it++; std::cout << gibElement(it) << std::endl; return 0; }
Wenn ich jetzt noch den Iterator als thread_specific_ptr (in einer Thread-Umgebung) speichern könnte wäre es glaube ich perfekt!
-
RAII hat overhead? wo?
naja, egal.
ich gehe davon aus, dass eine saubere implementierung potentiell gleich schnell ist, wie eine weniger saubere lösung.
(EDIT: in diesem speziellen fall meine ich. es gibt natürlich andere fälle, wo man viel geschwindigkeit rausholen kann, wenn man auf "schön"/"sauber" verzichtet.)was die thread-specific-pointer angeht: wenn du unbedingt willst, dann mach es so. ich würde in diesem fall ein explizites objekt ("Subscription") definitiv vorziehen. ist sauberer. und du sparst den - zugegebenermassen nicht sehr grossen - overhead den thread-specific-pointer haben.
-
Eventuell schwenk ich doch noch um hustbaer, deshalb sind deine Vorschläge nicht umsonst! Das ist mein erstes richtiges Softwareprojekt und ich sammle gerade Erfahrungen.
Du meintest weiter oben dass du gar keine shared_mutexe verwenden würdest. Bisher habe ich das ja für meine deques:
std::deque<Daten*> NeuRaus; boost::shared_mutex NeuRausMutex;
Je nachdem ob ich einfüge oder nur lese mach ich dann ein
boost::unique_lock<boost::shared_mutex> lock(NeuRausMutex);
oder ein
boost::shared_lock<boost::shared_mutex> lock(NeuRausMutex);
Soll ich das deiner Meinung nach einfach immer dadurch ersetzen?
boost::mutex NeuRausMutex; boost::lock_guard<boost::mutex> lock(NeuRausMutex);
-
Ja, genau.
Du kannst aber auch einfach 3 Typedefs machen:#if 1 // use shared_mutex or not typedef boost::shared_mutex MutexType; typedef boost::shared_lock<MutexType> SharedLock; typedef boost::unique_lock<MutexType> UniqueLock; #else typedef boost::mutex MutexType; typedef boost::unique_lock<MutexType> SharedLock; // unique_lock<> kann man auch mit "nicht-shared" Mutexen verwenden typedef boost::unique_lock<MutexType> UniqueLock; #endif
Wenn du im Code dann überall nurmehr die Typedefs verwendest, kannst du alles nur über die Typedefs von "shared_mutex" auf "mutex" umschalten, und wieder zurück.
z.B. wenn du Performence-Vergleiche machen willst.
-
Wieso sollte RAII einen Overhead haben?
Ob ich eine Release Funktion, delete oder sonstwas "per hand" aufrufe, oder der Destruktor das macht, kommt doch aufs selbe heraus.Zum shared-lock: Der macht nur Sinn, wenn man mehrere lesende Threads hat und der Lesezugriff auch ne Weile dauert. Bei queue.front(); queue.pop(); wird sich das kaum lohnen. Außerdem hast Du nur einen Consumer, wenn ich das beim Überfliegen richtig verstanden habe.
-
Nönö, er hat schon mehrere Consumer.
Bin mir aber fast sicher, dass es trotzdem Overkill ist.
Eben weil die Locks nur so kurz gehalten werden.
-
hustbaer ich hab es eingesehen. Die Idee mit den iteratoren war keine gute. Ich weiß nicht warum aber mit der Zeigerlösung schaffe ich es immer nur 319 Objekte vom Erzeuger zum Verbraucher zu schauffeln, danach kommt ein Speicherzugriffsfehler. Liegt wohl an den Erklärungen dieses Threads: http://www.c-plusplus.net/forum/viewtopic-var-t-is-256167-and-highlight-is-.html
Ich habe nun umgebaut auf IDs. Die hatte ich schon vorher. Es handelt sich um einen timestamp der Erstellungszeit meiner Daten die ich nun als ID benutze. Anstatt die iteratoren übergebe ich nun IDs und lass sie in den deques suchen. Puffer hab ich auch abgeschafft und direkt in Modul.cpp integriert. Falls es dich interessiert: http://codepad.org/lBGbcbNS