Explorar el Código

fix(ssh): add EOF handling and error propagation for port forwarding (#10964)

Co-authored-by: Eugene <[email protected]>
ajkrj hace 1 mes
padre
commit
f8f625f6ba
Se han modificado 1 ficheros con 57 adiciones y 21 borrados
  1. 57 21
      tabby-ssh/src/session/ssh.ts

+ 57 - 21
tabby-ssh/src/session/ssh.ts

@@ -495,10 +495,9 @@ export class SSHSession {
                 this.emitServiceMessage(colors.bgRed.black(' X ') + ` Could not forward the remote connection to ${forward.targetAddress}:${forward.targetPort}: ${e}`)
                 channel.close()
             })
-            channel.data$.subscribe(data => socket.write(data))
-            socket.on('data', data => channel.write(Uint8Array.from(data)))
-            channel.closed$.subscribe(() => socket.destroy())
-            socket.on('close', () => channel.close())
+
+            this.setupSocketChannelEvents(channel, socket, 'Remote forward')
+
             socket.on('connect', () => {
                 this.logger.info('Connection forwarded')
             })
@@ -519,19 +518,7 @@ export class SSHSession {
             try {
                 const x11Stream = await socket.connect(displaySpec)
                 this.logger.info('Connection forwarded')
-
-                channel.data$.subscribe(data => {
-                    x11Stream.write(data)
-                })
-                x11Stream.on('data', data => {
-                    channel.write(Uint8Array.from(data))
-                })
-                channel.closed$.subscribe(() => {
-                    socket.destroy()
-                })
-                x11Stream.on('close', () => {
-                    channel.close()
-                })
+                this.setupSocketChannelEvents(channel, x11Stream, 'X11 forward')
             } catch (e) {
                 // eslint-disable-next-line @typescript-eslint/no-base-to-string
                 this.emitServiceMessage(colors.bgRed.black(' X ') + ` Could not connect to the X server: ${e}`)
@@ -788,10 +775,8 @@ export class SSHSession {
                     throw err
                 }))
                 const socket = accept()
-                channel.data$.subscribe(data => socket.write(data))
-                socket.on('data', data => channel.write(Uint8Array.from(data)))
-                channel.closed$.subscribe(() => socket.destroy())
-                socket.on('close', () => channel.close())
+
+                this.setupSocketChannelEvents(channel, socket, 'Local forward')
             }).then(() => {
                 this.emitServiceMessage(colors.bgGreen.black(' -> ') + ` Forwarded ${fw}`)
                 this.forwardedPorts.push(fw)
@@ -865,6 +850,57 @@ export class SSHSession {
         return ch
     }
 
+    private setupSocketChannelEvents (channel: russh.Channel, socket: Socket, logPrefix: string): void {
+        // Channel → Socket data flow with error handling
+        channel.data$.subscribe({
+            next: data => socket.write(data),
+            error: err => {
+                this.logger.error(`${logPrefix}: channel data error: ${err}`)
+                socket.destroy()
+            },
+        })
+
+        // Socket → Channel data flow with proper conversion
+        socket.on('data', data => {
+            try {
+                channel.write(new Uint8Array(data.buffer, data.byteOffset, data.byteLength))
+            } catch (err) {
+                this.logger.error(`${logPrefix}: channel write error: ${err}`)
+                socket.destroy(new Error(`${logPrefix}failed to write to channel: ${err}`))
+            }
+        })
+
+        // Handle EOF from remote
+        channel.eof$.subscribe(() => {
+            this.logger.debug(`${logPrefix}: channel EOF received, ending socket`)
+            socket.end()
+        })
+
+        // Handle channel close
+        channel.closed$.subscribe(() => {
+            this.logger.debug(`${logPrefix}: channel closed, destroying socket`)
+            socket.destroy()
+        })
+
+        // Handle socket errors
+        socket.on('error', err => {
+            this.logger.error(`${logPrefix}: socket error: ${err}`)
+            channel.close()
+        })
+
+        // Handle socket close
+        socket.on('close', () => {
+            this.logger.debug(`${logPrefix}: socket closed, closing channel`)
+            channel.close()
+        })
+
+        // Handle EOF from local
+        socket.on('end', () => {
+            this.logger.debug(`${logPrefix}: socket end, sending EOF to channel`)
+            channel.eof()
+        })
+    }
+
     async loadPrivateKey (name: string, privateKeyContents: Buffer): Promise<russh.KeyPair> {
         this.activePrivateKey = await this.loadPrivateKeyWithPassphraseMaybe(privateKeyContents.toString())
         return this.activePrivateKey