Browse Source

Merge pull request #1314 from vcmi/optimze-connection-write

[1.1] Socket buffered write. Write full cpack at once
Andrii Danylchenko 2 years ago
parent
commit
d1dc545f53
2 changed files with 76 additions and 0 deletions
  1. 69 0
      lib/serializer/Connection.cpp
  2. 7 0
      lib/serializer/Connection.h

+ 69 - 0
lib/serializer/Connection.cpp

@@ -31,9 +31,18 @@ using namespace boost::asio::ip;
 #define LIL_ENDIAN
 #endif
 
+struct ConnectionBuffers
+{
+	boost::asio::streambuf readBuffer;
+	boost::asio::streambuf writeBuffer;
+};
 
 void CConnection::init()
 {
+	enableBufferedWrite = false;
+	enableBufferedRead = false;
+	connectionBuffers = std::make_unique<ConnectionBuffers>();
+
 	socket->set_option(boost::asio::ip::tcp::no_delay(true));
     try
     {
@@ -72,6 +81,7 @@ CConnection::CConnection(std::string host, ui16 port, std::string Name, std::str
 	int i;
 	boost::system::error_code error = asio::error::host_not_found;
 	socket = std::make_shared<tcp::socket>(*io_service);
+
 	tcp::resolver resolver(*io_service);
 	tcp::resolver::iterator end, pom, endpoint_iterator = resolver.resolve(tcp::resolver::query(host, std::to_string(port)),error);
 	if(error)
@@ -138,10 +148,39 @@ CConnection::CConnection(std::shared_ptr<TAcceptor> acceptor, std::shared_ptr<bo
 	}
 	init();
 }
+
+void CConnection::flushBuffers()
+{
+	if(!enableBufferedWrite)
+		return;
+
+	try
+	{
+		asio::write(*socket, connectionBuffers->writeBuffer);
+	}
+	catch(...)
+	{
+		//connection has been lost
+		connected = false;
+		throw;
+	}
+
+	enableBufferedWrite = false;
+}
+
 int CConnection::write(const void * data, unsigned size)
 {
 	try
 	{
+		if(enableBufferedWrite)
+		{
+			std::ostream ostream(&connectionBuffers->writeBuffer);
+		
+			ostream.write(static_cast<const char *>(data), size);
+
+			return size;
+		}
+
 		int ret;
 		ret = static_cast<int>(asio::write(*socket,asio::const_buffers_1(asio::const_buffer(data,size))));
 		return ret;
@@ -153,10 +192,29 @@ int CConnection::write(const void * data, unsigned size)
 		throw;
 	}
 }
+
 int CConnection::read(void * data, unsigned size)
 {
 	try
 	{
+		if(enableBufferedRead)
+		{
+			auto available = connectionBuffers->readBuffer.size();
+
+			while(available < size)
+			{
+				auto bytesRead = socket->read_some(connectionBuffers->readBuffer.prepare(1024));
+				connectionBuffers->readBuffer.commit(bytesRead);
+				available = connectionBuffers->readBuffer.size();
+			}
+
+			std::istream istream(&connectionBuffers->readBuffer);
+
+			istream.read(static_cast<char *>(data), size);
+
+			return size;
+		}
+
 		int ret = static_cast<int>(asio::read(*socket,asio::mutable_buffers_1(asio::mutable_buffer(data,size))));
 		return ret;
 	}
@@ -167,6 +225,7 @@ int CConnection::read(void * data, unsigned size)
 		throw;
 	}
 }
+
 CConnection::~CConnection()
 {
 	if(handler)
@@ -210,6 +269,8 @@ void CConnection::reportState(vstd::CLoggerBase * out)
 
 CPack * CConnection::retrievePack()
 {
+	enableBufferedRead = true;
+
 	CPack * pack = nullptr;
 	boost::unique_lock<boost::mutex> lock(*mutexRead);
 	iser & pack;
@@ -222,6 +283,9 @@ CPack * CConnection::retrievePack()
 	{
 		pack->c = this->shared_from_this();
 	}
+
+	enableBufferedRead = false;
+
 	return pack;
 }
 
@@ -229,7 +293,12 @@ void CConnection::sendPack(const CPack * pack)
 {
 	boost::unique_lock<boost::mutex> lock(*mutexWrite);
 	logNetwork->trace("Sending a pack of type %s", typeid(*pack).name());
+
+	enableBufferedWrite = true;
+
 	oser & pack;
+
+	flushBuffers();
 }
 
 void CConnection::disableStackSendingByID()

+ 7 - 0
lib/serializer/Connection.h

@@ -52,6 +52,7 @@ typedef boost::asio::basic_socket_acceptor<boost::asio::ip::tcp, boost::asio::so
 VCMI_LIB_NAMESPACE_BEGIN
 
 struct CPack;
+struct ConnectionBuffers;
 
 /// Main class for network communication
 /// Allows establishing connection and bidirectional read-write
@@ -63,8 +64,14 @@ class DLL_LINKAGE CConnection
 
 	int write(const void * data, unsigned size) override;
 	int read(void * data, unsigned size) override;
+	void flushBuffers();
 
 	std::shared_ptr<boost::asio::io_service> io_service; //can be empty if connection made from socket
+
+	bool enableBufferedWrite;
+	bool enableBufferedRead;
+	std::unique_ptr<ConnectionBuffers> connectionBuffers;
+
 public:
 	BinaryDeserializer iser;
 	BinarySerializer oser;