Explorar o código

Connection buffered read cpack.

Andrii Danylchenko %!s(int64=2) %!d(string=hai) anos
pai
achega
c271f9187f
Modificáronse 2 ficheiros con 35 adicións e 7 borrados
  1. 30 7
      lib/serializer/Connection.cpp
  2. 5 0
      lib/serializer/Connection.h

+ 30 - 7
lib/serializer/Connection.cpp

@@ -162,17 +162,17 @@ void CConnection::flushBuffers()
 
 int CConnection::write(const void * data, unsigned size)
 {
-	if(enableBufferedWrite)
+	try
 	{
-		std::ostream ostream(&writeBuffer);
+		if(enableBufferedWrite)
+		{
+			std::ostream ostream(&writeBuffer);
 		
-		ostream.write(static_cast<const char *>(data), size);
+			ostream.write(static_cast<const char *>(data), size);
 
-		return size;
-	}
+			return size;
+		}
 
-	try
-	{
 		int ret;
 		ret = static_cast<int>(asio::write(*socket,asio::const_buffers_1(asio::const_buffer(data,size))));
 		return ret;
@@ -189,6 +189,24 @@ int CConnection::read(void * data, unsigned size)
 {
 	try
 	{
+		if(enableBufferedRead)
+		{
+			auto available = readBuffer.size();
+
+			while(available < size)
+			{
+				auto bytesRead = socket->read_some(readBuffer.prepare(1024));
+				readBuffer.commit(bytesRead);
+				available = readBuffer.size();
+			}
+
+			std::istream istream(&readBuffer);
+
+			istream.read(static_cast<char *>(data), size);
+
+			return size;
+		}
+
 		int ret = static_cast<int>(asio::read(*socket,asio::mutable_buffers_1(asio::mutable_buffer(data,size))));
 		return ret;
 	}
@@ -243,6 +261,8 @@ void CConnection::reportState(vstd::CLoggerBase * out)
 
 CPack * CConnection::retrievePack()
 {
+	enableBufferedRead = true;
+
 	CPack * pack = nullptr;
 	boost::unique_lock<boost::mutex> lock(*mutexRead);
 	iser & pack;
@@ -255,6 +275,9 @@ CPack * CConnection::retrievePack()
 	{
 		pack->c = this->shared_from_this();
 	}
+
+	enableBufferedRead = false;
+
 	return pack;
 }
 

+ 5 - 0
lib/serializer/Connection.h

@@ -21,6 +21,8 @@ namespace boost
 {
 	namespace asio
 	{
+		class streambuf;
+
 		namespace ip
 		{
 			class tcp;
@@ -70,6 +72,9 @@ class DLL_LINKAGE CConnection
 	bool enableBufferedWrite;
 	boost::asio::streambuf writeBuffer;
 
+	bool enableBufferedRead;
+	boost::asio::streambuf readBuffer;
+
 public:
 	BinaryDeserializer iser;
 	BinarySerializer oser;