Boost.Asio mit "gefährdeten" Objekten.



  • Hi zusammen,
    ich bin gerade dabei, mich ein wenig in Boost.Asio reinzufuchsen (muss zugeben, dass ich damit noch nie ernsthaft gearbeitet habe). Ich hab mich für meinen kleine Client/Server-Test nach dem chat-Beispiel aus der Doku gerichtet, hab statt boost::bind für die Handler allerdings Lambdas benutzt, die Methoden der jeweiligen Klasse aufrufen. Dabei ist mir folgendes aufgefallen:
    - Wenn ich den Lambdas nur den this-Pointer übergebe, laufe ich Gefahr, dass die Handler aufgerufen werden, nachdem das Objekt zerstört wurde (ich hab io_service::run() in einem separaten Thread laufen)
    - Wenn ich den Lambdas mittels shared_from_this shared_ptr auf das auslösende Objekt übergebe, kann ich das Objekt nicht mehr wirklich explizit zerstören, weil die Handler-Lambdas es mit dem shared_ptr am Leben halten. Mein async_accept am Server hat den io_service-Thread zwischendurch unendlich am Leben gehalten.

    Meine Lösung sieht also fast immer wie folgt aus:

    using namespace boost;
    using namespace asio;
    void TCPServerCommunicator::receiveHeader()
      {
        std::weak_ptr<TCPServerCommunicator> weakThis(shared_from_this());
        async_read(socket, buffer(/*...*/),
        //capture weakThis und this
          [=](const system::error_code& error, unsigned /*bytesread*/) 
        {
          //damit nicht mitten im Handler der Communicator zerstört werden kann.
          auto sharedThis(weakThis.lock()); 
          if (!sharedThis) return;
    
          //der eigentliche Handler
          if (error) WTF;
          receiveBody();
        });
      }
    
    void TCPServerCommunicator::receiveBody(SEventPtr const& evtPtr)
      {
        std::weak_ptr<TCPServerCommunicator> weakThis(shared_from_this());
        async_read(socket, buffer(/*...*/),
          [=](const boost::system::error_code& error, unsigned /*bytesread*/)
        {
          auto sharedThis(weakThis.lock());
          if (!sharedThis) return;
    
          if (error) WTF; 
          decodeAndSendEvent(evtPtr);
          receiveNewEvent();
        });
      }
    

    Es ist immer das gleiche Muster: weak_ptr erstellen, vom Lambda capturen lassen, im Lambda locken und abfragen, bevor die eigentliche Logik kommt. Kennt jemand ne bessere Lösung, die mir nicht so den Code "vollsaut"? 😉



  • Die einzige (mir bekannte) Möglichkeit:

    1. Jede Verbindung bekommt einen eigenen Strand.
    2. Alle asynchronen Operationen der Verbindung werden über diesen Strand gestartet.
      ASIO schreibt dann vor dass alle "Kind-Operationen" die intern verwendet werden über den Strand der Eltern-Operation gestartet werden müssen.
    3. Du bindest wieder shared_ptr in deine Objekte.
    4. Wenn du eine Verbindung abbrechen willst queuest du einen Callback über den Strand der Verbindung. Das stellt sicher dass der Callback nicht gleichzeitig mit irgend einem Handler laufen kann der mit dieser Verbindung arbeitet.
    5. Im Callback schliesst du den Socket mit close().
      Dadurch terminieren dann alle ausständigen Operationen "zügig" mit "operation aborted" o.Ä., d.h. der Handler wird "zügig" aufgerufen (und dann natürlich zerstört, was dann den gebundenen shared_ptr zerstört -> juchui).
      IIRC musst du dabei in deinen eigenen Handlern dann gar nichts besonderes mehr machen (=nicht checken ob der Socket noch offen ist), da asynchrone Operationen auf einem geschlossenen Socket erlaubt sind, aber sofort mit einem Fehler abgeschlossen werden (lies das aber nochmal selber nach falls du es so machen willst - bin mir nur fast sicher dass es so ist).

    Der Verbindungsabbruch geht dabei also relativ flott, aber natürlich nicht synchron. Weil der Callback ja nur gequeued werden kann, und halt dann erst irgendwann mal ausgeführt wird. Zum kontrollierten Niederfahren eines Servers sollte es aber reichen.

    ps: passt nicht genau dazu, aber doch irgendwie und weil es mich damals recht verwundert hat...: es gibt (Extrem-)Situationen, in denen der Handler für eine Operation niemals aufgerufen wird. In diesen Situationen wird statt dessen der Handler von einem beliebigen Thread zu einem beliebigen Zeitpunkt einfach zerstört.
    An die Umstände wie das passieren kann kann ich mich nicht mehr genau erinnern - ich meine aber es war nur wenn ein Handler nicht gequeued werden kann. Und das sollte eigentlich nicht vorkommen, ausser wenn einem entweder der Speicher ausgeht ( bad_alloc -> kann man sowieso kaum gescheit handeln), oder aber man doof genug ist einen Handler zu schreiben der auch ohne bad_alloc beim Kopieren eine Exception werfen könnte).

    D.h. idealerweise sollte das Programm mitbekommen wenn die letzte Kopie eines Handlers zerstört wird (bevor er aufgerufen wurde), und daraufhin das Verbidungs-Objekt zerstören. Wenn man das Verbidungs-Objekt *ausschliesslich* darüber am Leben erhält, dass man shared_ptr in die Handler reinbindet, dann geht das automatisch. (Es ist ja im Normalfall immer irgend eine Operation "pending", und diese hält das Verbindungs-Objekt dann am Leben)



  • Okay, Strands muss ich mir dann wohl mal anschauen, hatte ich bisher noch nicht. Klingt aber dann auch wieder nach doppeltem Aufwand, um die Verbindung abzubrechen: den Callback triggern und den "permanenten" shared_ptr (den, der nicht irgendwo in der Handler-queue hängt) löschen. Dass ich die Dinger noch Permanent irgendwo hängen hab liegt daran, dass der jeweilige "Communicator" sowohl empfängt (Über die async-Handler wie gezeigt) als auch benutzt wird, um asynchron Events in die Gegenrichtung zu senden.

    Ich hab vorhin mal etwas rumgespielt und folgendes aus dem weak_ptr-Muster von oben gemacht:

    //weak_lambda_from_this.h
    #ifndef WEAK_LAMBDA_FROM_THIS_HPP
    #define WEAK_LAMBDA_FROM_THIS_HPP
    
    #include <functional>
    #include <memory>
    #include <boost/function_types/function_arity.hpp>
    
    #ifndef UTIL_WEAK_LAMBDA_MAX_ARITY
    #define UTIL_WEAK_LAMBDA_MAX_ARITY 10
    #endif
    
    #define EMPTY(n) BOOST_PP_EMPTY()
    #define TEMPLATE_HEADER(n) \
       template <BOOST_PP_ENUM_PARAMS(n, class T)>
    #define DECLARE_PARAM(z, n, unused)  \
       T##n&& t##n
    #define MULTIPLY(n) \
       BOOST_PP_REPEAT(n, FACTOR, ~)
    #define FACTOR(z, n, unused) \
       BOOST_PP_IF(n, *, ) t##n
    #define FORWARD_PARAM(z, n, unused) \
       std::forward<T##n>(t##n)
    
    #define DECLARE_OPERATOR(z, n, unused)                         \
       BOOST_PP_IF(n, TEMPLATE_HEADER, EMPTY)(n)                   \
       void operator()(BOOST_PP_ENUM(n, DECLARE_PARAM, ~))  \
       {                                                           \
          static_assert(n <= arity, "Too many arguments!");        \
          static_assert(n>= arity, "Not enough arguments!");       \
          auto sharedThis = weakThis.lock();                       \
          if (!sharedThis) return;                                 \
          func(BOOST_PP_ENUM(n, FORWARD_PARAM, ~));                \
       }                                                           \
    
    namespace util {
      template <class Host>
      class weak_lambda_from_this : public std::enable_shared_from_this<Host>
      {
        template <class Func>
        struct WeakLambda
        {
          const static unsigned arity = boost::function_types::function_arity<Func>::value;  
    
          std::function<Func> func;
          std::weak_ptr<Host> weakThis;
          WeakLambda(std::function<Func>&& f, std::weak_ptr<Host>&& wt) 
            : func(std::move(f))
            , weakThis(std::move(wt))
          {}
    
          BOOST_PP_REPEAT(BOOST_PP_INC(UTIL_WEAK_LAMBDA_MAX_ARITY), DECLARE_OPERATOR, ~)
        };
    
      protected:
        template <class Func>
        WeakLambda<Func> weak_lambda(std::function<Func> f)
        { return WeakLambda<Func>(std::move(f), shared_from_this()); }
      };
    } //end ns util
    
    #undef FORWARD_PARAM
    #undef DECLARE_PARAM
    #undef TEMPLATE_HEADER
    #undef EMPTY
    #undef MULTIPLY
    #undef FACTOR
    #undef DECLARE_OPERATOR
    
    #endif
    
    class TCPServerCommunicator 
      : public util::weak_lambda_from_this<TCPServerCommunicator> //statt enable_shared_from_this { /* ... */ };
    
    using namespace boost;
    using namespace asio;
    void TCPServerCommunicator::receiveHeader()
      {
        async_read(socket, buffer(/*...*/),
          weak_lambda<void(const system::error_code&, unsigned)>
          ([=](const system::error_code& error, unsigned /*bytesread*/)
        {
          //der eigentliche Handler
          if (error) WTF;
          receiveBody();
        }));
      }
    

    Ich bin noch dran, die Template Argument Deduction für die weak_lambda Funktion zu basteln, ist etwas komplexer. Der Makromist ist der MSVC-Ersatz für ein variadic template, das einfach die Argumente des op() an den gespeicherten Functor forwarded...



  • pumuckl schrieb:

    Klingt aber dann auch wieder nach doppeltem Aufwand, um die Verbindung abzubrechen: den Callback triggern und den "permanenten" shared_ptr (den, der nicht irgendwo in der Handler-queue hängt) löschen. Dass ich die Dinger noch Permanent irgendwo hängen hab liegt daran, dass der jeweilige "Communicator" sowohl empfängt (Über die async-Handler wie gezeigt) als auch benutzt wird, um asynchron Events in die Gegenrichtung zu senden.

    Jopp, das ist ein wenig lästig.
    U.a. wegen solcher Lästigkeiten bin ich auch nicht der grösste Fan der ASIO.

    Die wurde ja eigentlich gemacht um asynchrones IO Gedöns ganz toll flexibel und einfach zu machen - aber im Endeffekt ersetzt man das Schreiben von viel (z.T. plattformspezifischem) eigenem Code durch das das Doku-Wälzen und Raten und sich ein Muster von der ASIO aufpressen lassen, weil sonst nix mehr ordentlich funktioniert.

    Der Synchrone Teil der ASIO ist ja nett. Der A Teil der ASIO ist aber teilweise für'n A 🤡

    ps: wie hattest du das denn ohne Strands überhaupt geplant? Eigene Mutex pro Connection, oder wie wolltest du sicherstellen dass nicht zwei Threads gleichzeitig auf den Stream (oder sonstige Daten der Connection) zugreifen - also für den Fall dass du asynchron irgendwas an die verbundenen Clients melden willst?



  • Bis jetzt war es bei mir nie nötig die Objekte, mit denen die Handler verbunden sind, explizit zu löschen. Bis jetzt habe ich das so gemacht, dass die Handler "abgebrochen" werden können z.B. durch schliessen des Sockets. Allerdings kümmerte ich mich danach nicht darum wie lange das Objekt dann noch lebt - lange kann es allerdings nicht sein, denn z.B. nach schliessen des Sockets sollten die Handler mit operation_aborted aufgerufen werden.

    Oder habe ich etwas verpasst?

    Edit:
    Anmerkung: Für die Handler benutze ich natürlich schon shared_from_this().



  • theta schrieb:

    Bis jetzt habe ich das so gemacht, dass die Handler "abgebrochen" werden können z.B. durch schliessen des Sockets.

    Aktuell habe ich folgende Objekte, die mit asio arbeiten: Am Server einen "TCPListener", der aus nicht viel mehr als einem Acceptor besteht, der auf ankommende Verbindungen wartet, die annimmt und die nötige Buchhaltung durchführt (Meldung an den Server, dass ne neue Verbindung da ist usw.) Außerdem für jeden verbundenen Client den oben in Teilen gezeigten "ServerCommunicator", der für das ein- und auspacken der verschickten/erhaltenen Events zuständig ist. Der jeweilige Client bekommt einen entsprechenden "ClientCommunicator", der das Selbe auf der anderen Seite erledigt.

    Wenn vom Server aus jetzt eine Verbindung beendet werden soll, wird der entsprechende ServerCommunicator genommen und aus der (shred_ptr)-Liste der verbundenen Clients gekickt. Wenn ich da vorher noch für sorgen muss, dass ggf. Sockets geschlossen werden müssen, muss ich entweder erst schauen, ob es überhaupt ein TCPServerCommunicator ist (soll auch andere geben, s.u.) oder der Basisklasse eine allgemeine virtuelle "shutdown"-Methode verpassen, die für alles außer TCP-Communicatoren wenig Sinn macht. Halte ich designtechnisch für suboptimal, ein einfaches "delete and forget" liegt mir da am Herzen.
    Ähnlich siehts bei dem TCPListener aus, dort müsste ich explicit den acceptor canceln, damit die Warterei auf neue Verbindungen ein Ende hat. Wenn der acceptor nur an einem shared_ptr im Server hängt und die ioservice-queue nur weak_ptr enthält, kann das canceln in den Dtor und ich werf ihn einfach aus dem Server, wenn ich ihn runterfahre, und gute Nacht.

    hustbaer schrieb:

    ps: wie hattest du das denn ohne Strands überhaupt geplant? Eigene Mutex pro Connection, oder wie wolltest du sicherstellen dass nicht zwei Threads gleichzeitig auf den Stream (oder sonstige Daten der Connection) zugreifen - also für den Fall dass du asynchron irgendwas an die verbundenen Clients melden willst?

    Im Grunde genommen habe ich wenn man genau hinschaut nur zwei Threads und (so gut wie) keine konkurrierednen Zugriffe. Die zwei Threads entstehen auch nur dadurch, dass der Server als Active Object von dem Client erzeugt wird, der das Spiel hostet:

    //skizziert:
    class Server 
    {
      boost::asio::io_service ioService;
      std::unique_ptr<boost::asio::io_service::work> work; //hält den ioService am Leben...
      std::unique_ptr<boost::thread> serverThread;
      std::shared_ptr<TCPListener> tcpListener;
    public:
      Server(std::shared_ptr<communication::LocalServerCommunicator const> localComm)
        : /*initialisierungen*/
      {
        serverThread = std::make_shared<boost::thread>([this](){ioService.run();});
      }
    
      ~Server()
      {
        work.reset();
        tcpListener.reset();
        serverThread->join();
      }
    
      /** Senden eines Clientevents an die Verarbeiter innerhalb des ServerThreads */
      void processClientEvent(communication::ClientEvent evt)
      {
        ioService.post([=](){/*process the event*/};);
      }
    };
    

    Alles im Server läuft also über den selben ioService, die TCPServerCommunicatoren rufen processClientEvent auf, was im Grunde auch wieder nur einen Handler in die queue hängt. Diese Handler können natürlich das Versenden von Events an die Clients triggern, was aber auch nur wieder in die queue gehängt wird, kurz, ALLES was im Server geschieht, läuft im selben Thread. Die einzigen Schnittstellen zwischen dem Serverthread und dem lokalen Clientthread (Main-Thread) sind die "processClientEvent" Methode und ein "LocalServerCommunicator", der die Events für den Host-Client in eine geschützte Queue steckt.
    Insofern sehe ich erstmal keine Gefahr. Sollte das zu Performanceschiwerigkeiten führen (wovon ich erstmal nicht ausgehe), würde ich das TCP-Gedöns in einem eigenen Thread laufen lassen und die Events an streng definierten Stellen in den TCP-Thread rein bzw. rausreichen.

    Oder hab ich da einen Denkfehler?



  • Das stimmt, z.B. hat die TCPConnection Klasse bei mir eine stop() Methode um das shutdown() und close() auf dem Socket aufzurufen.

    pumuckl schrieb:

    ...
    Oder hab ich da einen Denkfehler?

    Ich denke das ist korrekt so (wird bei uns z.T. auch so eingesetzt). Wenn io_service::run() nur von genau einem Thread angetrieben wird, werden auch die Handler nur genau auf diesem Thread ausgeführt.


Anmelden zum Antworten