Pārlūkot izejas kodu

Limited SFTP upload queue length to avoid networking congestion

When uploading to ProFTPD mod_sftp in local Docker, the queue could be as long as 3000 requests, the TCP window as reported by Wireshark constantly full and re-transmitting, and eventually the server stops responding.

Source commit: 9ba6fd4a52c547db84da99b49779271b75fb0968
Martin Prikryl 1 gadu atpakaļ
vecāks
revīzija
7ae34b8957
2 mainītis faili ar 48 papildinājumiem un 13 dzēšanām
  1. 1 1
      source/core/SessionData.cpp
  2. 47 12
      source/core/SftpFileSystem.cpp

+ 1 - 1
source/core/SessionData.cpp

@@ -303,7 +303,7 @@ void __fastcall TSessionData::DefaultSettings()
   // SFTP
   SftpServer = L"";
   SFTPDownloadQueue = 32;
-  SFTPUploadQueue = 32;
+  SFTPUploadQueue = 64;
   SFTPListingQueue = 2;
   SFTPMaxVersion = ::SFTPMaxVersion;
   SFTPMaxPacketSize = 0;

+ 47 - 12
source/core/SftpFileSystem.cpp

@@ -1152,11 +1152,11 @@ public:
     return SendRequests();
   }
 
-  virtual void __fastcall Dispose(int ExpectedType, int AllowStatus)
+  void DisposeUntil(int Count, int ExpectedType, int AllowStatus)
   {
     DebugAssert(FFileSystem->FTerminal->Active);
 
-    while (FRequests->Count)
+    while (FRequests->Count > Count)
     {
       DebugAssert(FResponses->Count);
 
@@ -1173,7 +1173,7 @@ public:
       }
       catch(Exception & E)
       {
-        if (ExpectedType < 0)
+        if ((ExpectedType < 0) && (Count == 0))
         {
           if (FFileSystem->FTerminal->Active)
           {
@@ -1194,6 +1194,11 @@ public:
     }
   }
 
+  virtual void __fastcall Dispose(int ExpectedType, int AllowStatus)
+  {
+    DisposeUntil(0, ExpectedType, AllowStatus);
+  }
+
   void __fastcall DisposeSafe(int ExpectedType = -1, int AllowStatus = -1)
   {
     if (FFileSystem->FTerminal->Active)
@@ -1320,6 +1325,10 @@ protected:
       TSFTPPacket * Response = new TSFTPPacket();
       FRequests->Add(Request);
       FResponses->Add(Response);
+      if (FFileSystem->FTerminal->Configuration->ActualLogProtocol >= 1)
+      {
+        FFileSystem->FTerminal->LogEvent(FORMAT(L"Queue len: %d", (FRequests->Count)));
+      }
 
       // make sure the response is reserved before actually ending the message
       // as we may receive response asynchronously before SendPacket finishes
@@ -1366,13 +1375,10 @@ protected:
 class TSFTPAsynchronousQueue : public TSFTPQueue
 {
 public:
-  #pragma option push -vi- // WORKAROUND for internal compiler errors
   __fastcall TSFTPAsynchronousQueue(TSFTPFileSystem * AFileSystem) : TSFTPQueue(AFileSystem)
   {
-    FFileSystem->FSecureShell->RegisterReceiveHandler(ReceiveHandler);
-    FReceiveHandlerRegistered = true;
+    RegisterReceiveHandler();
   }
-  #pragma option pop
 
   virtual __fastcall ~TSFTPAsynchronousQueue()
   {
@@ -1414,14 +1420,22 @@ protected:
     return true;
   }
 
-  #pragma option push -vi- // See pragma at constructor
-  void __fastcall UnregisterReceiveHandler()
+  #pragma option push -vi- // WORKAROUND for internal compiler errors
+  bool __fastcall UnregisterReceiveHandler()
   {
-    if (FReceiveHandlerRegistered)
+    bool Result = FReceiveHandlerRegistered;
+    if (Result)
     {
       FReceiveHandlerRegistered = false;
       FFileSystem->FSecureShell->UnregisterReceiveHandler(ReceiveHandler);
     }
+    return Result;
+  }
+
+  void __fastcall RegisterReceiveHandler()
+  {
+    FFileSystem->FSecureShell->RegisterReceiveHandler(ReceiveHandler);
+    FReceiveHandlerRegistered = true;
   }
   #pragma option pop
 
@@ -1512,7 +1526,7 @@ private:
 class TSFTPUploadQueue : public TSFTPAsynchronousQueue
 {
 public:
-  TSFTPUploadQueue(TSFTPFileSystem * AFileSystem, TEncryption * Encryption) :
+  TSFTPUploadQueue(TSFTPFileSystem * AFileSystem, TEncryption * Encryption, int QueueMaxLen) :
     TSFTPAsynchronousQueue(AFileSystem),
     FEncryption(Encryption)
   {
@@ -1522,6 +1536,7 @@ public:
     FLastBlockSize = 0;
     FEnd = false;
     FConvertToken = false;
+    FQueueMaxLen = QueueMaxLen;
   }
 
   virtual __fastcall ~TSFTPUploadQueue()
@@ -1673,6 +1688,25 @@ protected:
     return FEnd;
   }
 
+  virtual bool __fastcall SendRequest()
+  {
+    bool Result = TSFTPAsynchronousQueue::SendRequest();
+    if (FRequests->Count >= FQueueMaxLen)
+    {
+      FFileSystem->FTerminal->LogEvent(1, L"Too many outstanding requests, waiting for responses...");
+      DebugCheck(UnregisterReceiveHandler());
+      try
+      {
+        DisposeUntil(FQueueMaxLen - 1, SSH_FXP_STATUS, -1);
+      }
+      __finally
+      {
+        RegisterReceiveHandler();
+      }
+    }
+    return Result;
+  }
+
 private:
   TStream * FStream;
   TTransferInEvent FOnTransferIn;
@@ -1685,6 +1719,7 @@ private:
   bool FConvertToken;
   int FConvertParams;
   TEncryption * FEncryption;
+  int FQueueMaxLen;
 };
 //---------------------------------------------------------------------------
 class TSFTPLoadFilesPropertiesQueue : public TSFTPFixedLenQueue
@@ -4804,7 +4839,7 @@ void __fastcall TSFTPFileSystem::Source(
 
     TEncryption Encryption(FTerminal->GetEncryptKey());
     bool Encrypt = FTerminal->IsFileEncrypted(DestFullName, CopyParam->EncryptNewFiles);
-    TSFTPUploadQueue Queue(this, (Encrypt ? &Encryption : NULL));
+    TSFTPUploadQueue Queue(this, (Encrypt ? &Encryption : NULL), FTerminal->SessionData->SFTPUploadQueue);
     try
     {
       int ConvertParams =