Multipurpose Nachricht zwischen Threads



  • Hi

    also ich habe ein System aus verschiedenen WorkerThreads, die untereinander über ThreadSafe-Queues kommunizieren.

    Ich transportiere auf diesem Wege also viele verschiedene Objekte zwischen den Workern hin und her.

    Bisher hatte ich immer eine Message als Basisklasse und habe 10-100 spezialisierte Messages davon abgeleitet und diese durch die Queues gescheucht.

    Nun ist das bei kleinen Projekten ok. Wenn aber das Projekte größer wird und die spezialisierten Messages immer größer werden, nervt das Klassengebastel.

    Daher wollte ich mir eine Message bauen, die erstmal grundsätzlich aus einem Buffer ( z.b. std::vector<char> oder std::string ) besteht, der beliebig vergrößert werden kann.

    In diesen Buffer wollte ich dann meine Objekte reinswappen.

    Erzeugung einer Message sollte dann etwa so aussehen:

    std::shared_ptr<SomeType> FooTransport;
    unsigned bla = 1337;
    ...
    
    auto msg = std::make_shared<Message> ( 1 );
    msg->push<SomeType> ( FooTransport );
    msg->push<unsigned> ( bla );
    
    ...
    
    versende( msg );
    

    Wären die Objekte alle einfache Pointer, könnte ich die Daten einfach in den Buffer kopieren und den Pointer ausnullen, damit er nach dem "versende"-Aufruf unbenutzbar ist.

    Bei PODs wie z.B. unsigned kann ich auch einfach swappen.

    Aber was mache ich mit z.B. shared_ptr<...>-Typen oder KlassenObjekten?
    Die kann ich doch nicht einfach swappen? Im Beispiel wird ja dann 'FooTransport' mit dem bisherigen Inhalt des Buffers ( undefiniert ) befüllt und wird somit höchstwahrscheinlich hinterher total schrott / undefined sein.

    Ist das Problem für euch nachvollziehbar? Vorschläge?



  • Zusatzhinweis:

    Lösung muss auf jeden Fall performant sein, also irgendwelche Kopiergeschichten fallen aus.

    Ein schlampige Lösung würde ja ohne swap gehen, in dem man einfach das zu transportierende Objekt im Heap anlegt ( über Kopierkonstruktor ) und dann nur die Adresse in den Buffer kopiert.

    Aber diese Lösung wollte ich eigentlich nicht, daher meine Suche nach einer swap/move/etc.-Lösung.



  • Also das hier funzt schonmal nicht:

    template<typename T> void push( T &obj, char *&act )
    {
        *( (T*)act ) = std::move( obj );
        act += sizeof( T );
    }
    
    template<typename T> void pop( T &obj, char *&act )
    {
        obj = std::move( *( (T*)act ) );
        act += sizeof( T );
    }
    
    int main()
    {
        std::string Message;
        Message.resize( 1000 );
        char *pos = (char*)Message.data();
    
        // reinschreiben
        std::string TestObject = "foobar";
        push<std::string> ( TestObject, pos );
    
        // auslesen
        pos = (char*)Message.data();
        std::string out;
        pop<std::string> ( out, pos );
    
        std::cout << out << "\n";
    
        return 0;
    }
    

    Gibt in der ersten Zeile der Funktion "push<T>" einen SegFault...

    Aber zumindest so in der Richtung dachte ich mir das.



  • So würde man das machen:

    template <typename T> void push(T&& obj, char *&act) {
      using type = typename std::remove_reference<T>::type;
      new (act) type(std::forward<T>(obj));
      act += sizeof(type);
    }
    
    template <typename T> T& top(char *act) {
      return *reinterpret_cast<T*>(act);
    }
    
    template <typename T> void pop(char *&act) {
      top<T>(act).~T();
      act += sizeof(T);
    }
    
    int main() {
      std::string Message;
      Message.resize(1000);
      char *pos = (char *)Message.data();
    
      // reinschreiben
      std::string TestObject = "foobar";
      push(std::move(TestObject), pos);
    
      // auslesen
      pos = (char *)Message.data();
      auto out = std::move(top<std::string>(pos));
      pop<std::string>(pos);
    
      std::cout << out << "\n";
    }
    

    Du hast vergessen, Konstruktor und Destruktor aufzurufen.



  • Wow, das funzt jetzt aber. Danke schön.
    Ich muss mir jetzt aber erstmal anschauen, was du da in den Funktionen so wildes treibst. Da muss ich erstmal Forschung betreiben 😉

    Danke erstmal 😃



  • It0101 schrieb:

    Da muss ich erstmal Forschung betreiben 😉

    Das Stichwort lautet placement new, wenn du es noch nicht gefunden hast. Der Rest mit remove_reference und forward ist nur drumherum.



  • Ok danke. Da hab ich offensichtlich Defizite die ich jetzt beheben muss 😉



  • Zusätzlich sollte in push , top und pop noch std::align verwendet werden.
    Sonst bekommst du auf einigen Plattformen Probleme.

    ps:
    @sicherohneausnahme
    remove_reference sollte hier nicht nötig sein. IIRC gilt sizeof(T) == sizeof(T&) && sizeof(T) == sizeof(T&&)



  • hustbaer schrieb:

    remove_reference sollte hier nicht nötig sein. IIRC gilt sizeof(T) == sizeof(T&) && sizeof(T) == sizeof(T&&)

    Das mag zwar sein, allerdings benötigt man den Typ ohne Referenz für das placement new.



  • sebi707 schrieb:

    Das mag zwar sein, allerdings benötigt man den Typ ohne Referenz für das placement new.

    💡



  • hustbaer schrieb:

    sebi707 schrieb:

    Das mag zwar sein, allerdings benötigt man den Typ ohne Referenz für das placement new.

    💡

    💡



  • Hi!

    Nur so als Anregung: Ich verwende gerne eine Basisklasse für sehr allgemeine Arbeits-Threads, die nicht viel anderes machen als std::function -Objekte in einem queue der Reihe nach auszuführen. So kann man dem Thread bequem beliebige Aufgaben geben, oder bei Bedarf die Basisklasse auch für spezialisierte Aufgaben erweitern. Ich weiss zwar nicht, was für Briefe sich deine Threads so schreiben, aber auf so einem function-Arbeitstread kann man durchaus ein Message-System aufsetzen. Das, was du mit deiner "Multipurpose"-Nachricht erreichen möchtes (so wie ich es verstanden habe) lasse ich dabei einfach von std::bind erledigen. Als Prototyp sähe das Ganze dann grob etwa so aus:

    #include <functional>
    #include <string>
    #include <memory>
    #include <mutex>
    #include <deque>
    
    struct Object 
    {
        void print()
        {
            std::cout << "Hallo vom Objekt!" << std::endl;
        }
    };
    
    class WorkerThread
    {
        public:
            typedef std::function<void()> TaskFunction;
    
            void enqueue(TaskFunction taskFunction)
            {
                std::lock_guard<std::mutex> lock(queueMutex);
                taskQueue.emplace_back(std::move(taskFunction));
            }
    
            void threadFunction()
            {
                {
                    std::lock_guard<std::mutex> lock(queueMutex);
                    while (!taskQueue.empty())
                    {
                        taskQueue.front()();
                        taskQueue.pop_front();
                    }
                }
    
                // Warten, bis queue nicht mehr leer (condition variable o.ä.)
            }
    
        private:
            std::mutex queueMutex;
            std::deque<TaskFunction> taskQueue;
    };
    
    class MessageThread : public WorkerThread
    {
        public:
            class Message
            {
                friend class MessageThread;
                protected:
                    typedef std::function<void(MessageThread* thread)> MessageFunction;
    
                    Message(MessageFunction messageFunction)
                    : messageFunction(std::move(messageFunction))
                    {
                    }
    
                private:
                    MessageFunction messageFunction;
            };
    
            class Message1 : public Message
            {
                public:
                    Message1(int arg1, std::string arg2, const std::shared_ptr<Object>& arg3)
                    : Message(std::bind(&MessageThread::message1Function, std::placeholders::_1, arg1, arg2, arg3))
                    {
                    }
            };        
    
            void enqueue(const Message& message)
            {
                WorkerThread::enqueue(std::bind(message.messageFunction, this));
            }
    
        private:
            void message1Function(int arg1, std::string arg2, const std::shared_ptr<Object>& object)
            {
                std::cout << "Nachricht Typ 1 #" << arg1 << ": string = " << arg2 << std::endl;
                object->print();
            }
    };
    
    int main()
    {
        MessageThread thread;
    
        auto object = std::make_shared<Object>();
    
        std::cout << "ENQUEUE" << std::endl;
        thread.enqueue(MessageThread::Message1(1, "Nachricht 1", object));
        thread.enqueue(MessageThread::Message1(2, "Nachricht 2", object));
        thread.enqueue(MessageThread::Message1(3, "Nachricht 3", object));
    
        std::cout << "PROCESS" << std::endl;
        thread.threadFunction();
    }
    

    Der Code ist natürlich noch zu verfeinern, aber ich denke die Idee kommt rüber.
    Vielleicht ist das ja eine etwas bequemer zu implementierende Alternative.

    Gruss,
    Finnegan


Log in to reply