Boost::Asio TCP -> Sockets bleiben offen



  • Hallo zusammen,

    Wir wollen eine Client-Server-Anwendung mittels Boost::Asio Bibliothek unter Windows realisieren. Leider hängen wir bei einem ziemlich speziellen Problem fest.

    Folgender Applikationscode:

    Server:

    #define WIN32_LEAN_AND_MEAN
    #define WIN32_WINNT
    
    #include <Windows.h>
    #include <iostream> 
    #include <sstream>
    
    #include <boost/bind.hpp>
    #include <boost/shared_ptr.hpp>
    #include <boost/enable_shared_from_this.hpp>
    #include <boost/asio.hpp>
    
    // -----
    
    class TCPConnection
    	: public boost::enable_shared_from_this<TCPConnection>
    {
    public:
    	typedef boost::shared_ptr<TCPConnection> pointer;
    
    	static pointer create(boost::asio::io_service& my_service)
    	{
    		return pointer(new TCPConnection(my_service));
    	}
    
    	boost::asio::ip::tcp::socket& socket()
    	{
    		return srvSock;
    	}
    
    	~TCPConnection()
    	{
    		srvSock.close();
    	}
    
    	void StartCommunication()
    	{
    		boost::system::error_code error;
    		try
    		{
    			srvSock.set_option(boost::asio::ip::tcp::socket::linger(true,2));
    			srvSock.set_option(boost::asio::ip::tcp::no_delay(true));
    			size_t readBytes = boost::asio::read_until(srvSock, rcvStreamBuffer, "\n");
    			if(!error)
    			{
    				std::string s( (std::istreambuf_iterator<char>(&rcvStreamBuffer)), std::istreambuf_iterator<char>() );
    				std::cout	<< "server socket: " << srvSock.local_endpoint()
    							<< " client socket: " << srvSock.remote_endpoint()
    							<< " got message: " << s.c_str();
    				boost::asio::write(srvSock, boost::asio::buffer("Hello Client!\n"));
    				srvSock.shutdown(boost::asio::ip::tcp::socket::shutdown_send);
    			}
    		}
    		catch (std::exception& e)
    		{
    			std::cerr << "Start communication exception: " << e.what() << std::endl;
    		}
    
    	}
    
    private:
    	TCPConnection(boost::asio::io_service& my_service)
    		: srvSock(my_service)
    	{
    	}
    
    	boost::asio::ip::tcp::socket srvSock;
    	boost::asio::streambuf rcvStreamBuffer;
    };
    
    // -----
    
    class TCPServer_Async
    {
    public:
    	TCPServer_Async(boost::asio::io_service& my_service, short port)
    		: acceptor_(my_service, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port))
    	{
    		start_accept();
    	}
    
    private:
    	void start_accept()
    	{
    		try
    		{
    			TCPConnection::pointer new_connection = TCPConnection::create(acceptor_.get_io_service());
    			acceptor_.set_option(boost::asio::ip::tcp::acceptor::linger(true, 2));
    			acceptor_.set_option(boost::asio::ip::tcp::acceptor::reuse_address(true));
    			acceptor_.listen();
    			acceptor_.async_accept(new_connection->socket(),
    				boost::bind(&TCPServer_Async::handle_accept, this, new_connection,
    				  boost::asio::placeholders::error));
    			Sleep(0);
    		}
    		catch (std::exception& e)
    		{
    			std::cerr << "TCPServer_Async start_accept exception: " << e.what() << "\n";
    		}
    	}
    
    	void cancel_accept()
    	{
    		try
    		{	
    			acceptor_.cancel();
    		}
    		catch (std::exception& e)
    		{
    			std::cerr << "TCPServer_Async cancel_accept exception: " << e.what() << "\n";
    		}
    
    	}
    
    	void handle_accept(TCPConnection::pointer new_connection, const boost::system::error_code& error)
    	{
    		try
    		{
    			if (!error) new_connection->StartCommunication();
    			cancel_accept();
    			start_accept();
    			Sleep(0);
    		}
    		catch  (std::exception& e)
    		{
    			std::cerr << "TCPServer_Async handle_accept exception: " << e.what() << "\n";
    		}
    	}
    
    	boost::asio::ip::tcp::acceptor acceptor_;
    };
    
    // -----
    
    int main() 
    { 
    	std::cout << "Server started" << std::endl;
    
    	boost::system::error_code ec;
    	std::size_t ioHandlerCount = 0;
    	boost::asio::io_service srv_service;
    	TCPServer_Async server(srv_service, 4711);
    	ioHandlerCount = srv_service.run();
    
    	return 0;
    }
    

    Client:

    #define WIN32_LEAN_AND_MEAN
    #define WIN32_WINNT
    
    #include <Windows.h>
    #include <iostream> 
    #include <sstream>
    #include <string>
    
    #include <boost/bind.hpp>
    #include <boost/shared_ptr.hpp>
    #include <boost/enable_shared_from_this.hpp>
    #include <boost/asio.hpp>
    #include <boost/thread/thread.hpp>
    
    // -----
    
    class TCPClient_Async
    {
    public:
    
    	TCPClient_Async(boost::asio::io_service& clt_service, std::string str_adress , std::string str_port)
    		: resolver(clt_service), cltSock(clt_service)
    	{
    		boost::system::error_code error;
    		try
    		{
    			boost::asio::ip::tcp::resolver::query query(str_adress, str_port);
    			boost::asio::connect(cltSock, resolver.resolve(query, error));
    			HandleConnect(error);
    		}
    		catch (std::exception& e)
    		{
    			std::cerr << "TCPClient_Async constructor exception: " << e.what() << "\n";
    		}
    	}
    
    	~TCPClient_Async(void)
    	{
    		try
    		{
    			cltSock.close();
    		}
    		catch (std::exception& e)
    		{
    			std::cerr << "TCPClient_Async destructor exception: " << e.what() << "\n";
    		}
    	}
    
    private:
    
    	void HandleConnect(const boost::system::error_code &error) 
    
    	{ 
    		if (!error) 
    		{
    			try
    			{
    				cltSock.set_option(boost::asio::ip::tcp::socket::linger(true,2));
    				cltSock.set_option(boost::asio::ip::tcp::no_delay(true));
    
    				boost::asio::write(cltSock, boost::asio::buffer("Hello Server!\n"));
    				cltSock.shutdown(boost::asio::ip::tcp::socket::shutdown_send);
    				boost::asio::read_until(cltSock, rcvStreamBuffer, "\n");
    				std::string s( (std::istreambuf_iterator<char>(&rcvStreamBuffer)), std::istreambuf_iterator<char>() );
    				std::cout	<< "client socket: " << cltSock.local_endpoint()
    							<< " server socket: " << cltSock.remote_endpoint()
    							<< " got message: " << s.c_str();
    			}
    			catch (std::exception& e)
    			{
    				std::cerr << "HandleConnect exception: " << e.what() << "\n";
    			}
    
    		}
    	} 
    
    	boost::asio::ip::tcp::resolver resolver;
    	boost::asio::ip::tcp::socket cltSock;
    	boost::asio::streambuf rcvStreamBuffer;
    
    };
    
    // -----
    
    void ThreadFunction()
    {
    	boost::system::error_code ec;
    
    	while(true)
    	{
    		try
    		{	
    			boost::asio::io_service io_service; 
    			TCPClient_Async client(io_service, "127.0.0.1", "4711");
    			io_service.run();
    			io_service.reset();
    		}
    		catch (std::exception& e)
    		{
    			std::cerr << "Exception: " << e.what() << "\n";
    		}
    		boost::this_thread::sleep(boost::posix_time::seconds(1));
    	}
    };
    
    // -----
    
    int main() 
    {
    	std::cout << "Client" << std::endl;
    	boost::thread t(ThreadFunction);
    	std::cout << "Press enter to exit" << std::endl;
    	std::cin.clear();
    	std::cin.ignore(std::cin.rdbuf()->in_avail());
    	std::cin.get();
    
    	return 0;
    }
    

    Der Server soll asynchron Verbindungen des Clients bearbeiten. Wenn wir einen Stresstest starten (1 Server Applikation, n Client Applikationen) bleiben die Sockets des Clients nach der Beendigung der Kommunikation bestehen. Das kann man per netstat nachvollziehen.

    Fehlt uns im Code noch ein Aufruf zum sauberen Beenden?

    Danke schonmal im Voraus.



  • Ich habe keine Ahnung von boost, aber ich weiß, dass es in C einen Call zum Shutdown des Sockets gibt ( shutdown ), der den Socket aber nicht schließt. Und es gibt einen close -Call (unter Windows closesocket ), der den Socket schließt und die Systemresourcen freigibt.

    Und das gibt es auch für boost.

    Probier's mal.



  • Ach, Destruktorenkacke.
    Nur weil nicht sofort close nach dem shutdown aufgerufen wird, heißt das nicht, dass das überhaupt nicht aufgerufen wird.

    Ich ziehe den Einwand zurück.



  • Moin!
    Ich kann das nicht nachvollziehen - bzw. schon, aber ich sehe kein Problem. 😉

    Wenn ich den Source ausprobiere, habe ich zwar eine Hand voll Verbindung im TIME_WAIT Status. Aber das ist doch normal?
    Allerdings bin ich hier auch auf einem Linux unterwegs.



  • Sehe ich das richtig das du nur ein Shutdown(send) machst? Das sollte besser ein shutdown(send|receive) sein.



  • Furble Wurble schrieb:

    Moin!
    Ich kann das nicht nachvollziehen - bzw. schon, aber ich sehe kein Problem. 😉

    Wenn ich den Source ausprobiere, habe ich zwar eine Hand voll Verbindung im TIME_WAIT Status. Aber das ist doch normal?
    Allerdings bin ich hier auch auf einem Linux unterwegs.

    Ja, da hast du erstmal Recht. Ich hatte vergessen, die Randbedingungen zu erwähnen:
    In unserem System werden sich nach dem Start mehrere Hundert Threads (Clients) mit einer Reihe von Servern unterhalten, um verschiedene Funktionen zu erfüllen. Außerdem laufen wir nicht als einzige Applikation auf dem Rechner, daher haben wir nur einen Port-Bereich von etwa 2000 Ports zur Verfügung.

    Ich habe nun die Befürchtung, dass uns zum Systemstarts die Ports/Sockets ausgehen. Ich weiß, das TCP/IP die Ports für 2*MSL (etwa 4 Minuten) offen hält, um später eintreffende Pakete zu empfangen. Diese Funktionalität brauchen wir aber nicht, da wir genau wissen, wann eine Kommunikation zu Ende ist. Ich kann mir schwer vorstellen, dass man diese 4 Minuten Wartezeit per Boost.Asio nicht ändern kann. Oder ganz abschalten.

    Vielleicht ist TCP/IP auch gar nicht das Mittel der Wahl für diese Aufgabe. 😕

    Schönes Wochenende


Anmelden zum Antworten