boost::asio - Asynchroner Aufruf nach beenden des sockets
-
drakon schrieb:
Dann ist demfall die Doku einfach nicht zu gebrauchen. Denn hier steht rein gar nichts von einem io_service, also gehe ich davon aus, dass das gemacht wird, was da steht..
Zugegeben: Die Doku ist manchmal etwas arg dünn.
socket::shutdown()
sorgt allerdings erstmal nur dafür, dass man Senden, Empfangen oder beides für einen Socket ausknipsen kann. Dadurch kommt es nicht zu dem "Fehler".Der Fehler ist auch kein Fehler, sondern das korrekte Verhalten von
socket::close()
:basic_datagram_socket::close schrieb:
This function is used to close the socket. Any asynchronous send, receive or connect operations will be cancelled immediately, and will complete with the boost::asio::error::operation_aborted error.
Der E/A-Vorgang wurde wegen eines Threadendes oder einer Anwendungsanforderung abgebrochen ist die Win-API Meldung zu
opearation_aborted
.drakon schrieb:
Ok, das macht Sinn. Das konnte ich leider aus der Doku nicht lesen.. Woher hast du denn das?
Die Sache mit dem
io_service::post()
habe ich aus der Doku. In den Beispielen ist das ein häufig verwendetes Pattern. Außer io_service ist fast keine asio-Klasse threadsafe. Deshalb müssen potentiell konkurrierende Aufrufe von Memberfunktionen auf Objekte dieser Klassen synchronisiert werden. Guck Dir z.B. mal die Close-Funktion aus dem Chat-Client Beispiel an.Das mit der Queue und der Arbeitsweise der asynchronen Operationen habe ich aus der boost.asio-Übersicht. Da wo das Proactor-Pattern beschrieben wird.
-
Naja. Ich habe das dort eben so verstanden, dass innerhalb von close die Funktionen mit dem error aufgerufen werden sollten. Das verstehe ich unter immediately. Da steht ja nichts davon, dass die Aufrufe in die queue eingereiht werden..
Etwas ist mir aber nicht ganz klar. Warum synchronisiert du die Aufrufe von socket::shutdown und socket::close? - Es reicht ja dafür zu sorgen, dass io_service::stop erst am Ende ausgeführt wird. Habs mal getestet und funktionieren tuts auch.
Und sehe ich das richtig, dass socket::cancel hier wirklich sofort die Handler beendet und somit eher das Mittel der Wahl wäre? (ich habe XP, darum kann ichs nicht testen).
Ja. Die Doku ist mir wirklich zu mager. Aber abgesehen von der boost Seite und den Beispielen hast du auch nichts anderes mehr gefunden? (Ich habe auch mal ein wenig gesucht und hatte einfach das Gefühl, dass asio kaum wer braucht. ;))
-
Handler werden grundsätzlich über den Dispatacher vom
io_service
ausgeführt.Soll heißen: Wenn Du einen Handler über irgendeine asynchrone Operation in die Queue schiebst, wird dieser erst ausgeführt, wenn Du
run()
,run_one()
,poll()
oderpoll_one()
für das benutzteio_service
-Objekt aufrufst.Due Aufrufe von
shutdown()
undclose()
müssen synchronisiert werden, weil sie sonst potentiell parallel zur Abarbeitung z.B. einesasync_receive
passieren können. Wenn gerade ein empfangenes Datagramm bearbeitet wird (vom Socket), und Du gleichzeitig z.B.close()
auf den Socket aufrufst, dann hast Du eine klassische Race Condition. Die geteilte Resource ist dabei der Socket selbst und es greifen zwei Threads drauf zu (der vomio_service
der gerade das async_receive bearbeitet, und der, derclose()
aufruft).Wieso kannst Du es mit Win-XP nicht testen? -> Achja, cancel() geht da ja nicht.
-
Naja, wegen der mangelnden Unterstützung, die in den Remarks vermekt ist.
-
Tja.. Tachyon.. Ich befürchte, dass es doch nicht so einfach war.
Ich habe das selbe Fehlerbild, aber jetzt in einer einfacheren Situation bekommen.
Wenn ich nämlich jetzt gar keine receives mehr empfange und gleich resetten will, dann funktioniert das mit dem posten von shutdown und close nicht. Aus irgendeinem Grund kommt manchmal die Abbruch Meldung trotzdem erst wenn der service wieder gestartet ist (es war also ein Handler nach io_service::stop drin..). Das konnte ich bis jetzt nicht beobachten, wenn ich die calls einfach ohne post gemacht habe.
// böse: //io_service_.post ( boost::bind ( &udp::socket::shutdown, &socket_, udp::socket::shutdown_both ) ); //io_service_.post ( boost::bind ( &udp::socket::close, &socket_ ) ); // anscheinend gut: socket_.shutdown ( udp::socket::shutdown_both ); socket_.close (); io_service_.post ( boost::bind ( &boost::asio::io_service::stop, &io_service_) ); run_thread_->join (); io_service_.reset ();
Dabei müssten ja die die beiden calls tatsächlich vor dem io_service::stop kommen.. Für mich macht das ganze irgendwie keinen Sinn mehr..
-
Sollte es nicht so laufen?
1. shutdown socket (nur receive seite)
2. receive handler wird aufgerufen mit error eof, bzw. bytes_transferred == 0
3. close socket im receive handlerOder bin ich irgendwie ganz falsch gewickelt.
Simon
-
Kannst du mal ein minimales Beispiel zeigen, wo der Fehler passiert? Wenn ich Zeit finde, möchte ich das heute mal durchtesten.
Grüssli
PS: Es kam schon wieder eine neue Boost Version raus ...
-
@theta:
Da tachyon noch nichts gesagt hat und in der boost Doku nix davon steht, gehe ich davon aus, dass es schon so geht..@dravere:
Ja, werde ich wahrscheinlich heute Abend noch machen. Im Moment habe ich ein paar andere Sachen im Kopf.
-
drakon schrieb:
Wenn ich nämlich jetzt gar keine receives mehr empfange und gleich resetten will, dann funktioniert das mit dem posten von shutdown und close nicht.
Das sollte eigentlich immer funktionieren. Voraussetzung dafür ist allerdings, dass Du einen
io_service
mit aktivem Eventprozessor hast. Wenn nicht, dann tutpost()
genau gar nichts.drakon schrieb:
Aus irgendeinem Grund kommt manchmal die Abbruch Meldung trotzdem erst wenn der service wieder gestartet ist (es war also ein Handler nach io_service::stop drin..). Das konnte ich bis jetzt nicht beobachten, wenn ich die calls einfach ohne post gemacht habe.
Das darf so nicht sein. Ich glaube, Du solltest doch mal deutlich mehr Code zeigen. Vor allem den, wo Du die asynchronen Operationen startest.
// anscheinend gut: socket_.shutdown ( udp::socket::shutdown_both ); socket_.close (); //nein, sehr schlecht. sockets sind nicht threadsafe, und Du greifst mit zwei //Threads drauf zu (der aus dem Eventprozessor, und der, der close() aufruft-
Ich stelle nochmal die Frage:
Wieso beendest Du denio_service
? Du kannst auch bei laufendem Eventprozessor die sockets schließen und wieder neu öffnen. Noch besser wäre, wie hier bereits gesagt wurde, das Socketobjekt komplett neu zu erstellen.
-
Das der Zugriff da nicht synchronisiert ist, ist mir klar, allerdings habe ich, wie gesagt den Fehler so nicht reproduzieren können, daher "anscheinend gut".
Das ich da den io_service neu starte muss wohl nicht sein, ja, aber ich dachte halt zuerst eben, dass es nötig seie, oder zumindest gut ginge. Im Moment brauche ich das vor allem, um sicher zu gehen, dass auch alle Handler durch sind und warte daher bis der Thread zurück kommt und starte ihn neu. Ich denke, dass man das mit io_service::poll auch erreichen kann, aber jetzt will ich zuerst das hier zum laufen kriegen.
#include <iostream> #include <vector> #include <string> #define _WIN32_WINNT 0x0501 #include <boost/asio/io_service.hpp> #include <boost/asio/ip/udp.hpp> #include <boost/asio/buffer.hpp> #include <boost/thread.hpp> #include <boost/system/error_code.hpp> #include <boost/bind.hpp> using boost::asio::ip::udp; boost::asio::io_service io_service; std::vector<char> buffer ( 64 ); std::vector<char> send_buffer ( 64 ); boost::asio::ip::udp::socket socket_ ( io_service ); boost::asio::ip::udp::endpoint remote_endpoint ; std::auto_ptr<boost::thread> run_thread; void start (); void received ( const boost::system::error_code& error, std::size_t bytes_received ); void cleanup (); void restart ( unsigned int size ); void receiving () { socket_.async_receive_from ( boost::asio::buffer ( buffer ), remote_endpoint, &received ); } void sent ( const boost::system::error_code& error, std::size_t bytes_sent ) { receiving (); } void received ( const boost::system::error_code& error, std::size_t bytes_received ) { if ( !error ) { std::string answer = "tag"; send_buffer = std::vector<char> ( answer.begin (), answer.end () ); socket_.async_send_to ( boost::asio::buffer ( send_buffer ), remote_endpoint, &sent ); } else std::cout << "error - received " << error.message () << "\n"; } void run ( boost::asio::io_service* s ) { s->run (); } void restart ( unsigned int size ) { cleanup (); // auskommentieren, damit die Meldung ausgegeben wird buffer = std::vector<char> ( size ); start (); } void start () { socket_.open ( udp::v4 () ); socket_.bind ( udp::endpoint ( udp::v4 (), 33333) ); receiving (); run_thread.reset ( new boost::thread ( boost::bind ( &run, &io_service ) ) ); } void cleanup () { // böse io_service.post ( boost::bind ( &udp::socket::shutdown, &socket_, udp::socket::shutdown_both ) ); io_service.post ( boost::bind ( &udp::socket::close, &socket_ ) ); // anscheinend ok //socket_.shutdown ( udp::socket::shutdown_both ); //socket_.close (); io_service.post ( boost::bind ( &boost::asio::io_service::stop, &io_service) ); run_thread->join (); io_service.reset (); std::cout << "cleanup finished\n"; } /* Korrekterweise sollte zuerst "server" ausgegeben werden, dann bei der eingabe von "quit" sollte die error Meldung ausgegeben werden und erst dann "cleanup finished". Allerdings kommt es vor, dass bei den post-Varianten zuerst "cleanup finished" kommt, dann "server" und erst dann kommt die error Meldung, dass eine Verbindung unterbrochen wurde. Respektive es kommt eine Laufzeitmeldung, dass vector nicht dereferenziert werden kann, was daher kommt, dass der buffer in restart invalidiert wird, was kein Problem ist, wenn die Meldung noch innerhalb von cleanup vorkommt. */ int main () { start (); for (int i = 0; i < 2; ++i) { std::cout << "server\n"; for (;;) { std::string c; std::cin >> c; if ( c == "quit" ) break; } restart ( 64 ); } }
Und so siehts dann manchmal aus:
http://dl.dropbox.com/u/1716912/udp_error.jpg
-
Was heisst manchmal? Habe es nun 100 Mal getestet, konnte das Verhalten nicht reproduzieren:
Win7 64 Bit, MSVC 9.0 (2008), Boost 1.42.0 (ganz frisch geholt).Wobei bei 1.42.0 anscheinend ein paar Bugs behoben wurden, welche womöglich mit dem Problem zusammenhängen könnten, bin mir aber nicht ganz sicher:
http://www.boost.org/doc/libs/1_42_0/doc/html/boost_asio/history.htmlWelche Version verwendest du denn? Dann kann ich es allenfalls noch mit der probieren, müsste ich allerdings auch zuerst noch kompilieren.
Die einzige Sache, wo ich nicht so genau weiss, wie das eigentlich aussieht und mir hier auffällt: Wie sieht die Synchronisation von
std::cout
über mehrere Threads aus? Könnte es ein Problem sein, dass da eine Nachricht im Puffer verbleibt und dann eine andere Nachricht zuerst auftaucht, obwohl sie erst später geschrieben wurde? Kenne mich da zu wenig aus. Vielleicht noch bei jeder Ausgabestd::endl
verwenden und einstd::clock
Stempel dazuGrüssli
-
Also ich kann das sehr gut nachvollziehen, wenn ich einfach sobald das Fenster aufgeht "quit" mache. Dann brauche ich da 2-3x zu versuchen und ich habe einen Fehler.
Ich habe die 1.41 Version.
Kann sein, dass was behoben wurde, habe was gelesen in der changelog:Fixed a problem with the lifetime of handler memory, where Windows needs the OVERLAPPED structure to be valid until both the initiating function call has returned and the completion packet has been delivered.
Das Problem bei mir hat ja was mit dem memory zu tun, allerdings wird bei mir ja nicht der Speicher direkt falsch behandelt, sondern eben nur der Handler kommt zu spät.
Und nein. An
cout
liegst nicht, wenn ich kriege den Fehler ja aufgrund von einem Speicherzugriffsfehler in vector. Die Ausgabe habe ich nur da, damit man es besser sehen kann.
-
Der Code hat einiger Fehler.
Nur, um mal eine potentielle Quelle zu nennen:
void restart ( unsigned int size ) { //hier drin werden Dinge über post() asynchron gestartet cleanup (); //das hier läuft aber in einem anderen Thread buffer = std::vector<char> ( size ); start (); }
Probier es mal so:
#include <iostream> #include <vector> #include <string> #define _WIN32_WINNT 0x0501 #include <boost/asio/io_service.hpp> #include <boost/asio/ip/udp.hpp> #include <boost/asio/buffer.hpp> #include <boost/thread.hpp> #include <boost/system/error_code.hpp> #include <boost/bind.hpp> using boost::asio::ip::udp; boost::asio::io_service io_service; //so bleibt der Eventprozessor aktiv, auch wenn nichts zu tun ist boost::asio::io_service::work worker(io_service); std::size_t bufferSize = 64; std::vector<char> buffer ( bufferSize ); std::vector<char> send_buffer ( bufferSize ); boost::asio::ip::udp::socket socket_(io_service); boost::asio::ip::udp::endpoint remote_endpoint ; //muss nicht geheapt werden boost::thread run_thread; void initSocket(); void closeSocket(); void startServer(); void nextCycle(); void handleReceive(boost::system::error_code const & error, std::size_t bytesReceived); void handleSend(boost::system::error_code const & error, std::size_t bytesReceived); void handleError(boost::system::error_code const & error); void restart(std::size_t size); //socket initialisieren void initSocket() { socket_.open (udp::v4()); socket_.bind (udp::endpoint(udp::v4(), 33333)); } //socket schliessen void closeSocket() { socket_.shutdown(udp::socket::shutdown_both); socket_.close(); } //asynchrones Zeugs anwerfen void startServer() { initSocket(); nextCycle(); } //ein Zyklus besteht aus receive->send->... //nextCycle() leitet so einen Zyklus ein void nextCycle() { socket_.async_receive_from ( boost::asio::buffer(buffer) , remote_endpoint , &handleReceive ); } void handleReceive(boost::system::error_code const & error, std::size_t bytesReceived) { if(error) { //bei einem Fehler wollen wir was tun! handleError(error); } else { char const answer[] = "tag"; send_buffer.assign(answer, answer + sizeof(answer)); socket_.async_send_to ( boost::asio::buffer(send_buffer) , remote_endpoint , &handleSend ); } } void handleSend(boost::system::error_code const & error, std::size_t bytesReceived) { if(error) { //bei einem Fehler wollen wir was tun! handleError(error); } else { nextCycle(); } } void handleError(boost::system::error_code const & error) { //hier ist einer der Handler sicher mit einem Fehler //zurueckgekommen std::cerr << "Error: " << error.message() << '\n'; //wenn der Fehlercode "abbruch" sagt (close()): if(error == boost::asio::error::operation_aborted) { //Buffer setzen buffer.assign(bufferSize, 0); //Socket neu oeffnen initSocket(); //neuen Zyklus einleiten nextCycle(); } } void restart(std::size_t size) { bufferSize = size; //Socket schliessen socket_.get_io_service().post(&closeSocket); } int main () { startServer(); run_thread.swap(boost::thread(boost::bind(&boost::asio::io_service::run, &io_service))); for (int i = 0; i < 4; ++i) { std::cout << "server\n"; for (;;) { std::string c; std::cin >> c; if ( c == "quit" ) break; } restart ( 64 ); } }
-
Hmm. Danke für das Beispiel.
Werds mir bestimmt genauer anschauen.Aber wo meinst du sind sonst noch Fehler?
Und irgendwie will ich schon noch wissen, was an obigem Code explizit den Fehler verursacht. Wie du ja sagts müssten die Handler, die per post aufgerufen werden der Reihe nach korrekt aufgerufen werden, was ja anscheinend nicht wirklich passiert.
Potentiell glaube ich dir schon, dass es Fehler drin hat, aber was ruft mir denn jetzt den Fehler hervor? (konntest du ihn überhaupt auch mal nach vollziehen?)
-
Tachyon schrieb:
Der Code hat einiger Fehler.
Nur, um mal eine potentielle Quelle zu nennen:
void restart ( unsigned int size ) { //hier drin werden Dinge über post() asynchron gestartet cleanup (); //das hier läuft aber in einem anderen Thread buffer = std::vector<char> ( size ); start (); }
Kann es sein, dass du in
cleanup
dasrun_thread->join()
übersehen hast? Wenn die Kopie durchgeführt wird, sollte eigentlich nur noch ein Thread laufen@drakon,
Ich kompiliere mal Boost 1.41.0 ... das dauert aber ein wenigGrüssli
-
Dravere schrieb:
@drakon,
Ich kompiliere mal Boost 1.41.0 ... das dauert aber ein wenigGrüssli
Lad dir doch die Binary?
-
drakon schrieb:
Lad dir doch die Binary?
Und mich dort registrieren? Keine Lust! Über bjam kann man die Sache ja einfach kompilieren, man muss dann halt nur ein wenig warten, aber was solls? Meine beiden Kerne werden zusammen nie wirklich über 70% belastet, kann man noch wunderbar daneben arbeiten.
Jedenfalls habe ich es nun mit Boost 1.41.0 getestet. Irgendwas machst du verkehrt, ich kann dir nur nicht sagen was, da es bei mir wunderbar funktioniert
Ich starte gerade noch meinen Testcomputer, dort habe ich noch ein Win7 x86 und ein WinXP x86 drauf. Teste mal die Sache noch dort drüben, ob es dort irgendeinen Fehler gibtGrüssli
-
drakon schrieb:
Und nein. An
cout
liegst nicht, wenn ich kriege den Fehler ja aufgrund von einem Speicherzugriffsfehler in vector. Die Ausgabe habe ich nur da, damit man es besser sehen kann.Kannst du mir noch was genaueres zu diesem Speicherzugriffsfehler sagen? Wo passiert der genau?
Der Fehler mit der Ausgabe auf der Konsole liegt meiner Meinung nach an der Konsole selber, also dieser hier:
http://dl.dropbox.com/u/1716912/udp_error.jpg
Wenn du bei der Ausgabe der Nachricht"server"
mehrere Ausgaben machst (z.B.std::cout << std::clock() << ' ' << "server";
), erkennt man sehr schön, wie sich zum Teil dieserver
&error
Nachrichten überlagern. Zumindest auf WinXP ist dies immer mal wieder passiert, mit Win7 konnte ich nicht mal das reproduzieren. Daher könnte ich mir gut vorstellen, dass es nur ein Problem der XP Konsole ist. Müsste man mal genauer nachforschen gehen, wie die bei multithread Anwendungen reagiert.Den Speicherzugrifffehler konnte ich allerdings mit dem gezeigten Code nie reproduzieren, also trat bei mir nie auf.
Grüssli
-
trace:
msvcp90d.dll!104ed4c3() [Unten angegebene Rahmen sind möglicherweise nicht korrekt und/oder fehlen, keine Symbole geladen für msvcp90d.dll] > udp_error.exe!std::_Vector_const_iterator<char,std::allocator<char> >::operator*() Zeile 98 + 0x14 Bytes C++ udp_error.exe!std::_Vector_iterator<char,std::allocator<char> >::operator*() Zeile 340 C++ udp_error.exe!boost::asio::detail::buffer_debug_check<std::_Vector_iterator<char,std::allocator<char> > >::operator()() Zeile 446 C++ udp_error.exe!boost::detail::function::void_function_obj_invoker0<boost::asio::detail::buffer_debug_check<std::_Vector_iterator<char,std::allocator<char> > >,void>::invoke(boost::detail::function::function_buffer & function_obj_ptr={...}) Zeile 154 C++ udp_error.exe!boost::function0<void>::operator()() Zeile 1013 + 0x16 Bytes C++ udp_error.exe!boost::asio::detail::buffer_cast_helper(const boost::asio::mutable_buffer & b={...}) Zeile 124 C++ udp_error.exe!boost::asio::buffer_cast<char *>(const boost::asio::mutable_buffer & b={...}) Zeile 141 + 0x9 Bytes C++ udp_error.exe!boost::asio::detail::win_iocp_socket_service<boost::asio::ip::udp>::receive_from_operation<boost::asio::mutable_buffers_1,void (__cdecl*)(boost::system::error_code const &,unsigned int)>::do_completion_impl(boost::asio::detail::win_iocp_io_service::operation * op=0x00346f48, unsigned long last_error=995, unsigned int bytes_transferred=0) Zeile 1643 + 0x9 Bytes C++ udp_error.exe!boost::asio::detail::win_iocp_io_service::operation::do_completion(unsigned long last_error=995, unsigned int bytes_transferred=0) Zeile 77 + 0x16 Bytes C++ udp_error.exe!boost::asio::detail::win_iocp_io_service::do_one(bool block=true, boost::system::error_code & ec={...}) Zeile 512 C++ udp_error.exe!boost::asio::detail::win_iocp_io_service::run(boost::system::error_code & ec={...}) Zeile 186 + 0xe Bytes C++ udp_error.exe!boost::asio::io_service::run() Zeile 58 + 0xf Bytes C++ udp_error.exe!run(boost::asio::io_service * s=0x00467638) Zeile 56 C++ udp_error.exe!boost::_bi::list1<boost::_bi::value<boost::asio::io_service *> >::operator()<void (__cdecl*)(boost::asio::io_service *),boost::_bi::list0>(boost::_bi::type<void> __formal={...}, void (boost::asio::io_service *)* & f=0x0041c9ab, boost::_bi::list0 & a={...}, boost::_bi::type<void> __formal={...}) Zeile 246 + 0x23 Bytes C++ udp_error.exe!boost::_bi::bind_t<void,void (__cdecl*)(boost::asio::io_service *),boost::_bi::list1<boost::_bi::value<boost::asio::io_service *> > >::operator()() Zeile 21 C++ udp_error.exe!boost::detail::thread_data<boost::_bi::bind_t<void,void (__cdecl*)(boost::asio::io_service *),boost::_bi::list1<boost::_bi::value<boost::asio::io_service *> > > >::run() Zeile 57 C++ udp_error.exe!boost::`anonymous namespace'::thread_start_function(void * param=0x0015cbc8) Zeile 168 C++ msvcr90d.dll!1023dfd3() msvcr90d.dll!1023df69() kernel32.dll!7c80b729()
Also ich kriege den Zugriffsfehler auch wenn ich keine Ausgabe habe. (Ursprünglich war da ja keine, sondern die habe ich für das nachvollziehen eingebaut).
-
Bei mir funktioniert drakons Code so wie gepostet nicht. Ich bekomme Assertions aufgrund ungültiger Vektor-Iteratoren, wenn ich quit eingebe.
Mit demjoin()
hast Du allerdings recht. Ich werde mir das bei Gelegenheit nochmal genauer ansehen.