Subscriber mit Callback



  • Hallo,

    ich habe einen Subscriber-Mechanismus, der ungefähr wie folgt aussieht:

    template<class... ArgsT>
        class Subscriber
        {
        public:
            using Callback = std::function<void(ArgsT...)>;
    
        private:
            using MyLock = NonRecursiveLock;
            using SubscriberMap = std::map<int /*token*/, std::pair<std::string, Callback>>;
    
        public:
            Subscriber() : m_spSubscriberMap(new SubscriberMap()) {}
            Subscriber(const Subscriber&) = delete;
            Subscriber& operator=(const Subscriber&) = delete;
    
            int Add(const std::string& in_name, Callback&& in_callback)
            {
                MyLock lock(m_mutex);
    
    ...
    

    Im Prinzip können Clienten sich mit einem Callback registrieren und werden benachrichtigt, sobald Subscriber::Publish() aufgerufen wird.

    Nun gibt es natürlich immer das Problem, dass der CLient einen Callback implementieren kann, der lange dauert oder sogar blockiert. In der Zeit ist das Objekt Subscriber blockiert.

    Meine Frage ist nun, wie löst man das geschickt auf? Gibt es dafür ein Pattern?

    Eine Idee wäre es, dass der Client bei seiner Anmeldung einen Thread übergibt, in dem dann der Callback gepostet und ausgeführt wird. 😕



  • Könnte das "Proactor pattern" sein. 🙂



  • Ich würde das nicht als Proactor Pattern bezeichnen.

    Und ich gehe mal davon aus dass der Name Subscriber in deinem Beispiel falsch ist. Ein Subscriber ist jemand der etwas bekommt, der benachrichtigt wird. Der passive Teil. Die Klasse die du aber zeigst bietet Subscriptions an, ist also der Publisher und nicht der Subscriber. (Der Subscriber ist dann der der "Add" aufruft.)

    Ich werde im weiteren die Begriffe Publisher (der wo man sich registrieren kann) und Listener (der der sich registriert) verwenden um Verwechslungen vorzubäugen.

    Davon abgesehen...
    Ich kenne zwei sinnvolle Möglichkeiten das zu lösen.

    1. Jeder Listener muss einen Objekt vom Typ Dispatcher erstellen. Mit diesem kann er sich dann beim Publisher registrieren. Dispatcher ist dabei eine konkrete Klasse deren Funktion man nicht durch ableiten o.ä. verändern kann. Die "Notify" Funktion des Dispatchers (=die die vom Publisher aufgerufen wird) ist dabei so implementiert dass sie nie länger blockt. z.B. indem jedes Dispatcher Objekt eine Queue von Notifications hat wo die Notification einfach nur schnell reingesteckt wird und dann ist "Notify" schon fertig. Der Dispatcher kann dann z.B. in einem eigenen Thread (einer pro Dispatcher Objekt) diese Liste abarbeiten.
      Das entspricht inetwa dem was du mit "Thread übergeben" beschrieben hast. Nur so wie du es beschreibst funktioniert es nicht, da du nach Erstellung eines Threads nicht einfach "von aussen" bewirken kannst dass dieser eine Funktion deiner Wahl ausführt. Damit das funktioniert muss der Thread schon passenden Code ausführen der eben z.B. in einer Queue guckt ob es etas zu tun gibt und es dann tut.

    Damit verhinderst du recht effektiv dass jemand sich nicht an die Regeln hält und Probleme verursacht. In bestimmten Projekten, gerade grösseren Projekten, kann das Sinn machen. Da ein schlecht implementierter Programmteil dann nicht so schnell zu gröberen Störungen des gesamten Systems führt.

    Im Prinzip ist das auch genau das was z.B. Windows mit Fenstern und Message-Queues macht. Jedes Fenster kann sich für bestimmte Benachrichtigungen registrieren, aber wenn ein Fenter "hängt" hängt deswegen nicht gleich das ganze System.

    1. Du stellst einfach die Regel auf dass Callbacks nicht lange brauchen dürfen. Listener die u.U. mehr zu tun haben sind dann selbst dafür verantwortlich dass sie die Ausführung des Codes der zu lange brauchen könnte irgendwie entkoppeln.
      Das ist meist die einfachere Lösung. Vor allem ermöglicht sie, für Listener deren Callbacks nicht lange brauchen, synchrone Benachrichtigung. Und spart Resourcen.

    Auch in dieser Variante wird man sich sowas wie den unter (1) beschriebenen Dispatcher basteln, da es wohl kaum Sinn machen wird diese Funktionalität in allen "könnte lange brauchen" Listener Klassen mehrfach zu implementieren.

    Die Entscheidung ob mit oder ohne Dispatcher liegt dann aber beim jeweiligen Listener, der Publisher selbst weiss in diesem Fall nichtmal was davon dass es den Dispatcher überhaupt gibt.



  • Erst einmal danke für die Antwort!

    Mit dem Wort "Thread" hatte ich nur grob angedeutet, dass der Callback in einem anderen Thread ausgeführt wird. Dass das kein einfacher std::thread ist, sondern einem ThreadPoolTaskExecuter (MessageQueue) ist schon klar.

    Die Beziehung zwischen Listener und Dispatcher ist mir bekannt.

    Dazu kann man sich auch diese Seite ansehen, die ich gestern schon vor meinen Beitrag hier gelesen habe.

    https://doanduyhai.wordpress.com/2012/08/04/design-pattern-the-asynchronous-dispatcher/

    Im Prinzip soll ja ein asynchroner Dispatcher dafür sorgen, dass das Dispatchen schnell vonstatten geht und die Ereignisse dann asynchron in einem Thread abgearbeitet werden.

    Momentan ruft jemand von außen notify() auf und es wird synchron die Liste der Subscriber durchgearbeitet und jeweils über den den Funktionszeiger irgendein Callback aufgerufen, der beliebig lange dauern kann.

    Meine Vorstellung geht in deine Richtung. Der Client, der sich mit Add() anmeldet, übergibt noch ein Objekt Dispatcher. Wird nun notify() aufgerufen, ruft die Klasse oben bei jedem angemeldeten Observer auf seinem Dispatcher dispatch() auf und kehrt sofort zurück. Der ThreadPoolTaskExecuter von dem Observer kümmert sich dann selbst darum, den asynchron Callback auszuführen.



  • OK.
    Hast du noch eine Frage?


Anmelden zum Antworten