boost::asio - Asynchroner Aufruf nach beenden des sockets



  • Handler werden grundsätzlich über den Dispatacher vom io_service ausgeführt.

    Soll heißen: Wenn Du einen Handler über irgendeine asynchrone Operation in die Queue schiebst, wird dieser erst ausgeführt, wenn Du run() , run_one() , poll() oder poll_one() für das benutzte io_service -Objekt aufrufst.

    Due Aufrufe von shutdown() und close() müssen synchronisiert werden, weil sie sonst potentiell parallel zur Abarbeitung z.B. eines async_receive passieren können. Wenn gerade ein empfangenes Datagramm bearbeitet wird (vom Socket), und Du gleichzeitig z.B. close() auf den Socket aufrufst, dann hast Du eine klassische Race Condition. Die geteilte Resource ist dabei der Socket selbst und es greifen zwei Threads drauf zu (der vom io_service der gerade das async_receive bearbeitet, und der, der close() aufruft).

    Wieso kannst Du es mit Win-XP nicht testen? -> Achja, cancel() geht da ja nicht.



  • Naja, wegen der mangelnden Unterstützung, die in den Remarks vermekt ist.



  • Tja.. Tachyon.. Ich befürchte, dass es doch nicht so einfach war. 😉

    Ich habe das selbe Fehlerbild, aber jetzt in einer einfacheren Situation bekommen.

    Wenn ich nämlich jetzt gar keine receives mehr empfange und gleich resetten will, dann funktioniert das mit dem posten von shutdown und close nicht. Aus irgendeinem Grund kommt manchmal die Abbruch Meldung trotzdem erst wenn der service wieder gestartet ist (es war also ein Handler nach io_service::stop drin..). Das konnte ich bis jetzt nicht beobachten, wenn ich die calls einfach ohne post gemacht habe.

    // böse:
    //io_service_.post ( boost::bind ( &udp::socket::shutdown, &socket_, udp::socket::shutdown_both ) );
    //io_service_.post ( boost::bind ( &udp::socket::close, &socket_ ) );
    
    // anscheinend gut:
    socket_.shutdown ( udp::socket::shutdown_both );
    socket_.close ();
    
    io_service_.post ( boost::bind ( &boost::asio::io_service::stop, &io_service_) );
    
    run_thread_->join ();
    io_service_.reset ();
    

    Dabei müssten ja die die beiden calls tatsächlich vor dem io_service::stop kommen.. Für mich macht das ganze irgendwie keinen Sinn mehr.. 😕



  • Sollte es nicht so laufen?

    1. shutdown socket (nur receive seite)
    2. receive handler wird aufgerufen mit error eof, bzw. bytes_transferred == 0
    3. close socket im receive handler

    Oder bin ich irgendwie ganz falsch gewickelt.

    Simon


  • Administrator

    Kannst du mal ein minimales Beispiel zeigen, wo der Fehler passiert? Wenn ich Zeit finde, möchte ich das heute mal durchtesten. 😉

    Grüssli

    PS: Es kam schon wieder eine neue Boost Version raus ... 😮



  • @theta:
    Da tachyon noch nichts gesagt hat und in der boost Doku nix davon steht, gehe ich davon aus, dass es schon so geht..

    @dravere:
    Ja, werde ich wahrscheinlich heute Abend noch machen. Im Moment habe ich ein paar andere Sachen im Kopf. 😉



  • drakon schrieb:

    Wenn ich nämlich jetzt gar keine receives mehr empfange und gleich resetten will, dann funktioniert das mit dem posten von shutdown und close nicht.

    Das sollte eigentlich immer funktionieren. Voraussetzung dafür ist allerdings, dass Du einen io_service mit aktivem Eventprozessor hast. Wenn nicht, dann tut post() genau gar nichts.

    drakon schrieb:

    Aus irgendeinem Grund kommt manchmal die Abbruch Meldung trotzdem erst wenn der service wieder gestartet ist (es war also ein Handler nach io_service::stop drin..). Das konnte ich bis jetzt nicht beobachten, wenn ich die calls einfach ohne post gemacht habe.

    Das darf so nicht sein. Ich glaube, Du solltest doch mal deutlich mehr Code zeigen. Vor allem den, wo Du die asynchronen Operationen startest.

    // anscheinend gut:
    socket_.shutdown ( udp::socket::shutdown_both ); 
    socket_.close ();
    //nein, sehr schlecht. sockets sind nicht threadsafe, und Du greifst mit zwei
    //Threads drauf zu (der aus dem Eventprozessor, und der, der close() aufruft-
    

    Ich stelle nochmal die Frage:
    Wieso beendest Du den io_service ? Du kannst auch bei laufendem Eventprozessor die sockets schließen und wieder neu öffnen. Noch besser wäre, wie hier bereits gesagt wurde, das Socketobjekt komplett neu zu erstellen.



  • Das der Zugriff da nicht synchronisiert ist, ist mir klar, allerdings habe ich, wie gesagt den Fehler so nicht reproduzieren können, daher "anscheinend gut". 😉

    Das ich da den io_service neu starte muss wohl nicht sein, ja, aber ich dachte halt zuerst eben, dass es nötig seie, oder zumindest gut ginge. Im Moment brauche ich das vor allem, um sicher zu gehen, dass auch alle Handler durch sind und warte daher bis der Thread zurück kommt und starte ihn neu. Ich denke, dass man das mit io_service::poll auch erreichen kann, aber jetzt will ich zuerst das hier zum laufen kriegen. 😉

    #include <iostream>
    #include <vector>
    #include <string>
    
    #define _WIN32_WINNT 0x0501
    
    #include <boost/asio/io_service.hpp>
    #include <boost/asio/ip/udp.hpp>
    #include <boost/asio/buffer.hpp>
    
    #include <boost/thread.hpp>
    #include <boost/system/error_code.hpp>
    #include <boost/bind.hpp>
    
    using boost::asio::ip::udp;
    
    boost::asio::io_service io_service;
    std::vector<char> buffer ( 64 );
    std::vector<char> send_buffer  ( 64 );
    boost::asio::ip::udp::socket socket_  ( io_service );
    
    boost::asio::ip::udp::endpoint remote_endpoint ;
    std::auto_ptr<boost::thread> run_thread;
    
    void start ();
    void received ( const boost::system::error_code& error, std::size_t bytes_received );
    void cleanup ();
    void restart ( unsigned int size );
    
    void receiving ()
    {
    	socket_.async_receive_from ( boost::asio::buffer ( buffer ), remote_endpoint, &received );
    }
    
    void sent ( const boost::system::error_code& error, std::size_t bytes_sent )
    {
    	receiving ();
    }
    
    void received ( const boost::system::error_code& error, std::size_t bytes_received )
    {
    	if ( !error )
    	{
    		std::string answer = "tag";
    		send_buffer = std::vector<char> ( answer.begin (), answer.end () );
    
    		socket_.async_send_to ( boost::asio::buffer ( send_buffer ), remote_endpoint, &sent );
    	}
    	else
    		std::cout << "error - received " << error.message () << "\n";
    }
    
    void run ( boost::asio::io_service* s )
    {
    	s->run ();
    }
    
    void restart ( unsigned int size )
    {
    	cleanup ();
    
    	// auskommentieren, damit die Meldung ausgegeben wird
    	buffer = std::vector<char> ( size );
    
    	start ();
    }
    
    void start ()
    {
    	socket_.open ( udp::v4 () );
    	socket_.bind ( udp::endpoint ( udp::v4 (), 33333) );
    
    	receiving ();
    
    	run_thread.reset ( new boost::thread ( boost::bind ( &run, &io_service ) ) );	
    }
    
    void cleanup ()
    {
    	// böse
    	io_service.post ( boost::bind ( &udp::socket::shutdown, &socket_, udp::socket::shutdown_both ) );
    	io_service.post ( boost::bind ( &udp::socket::close, &socket_ ) );
    
    	// anscheinend ok
    	//socket_.shutdown ( udp::socket::shutdown_both );
    	//socket_.close ();
    
    	io_service.post ( boost::bind ( &boost::asio::io_service::stop, &io_service) );
    
    	run_thread->join ();
    	io_service.reset ();
    
    	std::cout << "cleanup finished\n";
    }
    
    /*
    	Korrekterweise sollte zuerst "server" ausgegeben werden, dann bei der eingabe von "quit" sollte die error Meldung ausgegeben werden
    	und erst dann "cleanup finished". Allerdings kommt es vor, dass bei den post-Varianten zuerst "cleanup finished" kommt, dann "server"
    	und erst dann kommt die error Meldung, dass eine Verbindung unterbrochen wurde.
    	Respektive es kommt eine Laufzeitmeldung, dass vector nicht dereferenziert werden kann, was daher kommt, dass der buffer in 
    	restart invalidiert wird, was kein Problem ist, wenn die Meldung noch innerhalb von cleanup vorkommt.
    
    */
    int main ()
    {
    	start ();
    
    	for (int i = 0; i < 2; ++i)
    	{
    		std::cout << "server\n";
    
    		for (;;)
    		{
    			std::string c;
    			std::cin >> c;
    			if ( c == "quit" )
    				break;
    		}
    
    		restart ( 64 );
    	}
    }
    

    Und so siehts dann manchmal aus:
    http://dl.dropbox.com/u/1716912/udp_error.jpg


  • Administrator

    Was heisst manchmal? Habe es nun 100 Mal getestet, konnte das Verhalten nicht reproduzieren:
    Win7 64 Bit, MSVC 9.0 (2008), Boost 1.42.0 (ganz frisch geholt).

    Wobei bei 1.42.0 anscheinend ein paar Bugs behoben wurden, welche womöglich mit dem Problem zusammenhängen könnten, bin mir aber nicht ganz sicher:
    http://www.boost.org/doc/libs/1_42_0/doc/html/boost_asio/history.html

    Welche Version verwendest du denn? Dann kann ich es allenfalls noch mit der probieren, müsste ich allerdings auch zuerst noch kompilieren.

    Die einzige Sache, wo ich nicht so genau weiss, wie das eigentlich aussieht und mir hier auffällt: Wie sieht die Synchronisation von std::cout über mehrere Threads aus? Könnte es ein Problem sein, dass da eine Nachricht im Puffer verbleibt und dann eine andere Nachricht zuerst auftaucht, obwohl sie erst später geschrieben wurde? Kenne mich da zu wenig aus. Vielleicht noch bei jeder Ausgabe std::endl verwenden und ein std::clock Stempel dazu 😉

    Grüssli



  • Also ich kann das sehr gut nachvollziehen, wenn ich einfach sobald das Fenster aufgeht "quit" mache. Dann brauche ich da 2-3x zu versuchen und ich habe einen Fehler.

    Ich habe die 1.41 Version.
    Kann sein, dass was behoben wurde, habe was gelesen in der changelog:

    Fixed a problem with the lifetime of handler memory, where Windows needs the OVERLAPPED structure to be valid until both the initiating function call has returned and the completion packet has been delivered.

    Das Problem bei mir hat ja was mit dem memory zu tun, allerdings wird bei mir ja nicht der Speicher direkt falsch behandelt, sondern eben nur der Handler kommt zu spät.

    Und nein. An cout liegst nicht, wenn ich kriege den Fehler ja aufgrund von einem Speicherzugriffsfehler in vector. Die Ausgabe habe ich nur da, damit man es besser sehen kann.



  • Der Code hat einiger Fehler.

    Nur, um mal eine potentielle Quelle zu nennen:

    void restart ( unsigned int size )
    {
        //hier drin werden Dinge über post() asynchron gestartet
        cleanup ();
    
        //das hier läuft aber in einem anderen Thread
        buffer = std::vector<char> ( size );
    
        start ();
    }
    

    Probier es mal so:

    #include <iostream>
    #include <vector>
    #include <string>
    
    #define _WIN32_WINNT 0x0501
    
    #include <boost/asio/io_service.hpp>
    #include <boost/asio/ip/udp.hpp>
    #include <boost/asio/buffer.hpp>
    
    #include <boost/thread.hpp>
    #include <boost/system/error_code.hpp>
    #include <boost/bind.hpp>
    
    using boost::asio::ip::udp;
    
    boost::asio::io_service io_service;
    //so bleibt der Eventprozessor aktiv, auch wenn nichts zu tun ist
    boost::asio::io_service::work worker(io_service);
    
    std::size_t bufferSize = 64;
    std::vector<char> buffer ( bufferSize );
    std::vector<char> send_buffer  ( bufferSize );
    
    boost::asio::ip::udp::socket socket_(io_service);
    
    boost::asio::ip::udp::endpoint remote_endpoint ;
    //muss nicht geheapt werden
    boost::thread run_thread;
    
    void initSocket();
    void closeSocket();
    void startServer();
    void nextCycle();
    void handleReceive(boost::system::error_code const & error, std::size_t bytesReceived);
    void handleSend(boost::system::error_code const & error, std::size_t bytesReceived);
    void handleError(boost::system::error_code const & error);
    void restart(std::size_t size);
    
    //socket initialisieren
    void initSocket()
    {
        socket_.open (udp::v4());
        socket_.bind (udp::endpoint(udp::v4(), 33333));
    }
    
    //socket schliessen
    void closeSocket()
    {
        socket_.shutdown(udp::socket::shutdown_both);
        socket_.close();
    }
    
    //asynchrones Zeugs anwerfen
    void startServer()
    {
        initSocket();
        nextCycle();
    }
    
    //ein Zyklus besteht aus receive->send->...
    //nextCycle() leitet so einen Zyklus ein
    void nextCycle()
    {
        socket_.async_receive_from
            ( boost::asio::buffer(buffer)
            , remote_endpoint
            , &handleReceive ); 
    }
    
    void handleReceive(boost::system::error_code const & error, std::size_t bytesReceived)
    {
        if(error)
        {
            //bei einem Fehler wollen wir was tun!
            handleError(error);
        }
        else
        {
            char const answer[] = "tag";
            send_buffer.assign(answer, answer + sizeof(answer));
            socket_.async_send_to
                ( boost::asio::buffer(send_buffer)
                , remote_endpoint
                , &handleSend ); 
        }
    }
    
    void handleSend(boost::system::error_code const & error, std::size_t bytesReceived)
    {
        if(error)
        {
            //bei einem Fehler wollen wir was tun!
            handleError(error);
        }
        else
        {
            nextCycle();
        }
    }
    
    void handleError(boost::system::error_code const & error)
    {
        //hier ist einer der Handler sicher mit einem Fehler
        //zurueckgekommen
        std::cerr << "Error: " << error.message() << '\n';
        //wenn der Fehlercode "abbruch" sagt (close()):
        if(error == boost::asio::error::operation_aborted)
        {
            //Buffer setzen
            buffer.assign(bufferSize, 0);
            //Socket neu oeffnen
            initSocket();
            //neuen Zyklus einleiten
            nextCycle();
        }
    }
    
    void restart(std::size_t size)
    {
        bufferSize = size;
        //Socket schliessen
        socket_.get_io_service().post(&closeSocket);
    }
    
    int main ()
    {
        startServer();
        run_thread.swap(boost::thread(boost::bind(&boost::asio::io_service::run, &io_service)));
    
        for (int i = 0; i < 4; ++i)
        {
            std::cout << "server\n";
    
            for (;;)
            {
                std::string c;
                std::cin >> c;
                if ( c == "quit" )
                    break;
            }
            restart ( 64 );
        }
    }
    


  • Hmm. Danke für das Beispiel.
    Werds mir bestimmt genauer anschauen.

    Aber wo meinst du sind sonst noch Fehler?

    Und irgendwie will ich schon noch wissen, was an obigem Code explizit den Fehler verursacht. Wie du ja sagts müssten die Handler, die per post aufgerufen werden der Reihe nach korrekt aufgerufen werden, was ja anscheinend nicht wirklich passiert.

    Potentiell glaube ich dir schon, dass es Fehler drin hat, aber was ruft mir denn jetzt den Fehler hervor? (konntest du ihn überhaupt auch mal nach vollziehen?)


  • Administrator

    Tachyon schrieb:

    Der Code hat einiger Fehler.

    Nur, um mal eine potentielle Quelle zu nennen:

    void restart ( unsigned int size )
    {
        //hier drin werden Dinge über post() asynchron gestartet
        cleanup ();
       
        //das hier läuft aber in einem anderen Thread
        buffer = std::vector<char> ( size );
    
        start ();
    }
    

    Kann es sein, dass du in cleanup das run_thread->join() übersehen hast? Wenn die Kopie durchgeführt wird, sollte eigentlich nur noch ein Thread laufen 😉

    @drakon,
    Ich kompiliere mal Boost 1.41.0 ... das dauert aber ein wenig 😉

    Grüssli



  • Dravere schrieb:

    @drakon,
    Ich kompiliere mal Boost 1.41.0 ... das dauert aber ein wenig 😉

    Grüssli

    Lad dir doch die Binary?


  • Administrator

    drakon schrieb:

    Lad dir doch die Binary?

    Und mich dort registrieren? Keine Lust! Über bjam kann man die Sache ja einfach kompilieren, man muss dann halt nur ein wenig warten, aber was solls? Meine beiden Kerne werden zusammen nie wirklich über 70% belastet, kann man noch wunderbar daneben arbeiten.

    Jedenfalls habe ich es nun mit Boost 1.41.0 getestet. Irgendwas machst du verkehrt, ich kann dir nur nicht sagen was, da es bei mir wunderbar funktioniert 😃
    Ich starte gerade noch meinen Testcomputer, dort habe ich noch ein Win7 x86 und ein WinXP x86 drauf. Teste mal die Sache noch dort drüben, ob es dort irgendeinen Fehler gibt 😉

    Grüssli


  • Administrator

    drakon schrieb:

    Und nein. An cout liegst nicht, wenn ich kriege den Fehler ja aufgrund von einem Speicherzugriffsfehler in vector. Die Ausgabe habe ich nur da, damit man es besser sehen kann.

    Kannst du mir noch was genaueres zu diesem Speicherzugriffsfehler sagen? Wo passiert der genau?
    Der Fehler mit der Ausgabe auf der Konsole liegt meiner Meinung nach an der Konsole selber, also dieser hier:
    http://dl.dropbox.com/u/1716912/udp_error.jpg
    Wenn du bei der Ausgabe der Nachricht "server" mehrere Ausgaben machst (z.B. std::cout << std::clock() << ' ' << "server"; ), erkennt man sehr schön, wie sich zum Teil die server & error Nachrichten überlagern. Zumindest auf WinXP ist dies immer mal wieder passiert, mit Win7 konnte ich nicht mal das reproduzieren. Daher könnte ich mir gut vorstellen, dass es nur ein Problem der XP Konsole ist. Müsste man mal genauer nachforschen gehen, wie die bei multithread Anwendungen reagiert.

    Den Speicherzugrifffehler konnte ich allerdings mit dem gezeigten Code nie reproduzieren, also trat bei mir nie auf.

    Grüssli



  • trace:

    msvcp90d.dll!104ed4c3() 	
     	[Unten angegebene Rahmen sind möglicherweise nicht korrekt und/oder fehlen, keine Symbole geladen für msvcp90d.dll]	
    >	udp_error.exe!std::_Vector_const_iterator<char,std::allocator<char> >::operator*()  Zeile 98 + 0x14 Bytes	C++
     	udp_error.exe!std::_Vector_iterator<char,std::allocator<char> >::operator*()  Zeile 340	C++
     	udp_error.exe!boost::asio::detail::buffer_debug_check<std::_Vector_iterator<char,std::allocator<char> > >::operator()()  Zeile 446	C++
     	udp_error.exe!boost::detail::function::void_function_obj_invoker0<boost::asio::detail::buffer_debug_check<std::_Vector_iterator<char,std::allocator<char> > >,void>::invoke(boost::detail::function::function_buffer & function_obj_ptr={...})  Zeile 154	C++
     	udp_error.exe!boost::function0<void>::operator()()  Zeile 1013 + 0x16 Bytes	C++
     	udp_error.exe!boost::asio::detail::buffer_cast_helper(const boost::asio::mutable_buffer & b={...})  Zeile 124	C++
     	udp_error.exe!boost::asio::buffer_cast<char *>(const boost::asio::mutable_buffer & b={...})  Zeile 141 + 0x9 Bytes	C++
     	udp_error.exe!boost::asio::detail::win_iocp_socket_service<boost::asio::ip::udp>::receive_from_operation<boost::asio::mutable_buffers_1,void (__cdecl*)(boost::system::error_code const &,unsigned int)>::do_completion_impl(boost::asio::detail::win_iocp_io_service::operation * op=0x00346f48, unsigned long last_error=995, unsigned int bytes_transferred=0)  Zeile 1643 + 0x9 Bytes	C++
     	udp_error.exe!boost::asio::detail::win_iocp_io_service::operation::do_completion(unsigned long last_error=995, unsigned int bytes_transferred=0)  Zeile 77 + 0x16 Bytes	C++
     	udp_error.exe!boost::asio::detail::win_iocp_io_service::do_one(bool block=true, boost::system::error_code & ec={...})  Zeile 512	C++
     	udp_error.exe!boost::asio::detail::win_iocp_io_service::run(boost::system::error_code & ec={...})  Zeile 186 + 0xe Bytes	C++
     	udp_error.exe!boost::asio::io_service::run()  Zeile 58 + 0xf Bytes	C++
     	udp_error.exe!run(boost::asio::io_service * s=0x00467638)  Zeile 56	C++
     	udp_error.exe!boost::_bi::list1<boost::_bi::value<boost::asio::io_service *> >::operator()<void (__cdecl*)(boost::asio::io_service *),boost::_bi::list0>(boost::_bi::type<void> __formal={...}, void (boost::asio::io_service *)* & f=0x0041c9ab, boost::_bi::list0 & a={...}, boost::_bi::type<void> __formal={...})  Zeile 246 + 0x23 Bytes	C++
     	udp_error.exe!boost::_bi::bind_t<void,void (__cdecl*)(boost::asio::io_service *),boost::_bi::list1<boost::_bi::value<boost::asio::io_service *> > >::operator()()  Zeile 21	C++
     	udp_error.exe!boost::detail::thread_data<boost::_bi::bind_t<void,void (__cdecl*)(boost::asio::io_service *),boost::_bi::list1<boost::_bi::value<boost::asio::io_service *> > > >::run()  Zeile 57	C++
     	udp_error.exe!boost::`anonymous namespace'::thread_start_function(void * param=0x0015cbc8)  Zeile 168	C++
     	msvcr90d.dll!1023dfd3() 	
     	msvcr90d.dll!1023df69() 	
     	kernel32.dll!7c80b729()
    

    Also ich kriege den Zugriffsfehler auch wenn ich keine Ausgabe habe. (Ursprünglich war da ja keine, sondern die habe ich für das nachvollziehen eingebaut).



  • Bei mir funktioniert drakons Code so wie gepostet nicht. Ich bekomme Assertions aufgrund ungültiger Vektor-Iteratoren, wenn ich quit eingebe.
    Mit dem join() hast Du allerdings recht. Ich werde mir das bei Gelegenheit nochmal genauer ansehen.



  • Tachyon schrieb:

    Bei mir funktioniert drakons Code so wie gepostet nicht. Ich bekomme Assertions aufgrund ungültiger Vektor-Iteratoren, wenn ich quit eingebe.
    Mit dem join() hast Du allerdings recht. Ich werde mir das bei Gelegenheit nochmal genauer ansehen.

    Genau die bekomme ich eben auch.

    Danke für die Mühe. 🙂


  • Administrator

    Ich habe mich gerade gefragt, ob es womöglich an Boost.Thread liegen könnte, dass da der join nicht richtig funktioniert. Konnte allerdings keine entsprechende Themen dazu in der Mailing Liste von Boost.User finden. Ich schau mir das auch noch etwas genauer an, komme bei meinem eigenen Problem grad auch nicht weiter, daher lenke ich mich jetzt mal damit ab 😉

    Grüssli


Anmelden zum Antworten