Producer-Consumer-Problem
-
Ich verwende die boost_thread um zwei Threads zu erzeugen die über einen Puffer miteinander Objekte austauschen sollen (in eine Richtung). Also das klassische Problem bei dem der Producer etwas erzeugt und es dem Consumer gibt. Bei all den Beispielen im Internet sieht das ganz einfach aus. Beide warten am Ende ein kurze Zeitspanne, in der der jeweils andere auf den Puffer zugreift und ihn dann sperrt. Die Möglichkeit dass der Puffer auch mal für längere Zeit leer sein könnte bleibt dabei außen vor.
Ich möchte dass die beiden so schnell wie möglich arbeiten, eine Zeitspanne zu warten finde ich dabei unschön. Außerdem kann der Producer bei mir auch mal ne längere Zeit einfach nix produzieren und der Puffer steht leer. Der Consumer würde jetzt oft seine Schleife durchrattern und immer schauen ob was da ist. Ist das nicht unnötige Arbeit? Kann man dem Consumer nicht ein Signal zukommen lassen dass nun etwas da ist, oder noch besser das Rausnehmen eines Elementes aus dem Puffer blockierend machen? Sodass eine Entnahme aus einem leeren Puffer einfach solange steht bis etwas hineingefüllt wurde? Wobei das wahrscheinlich nicht geht weil man solange nichts hineinfügen könnte, ja?
Ich habe mir überlegt einen zweiten lock anzulegen, der nur angibt dass der Puffer leer ist. Der Consumer könnte den lock sperren und warten bis der Producer ihn entsperrt. Aber geht das überhaupt?
-
Der Consumer würde jetzt oft seine Schleife durchrattern und immer schauen ob was da ist.
Nein.
Kann man dem Consumer nicht ein Signal zukommen lassen
Ja, mittels Semaphore oder Mutex.
Wobei das wahrscheinlich nicht geht weil man solange nichts hineinfügen könnte, ja?
Doch geht.
Ich habe mir überlegt einen zweiten lock anzulegen, der nur angibt dass der Puffer leer ist. Der Consumer könnte den lock sperren und warten bis der Producer ihn entsperrt. Aber geht das überhaupt?
Am besten entnimmst du die Implementation dieses Standardbeispiels der Literatur wie hier: http://en.wikipedia.org/wiki/Producer-consumer_problem , um Fehler zu vermeiden. Auch ist ein Link direkt zu einer Implenetation mit boost:thread angegeben.
-
Kannst du mir das erklären? Oder besser noch ein Beispiel linken wie es funktioniert?
Es gibt einen Lock und entweder der Producer oder der Consumer sperrt diesen, solange sie entnehmen oder füllen. Wenn der Producer nun nichts zum reinlegen hat und der Puffer leer ist, dann versucht doch der Consumer ständig etwas zu entnehmen, in dem er den lock sperrt, nachschaut ob was da ist, wieder entsperrt und das ganze wieder von vorne anfängt.
-
Genau dieses Beispiel meinte ich eigentlich! Hatte ich schon zuvorgefunden:
http://acumensoftwareinc.com/TechNotes/ProducerConsumer/html/ProducerConsumer.cppIch sehe hier keine Semaphore sondern nur einen Mutex und ein sleep am Ende von Consumer und Producer.
-
Schau dir mal "Condition" an. Damit solltest du die Consumer-Threads auf ein Mutex in einen "Wartepool" ablegen können wenn die "Bedingung" nicht erfüllt ist. Dein Producer-Thread kann dann an der "Condition" notifyXXX() aufrufen um einen oder alle wartenden Threads weiterlaufen zu lassen.
-
Also knivil ich hab das jetzt einfach mal ausprobiert und leider hat sich deine Behauptung als falsch herausgestellt. Ich hab das Beispiel angepasst und die Wartezeiten ungleichmäßig eingestellt. Wie sich zeigt rattert die Schleife durch: http://nopaste.debianforum.de/32493
http://nopaste.debianforum.de/32494
-
fabske schrieb:
http://nopaste.debianforum.de/32493
http://nopaste.debianforum.de/32494Da sind so einige Macken drin.
Globale Objekte sollten vermieden werden. Ich sehe einen unsynchronisierten Zugriff auf Buffer.size(). Du benutzt lock/unlock des Mutexes manuell für die Textausgabe. Warum kein Lock-Objekt stattdessen? Was soll das ganze
sleep
da? Du wartest im Consumer ja darauf, dass wieder Elemente im Buffer stehen. Der Producer wartet darauf, dass es wieder freie Plätze im Buffer gibt. Das lässt sich wunderbar über Bedingungsvariablen,wait
undnotify_xxx
regeln.Gruß,
SP
-
Also knivil ich hab das jetzt einfach mal ausprobiert und leider hat sich deine Behauptung als falsch herausgestellt.
Deine Implementation und die Beispiel Implementation unterscheiden sich stark. Auch habe ich keine Behauptung aufgestellt. Ich selbst habe es schon als Uebungsaufgabe korrekt implementiert. Wildes Ausprobieren wird dir in diesem Fall nicht weiterhelfen. Eine fehlerfreie Implementation ist nur moeglich, wenn du es auch verstanden hast.
-
Erstmal Danke euch beiden! Ich arbeite mit dieser Doku: http://www.highscore.de/cpp/boost/multithreading.html
Du benutzt lock/unlock des Mutexes manuell für die Textausgabe. Warum kein Lock-Objekt stattdessen?
Ehrlich gesagt hab ich nicht ganz verstanden was der Unterschied sein soll, ob man nun explizit lock aufruft oder so ein Objekt erstellt. Ich hab es nun mal geändert. Auch hab ich nun ein cond eingebaut.
Inzwischen hab ich etwas weiter an meinem Problem gearbeitet, das eigentlich ein Producer-MutliConsumer-Problem ist. Das heißt es gibt einen Erzeuger und mehrere Verbraucher. Weil ich dazu nix im Internet fand hab ich mir einen eigenen Puffer gebaut. Es ist eine Art Ringpuffer welcher die Zeiger recycelt. Er besteht aus zwei Listen, eine für neue Objekte die zu den Verbrauchern sollen, und einer für "verbrauchte" Zeiger die wieder verwendet werden können. Bisher sind die Objekte einfach int* die gleichzeitig mitzählen wieviele Verbraucher auf sie zeigen! Später kommen da eigene Objekte rein, welche eine member AktuelleKonsumenten haben. Initial wird dieses member mit der Anzahl an Konsumenten initialisiert. Deshalb hat der Puffer entnehmen() und entsagen(). entnehmen() entnimmt nicht wirklich was aus der Liste, sondern gibt dem Verbraucher nur das neueste Element. Braucht er es nicht mehr macht er entsagen(Element) und die member AktuelleKonsumenten an diesem Element wird dekrementiert. Ist sie auf 0 wird das Element von der einen Liste in die andere gehängt und kann wiederverwendet werden.
Das Problem an der Sache ist, dass jeder Konsument selber zählen muss wo in der Liste er sich befindet. Er muss also einen Iterator haben und den anfänglich auf den Anfang der Liste setzen. Nun hab ich in der Doku sogenannte thread_specific_ptr gefunden und es so verstanden, dass ich da Zeiger hab die je nach Thread unterschiedliche Werte haben können. Ich dachte mir das wäre perfekt für meinen Puffer. Ich errichte einen thread_specific_ptr der auf den Anfang der Liste zeigt und zugleich für jeden Thread welcher den Puffer benutzt das aktuelle Element der Liste speichert. So könnte der Puffer selber speichern wo sich welcher Thread gerade befindet. Stimmt das so?
Leider krieg ich das nicht implementiert. Es gibt immer einen Speicherzugriffsfehler. Ich hab das hier mal getestet: http://nopaste.debianforum.de/32636
Kann mir jemand sagen ob ich mir da was falsch gedacht hab oder ob ich was falsch umgesetzt hab?Vielen Dank schonmal im Voraus!
-
fabske schrieb:
Ehrlich gesagt hab ich nicht ganz verstanden was der Unterschied sein soll, ob man nun explizit lock aufruft oder so ein Objekt erstellt. Ich hab es nun mal geändert. Auch hab ich nun ein cond eingebaut.
Das release wird automatisch vom dtor aufgerufen (RAII).
-
fabske schrieb:
Kann mir jemand sagen ob ich mir da was falsch gedacht hab oder ob ich was falsch umgesetzt hab?
Du benötigst keinen selbstgeschriebenen Puffer, ein Queue oder eine Liste tut es auch.
Der Producer produziert fleißig Arbeit für seine Consumer, die möchte er den Consumern zugänglich machen und legt sie in einem Objekt (Puffer) ab welches von vielen verschiedenen Threads gesehen wird und deshalb der Zugriff synchronisiert erfolgen muss. Wenn der Producer also etwas in den Puffer, z.B. einen Queue oder eine Liste schreiben möchte holt er sich das Lock, schreibt hinein und benachrichtigt irgendeinen vermeintlich wartenden Consumer. Das locken sollte über das Mutex funktioneren und das benachrichten müsste über diese Condition mit notify_one() gehen.
Ein Consumer möchte sich ein Stückchen Arbeit abholen dazu muss er den Zugriff auf den Puffer locken, dann schaut er ob überhaupt was da ist und falls nichts da ist, gibt er das Lock frei und wartet auf ein Event (das notify) welches ihn fortfahren lässt. Das sollte mit .wait() auf der Condition funktionieren. Wenn er nicht warten muss, tut er seine Arbeit und verschwindet dann. Was auch immer "verschwinden" für dich bedeutet. (Wiederverwenden, Beenden etc.)
-
Vielen Dank TheTester für deine Erklärung! Ich denke ich habe das bereits verstanden und auch schon umgesetzt (siehe Kode).
Vielleicht hätte ich es der Deutlichkeit halber hinzuschreiben sollen. Bei meinem Problem soll der Produzent Objekte produzieren die an ALLE Konsumenten gehen. Also nicht einer kümmert sich darum, sondern jeder Konsument muss jedes Objekt vom Produzent erhalten. Deshalb hab ich mir einen Puffer geschrieben. Allerdings hab ich da Probleme mit dem thread_specific_ptr, bei dem ich hoffe dass mir jemand weiterhelfen kann?
-
Ich hab das Problem mit dem thread_specific_ptr nun mal auskommentiert bis mir jemand das beantworten kan. Hab gerade die cond getestet und gemerkt dass es nicht will. Der Puffer hat:
class Puffer { public: bool einfuegen(int*); int* entnehmen(list<int*>::iterator); int* entnehmen_wait(list<int*>::iterator); bool entsagen(list<int*>::iterator); bool print(); list<int*>::iterator anfang(void); boost::condition_variable_any NeueDaten; boost::thread_specific_ptr< list<int*>::iterator > Aktuell; private: list<int*> NeuRaus; list<int*> AltRein; boost::shared_mutex NeuRausMutex; boost::shared_mutex AltReinMutex; };
Ich hab eine zweite Funktion zum entnehmen erstellt, welche warten soll bis ein Element im Puffer ist:
int* Puffer::entnehmen_wait(list<int*>::iterator I) { boost::shared_lock<boost::shared_mutex> lock1(NeuRausMutex); while( I==NeuRaus.end() ) NeueDaten.wait(lock1); return *I; };
Was ich nicht verstehe ist warum man dem wait den lock mitübergeben muss!? Anscheinend damit es den lock aufhebt? Aber das könnte er doch generell machen wenn ich wait aufrufe. Oder gibt es noch einen anderen Grund?
Die Funktion zum Einfügen (mit recycling) ruft dann notify auf:bool Puffer::einfuegen(int* E) { boost::unique_lock<boost::shared_mutex> lock1(AltReinMutex); if( AltRein.empty() ) { boost::unique_lock<boost::shared_mutex> lock2(NeuRausMutex); NeuRaus.push_back(E); NeueDaten.notify_all(); } else { boost::unique_lock<boost::shared_mutex> lock2(NeuRausMutex); AltRein.front()=E; NeuRaus.push_back(AltRein.front()); AltRein.pop_front(); NeueDaten.notify_all(); } return true; };
Der ganze Kode ist hier: http://nopaste.debianforum.de/32703
Kompiliert und läuft, aber wartet eben nicht auf das notify
Leider finde ich keine gute Doku zu boost, immer nur Stückchen. Ich würde es gerne nachlesen aber es gibt nix zum Lesen
In dem Howto das ich benutze wird ein boost::mutex in diesem cond-Beispiel verwendet. Ich verwende aber wegen Lese und Schreibzugriff ein boost::shared_mutex. Ist das das Problem und wenn ja warum?Falls jemand mit sagen kann ob das mit dem thread_specific_ptr ein Denkfehler ist wäre ich auch sehr sehr dankbar! Brauche dingend Hilfe!
Vielen Dank, Fabian
-
Bei meinem Problem soll der Produzent Objekte produzieren die an ALLE Konsumenten gehen. Also nicht einer kümmert sich darum, sondern jeder Konsument muss jedes Objekt vom Produzent erhalten.
Dann sollte auch jeder Consumer auch seine eigene Queue haben, in der Auftragskopien, also die Objekte, abgelegt werden. Der Producer legt dann in 5 Faecher das Gleiche ab und nicht nur in eins.
-
Deine "entnehmen_wait" Funktion ist vollkommen falsch. Was soll der Iterator sein den du da übergibst?
(BTW: "netter" Mix aus Deutsch und Englisch. *würg*)Die Funktion muss eher so aussehen:
int* Puffer::entnehmen_wait() { boost::shared_lock<boost::shared_mutex> lock1(NeuRausMutex); while( NeuRaus.empty() ) NeueDaten.wait(lock1); return *NeueDaten.pop_front(); };
Und zu deiner Frage: den Lock musst du mitgeben, weil die Condition-Variable ja nicht weiss, welche Mutex verwendet wird, und daher freigegeben werden muss.
Und shared_mutex ist die "falsche" Klasse. Sollte zwar funktionieren, ist aber unnötig langsam. shared_mutex benötigst du, wenn du Zugriffe zwischen verschiedenen Prozessen synchronisieren willst.
-
Anscheinend versteht mich niemand
Ich habe einen Erzeuger der Elemente produziert die an ALLE Verbraucher geliefert werden. knivil hat natürlich recht, man kann auch jedem Verbraucher einen solchen Puffer verpassen und der Erzeuger liefert die Elemente dort ab. Bei vielen Verbrauchern hab ich dann aber das Problem, dass mein Erzeuger großteils mit dem verteilen von neuen Elementen beschäftigt ist anstatt mit dem produzieren. Oder ich mache einen Auslieferer-Thread der dann aber auch wieder einen Puffer braucht.
Aus diesem Grund hab ich mich entschieden beim Erzeuger einen intelligenten Puffer anzubringen, wo sich die Verbraucher die Elemente abholen. So hab ich nur einen Puffer und jeder hat ungefähr gleichviel zu tun (Erzeuger einmal einliefern, jeder Verbraucher einmal abholen).
Die Elemente sollen an JEDEN Verbraucher gehen, deshalb kann ich nicht einfach ein pop() machen! Sonst ist das Element weg und die anderen Verbraucher haben es nicht bekommen. Außerdem muss sich JEDER Konsument merken wo er sich im Puffer befindet, also welches Element er schon abgeholt und welches nicht. Deshalb gibt es zwei Funktionen:
entnehmen(Iterator) - entnimmt nicht wirklich was aus dem Puffer sondern liefert nur das FÜR DIESEN VERBRAUCHER neueste Element. Für andere Verbraucher ist ein anderes Element das momentan neueste.
entsagen(Iterator) - zählt am Element eine membervariable runter. Sobald diese auf 0 geht wieder das Element recyclt.
Verbraucher und Erzeuger werden natürlich von verschiedenen Threads betrieben. Aus diesem Grund brauche ich einen shared_mutex. Leider will das wait aber einfach nicht funktionieren.while( NeuRaus.empty() ) NeueDaten.wait(lock1);
und
while( I==NeuRaus.end() ) NeueDaten.wait(lock1);
ist genau das selbe. Wenn der übergebene Iterator auf das Ende der Liste zeigt ist sie demnach leer.
Kann mir irgendjemand sagen warum das wait() und notify() nicht funktionieren will? http://nopaste.debianforum.de/32703
-
fabske schrieb:
while( NeuRaus.empty() ) NeueDaten.wait(lock1);
und
while( I==NeuRaus.end() ) NeueDaten.wait(lock1);
ist genau das selbe. Wenn der übergebene Iterator auf das Ende der Liste zeigt ist sie demnach leer.
NEIN das ist NICHT das selbe!
Du hast da offensichtlich die Iteratoren noch nicht ganz verstanden.
Mit rohen Zeigern ginge das was du da machen willst durchaus (mit einigen Einschränkungen).
Mit std::list wird es aber NICHT funktionieren.p.S.: das mit shared_mutex vs. mutex war mein Fehler. Ich würde aber tendentiell auch keine shared_mutex hier verwenden -- oft ist der Overhead eines ReaderWriterLocks grösser, als das was man sich dadurch erspart. Plus mit einem ReaderWriterLock kann es schnell zu Writer-Starvation kommen (bzw. bei manchen Implementierungen zu Reader-Starvation).
-
hustbaer du hast Recht! Und ich auch
Wenn "mein" Iterator auf das Ende der Liste zeigt muss sie nicht leer sein, ganz klar, da hast du Recht. Aber die Abfrage ob die Liste leer ist bringt mir gar nix. Für einen Verbraucher kann sie leer sein, für den anderen nicht! Weil ich hab mehrere Verbraucher hab! Jeder hat seinen eigenen Iterator auf einen Punkt in der Liste und wandert damit weiter. Kommt er ans Ende ist FÜR IHN die Liste leer. So sieht es aus.
Wie ich nun debugged hab hatte ich einige Fehler drin (z.B. eben diesen Iterator vergessen hochzuzählen). Ob das wait nun geht oder nicht kann ich grad nicht sagen, weil ich momentan Probleme mit threads hab. Sobald die gelöst sind werde ich es nochmal testen.
-
Jeder hat seinen eigenen Iterator auf einen Punkt in der Liste und wandert damit weiter. Kommt er ans Ende ist FÜR IHN die Liste leer. So sieht es aus.
was du nicht zu verstehen scheinst, ist folgendes:
#include <list> #include <cassert> int main(int argc, char** argv) { std::list<int> list; list.push_back(1); std::list<int>::iterator it = list.begin(); // it zeigt auf den einser assert(it != list.end()); it++; // it zeigt auf end assert(it == list.end()); list.push_back(2); // it zeigt immer noch auf end assert(it == list.end()); // WICHTIG: dieses verhalten ist "üblich", aber AFAIK nicht im Standard definiert, // also auch nichts, worauf man sich verlassen dürfte return 0; }
-
Das ist das Problem hustbaer
Das ist diese "Abnormalität" die ich zu begreifen versucht hab. Solange der Puffer gefüllt ist und ich einen Iterator auf ein Element hab funktioniert alles gut. Sobald ich aber an die Grenzen der Liste komme oder sie gar leer wird, verhält sich mein Iterator nicht mehr wie gewünscht.
Was ich brauche ist ein Puffer bei dem der Iterator am Ende ankommen kann. Wenn man dann ein Element hinten einfügt soll diese nicht mehr auf das Ende sondern auf das eben hinten eingefügte (das letzte) Element zeigen. Kann ich das verhalten irgendwie beeinflussen? Gibt es einen anderen Container der das Verhalten an den Tag legt? Wohl nicht, weil es ja alles Iteratoren sind?