Producer-Consumer-Problem
-
fabske schrieb:
Das ist diese "Abnormalität" die ich zu begreifen versucht hab.
Deine Vorstellung von Iteratoren ist "abnormal", nicht das beobachtete verhalten
Solange der Puffer gefüllt ist und ich einen Iterator auf ein Element hab funktioniert alles gut. Sobald ich aber an die Grenzen der Liste komme oder sie gar leer wird, verhält sich mein Iterator nicht mehr wie gewünscht.
Was ich brauche ist ein Puffer bei dem der Iterator am Ende ankommen kann. Wenn man dann ein Element hinten einfügt soll diese nicht mehr auf das Ende sondern auf das eben hinten eingefügte (das letzte) Element zeigen. Kann ich das verhalten irgendwie beeinflussen?
Nö, nicht wirklich. Ausser du schreibst deinen eigenen Container, der soetwas kann.
Ich muss auch sagen dass ich es ziemlich grässlich finde, hier mit rohen Iteratoren zu arbeiten. Anders gesagt: ich wäre garnie auf die Idee gekommen, das so machen zu wollen. Damit legst du die Innereien der Klasse bloss, und kannst die Implementierung nichtmehr sauber ändern.Gibt es einen anderen Container der das Verhalten an den Tag legt? Wohl nicht, weil es ja alles Iteratoren sind?
Ich kenne keinen. Und siehe oben: ich würde es sowieso nie so machen.
Versuch mal das ganze etwas mehr zu abstrahieren.
Du hast einen "Producer", und mehrere "Consumer". Die "Consumer" müssen sich ja wohl irgendwie beim "Producer" an- und abmelden. Sonst weiss der ja nie was er noch für irgendeinen "Consumer" aufheben soll.
Sogesehen stellt so eine Anmeldung eine Resource dar (vergleichbar mit z.B. einer Socket Verbindung zu einem Server der irgendwelche Date streamt oder was auch immer).
Diese Resource solltest du IMO explizit machen, und der Einfachheit halber gleich mit RAII behandeln.
Das Interface könnte inetwa so aussehen:template <class T> class Queue : private boost::noncopyable { public: class Subscription : private boost::noncopyable { public: bool HasCurrent() const; // true wenn die Subscription ein aktuelles Element "hat" void ReleaseCurrent() const; // gibt das aktuelle Element an den Queue zurück ("entsagen") T const& GetCurrent() const; // Zugriff auf das aktuelle Element - wirft std::logic_error() wenn kein aktuelles Element vorhanden void Advance(); // gibt das aktuelle Element zurück, und holt sich das nächste (wartet bis eins verfügbar ist) bool TryAdvance(); // falls ein weiteres Element sofort vergübar ist, aktuelles zurückgeben, neues holen, return = true. // falls kein weiteres Element sofort verfügbar ist, return = false, sonst keine Änderung (aktuelles Element wird nicht zurückgegeben) // evtl. noch: bool TimedAdvance(TimeoutType timeout); // dasselbe wie TryAdvance, nur dass bis zu "timeout" gewartet wird, bevor "false" zurückgegeben wird // eine Subscription ist initial erstmal "leer", d.h. der Client muss erstmal Advance() sagen, damit es ein aktuelles Element gibt // ... private: friend Queue<T>; Subscription(Queue<T> const& owner); // ... }; boost::shared_ptr<Subscription> Subscribe(); // "unsubscribe" ist implizit über das Löschen der Subscription, was wiederum automatisch von boost::shared_ptr erledigt wird, // sobald der letzte boost::shared_ptr auf die Subscription verschwindet void PushElement(T const& element); private: // ... };
Damit hast du das Ganze erstmal schön gekapselt, und die Queue ("Puffer") Klasse läuft jetzt nichtmehr mit aufgeschlitztem Bauch und raushängenden Eingeweiden rum.
Die Implementierung musst du natürlich immer noch irgendwie machen.
Dazu gibt es mehrere Möglichkeiten.
Spontan fällt mir mal folgendes ein:
Du verpasst der "Queue" einen Zähler, mit der du alle reingesteckten Elemente durchnummerierst. Die Subscriber merken sich anstelle des Iterators auf das nächste Element nur die Nummer des nächsten Elements.
Die Queue speichert alle Elemente die noch gebraucht werden in z.B. einer std::deque (schnelles einfügen und entfernen vorne und hinten, schneller zugriff auf das N.te Element).
Damit die Queue auch weiss welche Nummer jedes Element in der std::deque hat, merkt sie sich zusätzlich noch die Nummer des ersten Elements.
Zusätzlich merkt sich die Queue zu jedem Element in der std::deque noch, wie viele Subscriber es gibt, die dieses Element noch nicht zurückgegeben haben, damit die Elemente auch irgendwann wieder entfernt werden können. (Dazu verwendet sie eine kleine Hilfsstruktur, die einen Zähler und das eigentliche Element beinhaltet. Die std::deque ist also keine std::deque<T>, sondern eine std::deque<Hilfsstruktur>).
Damit es bei Überläufen des Zählers keine Probleme gibt, musst du natürlich überall aufpassen, wo du mit Zählerwerten rumhantierst.
Alternativ könntest du auch als Zähler einen 64 Bit Integer verwenden, und einfach beschliessen, dass das Programm nie so lange laufen wird, dass der Zähler jemals überläuft. Rechne es dir durch, ich glaube diese Annahme ist bei heutiger Hardware einigermassen sicher.
(Bei 4GHz und einem neuen Element pro Take wären das 4 Milliarden Sekunden, und 4 Milliarden Sekunden sind lange)
-
Erstmal vielen Dank hustbear für den langen und ausführlichen Beitrag! In der Tat gibt es zwei Möglichkeiten: Entweder die Verbraucher melden sich beim Erzeuger an und er verwaltet die Elemente, oder der Erzeuger weiß nichts von der Anzahl der Verbraucher.
Ich hab mich für letztes entschieden, weil ich zusätzlich drauf achten muss dass das ganze sehr sehr schnell funktioniert. Ich will den Prozessor so wenig wie möglich belasten und ich meine zu wissen dass RAII und andere tolle Konzepte doch etwas overhead mitbringen.
Die Objekte die in den Puffer wandern haben bereits eine Variable "int Benutzung" in der initial (bei Erstellung) die Anzahl der Verbraucher eingetragen wird. Wird ein solches Objekt aus dem Puffer entnommen ist der Verbraucher in der Verantwortung dies zu dekrementieren. Der letzte räumt es dann auf. So muss der Erzeuger nichts von den Verbrauchern wissen und ich spare mir eine Verwaltungsstruktur am Erzeuger. Die Idee die Elemente durchzunummerieren bzw. eine ID zu verwenden hatte ich auch schon. Damit würde ich mir das hässliche übergeben des Iterators ersparen, das stimmt. Und wenn ich merke dass mir das zu viel Probleme macht werde ich mir auch eine solche Lösung wohl überlegen müssen! Das heißt, dass der Puffer (bzw. der Erzeuger) sich merkt wer was schon abgeholt hat.An dieser Stelle kommen wir wieder auf den thread_specific_ptr! Wäre das nicht die ideale Lösung? Wenn ich im Puffer einen thread_specific_ptr als Iterator habe, der (weil thread_specific_ptr) für jeden Verbraucher individuell ist, immer auf das aktuelle Element zeigt? Damit müsste ich keine Iteratoren vom Verbraucher an den Erzeuger mehr übergeben und hätte implizit schon die Verwaltungsstruktur?
Auf deinen Beitrag hin ab ich mich von std::list verabschieden und auf std::deque gewechselt. Wie ein Test ergab habe ich damit keine Probleme mehr beim Übergeben der Iteratoren!
#include <deque> #include <iostream> std::deque<int> data; int gibElement(std::deque<int>::iterator it) { if( it==data.end() ) return -1; // Ende! else return *it; }; int main(int argc, char** argv) { std::deque<int>::iterator it = data.begin(); data.push_back(1); data.push_back(2); std::cout << gibElement(it) << std::endl; it++; std::cout << gibElement(it) << std::endl; it++; data.pop_front(); data.pop_front(); data.push_back(3); data.push_back(4); std::cout << gibElement(it) << std::endl; it++; std::cout << gibElement(it) << std::endl; it++; std::cout << gibElement(it) << std::endl; return 0; }
Wenn ich jetzt noch den Iterator als thread_specific_ptr (in einer Thread-Umgebung) speichern könnte wäre es glaube ich perfekt!
-
RAII hat overhead? wo?
naja, egal.
ich gehe davon aus, dass eine saubere implementierung potentiell gleich schnell ist, wie eine weniger saubere lösung.
(EDIT: in diesem speziellen fall meine ich. es gibt natürlich andere fälle, wo man viel geschwindigkeit rausholen kann, wenn man auf "schön"/"sauber" verzichtet.)was die thread-specific-pointer angeht: wenn du unbedingt willst, dann mach es so. ich würde in diesem fall ein explizites objekt ("Subscription") definitiv vorziehen. ist sauberer. und du sparst den - zugegebenermassen nicht sehr grossen - overhead den thread-specific-pointer haben.
-
Eventuell schwenk ich doch noch um hustbaer, deshalb sind deine Vorschläge nicht umsonst! Das ist mein erstes richtiges Softwareprojekt und ich sammle gerade Erfahrungen.
Du meintest weiter oben dass du gar keine shared_mutexe verwenden würdest. Bisher habe ich das ja für meine deques:
std::deque<Daten*> NeuRaus; boost::shared_mutex NeuRausMutex;
Je nachdem ob ich einfüge oder nur lese mach ich dann ein
boost::unique_lock<boost::shared_mutex> lock(NeuRausMutex);
oder ein
boost::shared_lock<boost::shared_mutex> lock(NeuRausMutex);
Soll ich das deiner Meinung nach einfach immer dadurch ersetzen?
boost::mutex NeuRausMutex; boost::lock_guard<boost::mutex> lock(NeuRausMutex);
-
Ja, genau.
Du kannst aber auch einfach 3 Typedefs machen:#if 1 // use shared_mutex or not typedef boost::shared_mutex MutexType; typedef boost::shared_lock<MutexType> SharedLock; typedef boost::unique_lock<MutexType> UniqueLock; #else typedef boost::mutex MutexType; typedef boost::unique_lock<MutexType> SharedLock; // unique_lock<> kann man auch mit "nicht-shared" Mutexen verwenden typedef boost::unique_lock<MutexType> UniqueLock; #endif
Wenn du im Code dann überall nurmehr die Typedefs verwendest, kannst du alles nur über die Typedefs von "shared_mutex" auf "mutex" umschalten, und wieder zurück.
z.B. wenn du Performence-Vergleiche machen willst.
-
Wieso sollte RAII einen Overhead haben?
Ob ich eine Release Funktion, delete oder sonstwas "per hand" aufrufe, oder der Destruktor das macht, kommt doch aufs selbe heraus.Zum shared-lock: Der macht nur Sinn, wenn man mehrere lesende Threads hat und der Lesezugriff auch ne Weile dauert. Bei queue.front(); queue.pop(); wird sich das kaum lohnen. Außerdem hast Du nur einen Consumer, wenn ich das beim Überfliegen richtig verstanden habe.
-
Nönö, er hat schon mehrere Consumer.
Bin mir aber fast sicher, dass es trotzdem Overkill ist.
Eben weil die Locks nur so kurz gehalten werden.
-
hustbaer ich hab es eingesehen. Die Idee mit den iteratoren war keine gute. Ich weiß nicht warum aber mit der Zeigerlösung schaffe ich es immer nur 319 Objekte vom Erzeuger zum Verbraucher zu schauffeln, danach kommt ein Speicherzugriffsfehler. Liegt wohl an den Erklärungen dieses Threads: http://www.c-plusplus.net/forum/viewtopic-var-t-is-256167-and-highlight-is-.html
Ich habe nun umgebaut auf IDs. Die hatte ich schon vorher. Es handelt sich um einen timestamp der Erstellungszeit meiner Daten die ich nun als ID benutze. Anstatt die iteratoren übergebe ich nun IDs und lass sie in den deques suchen. Puffer hab ich auch abgeschafft und direkt in Modul.cpp integriert. Falls es dich interessiert: http://codepad.org/lBGbcbNS