pty.ts 4.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. import * as nodePTY from '@tabby-gang/node-pty'
  2. import { v4 as uuidv4 } from 'uuid'
  3. import { ipcMain } from 'electron'
  4. import { Application } from './app'
  5. import { UTF8Splitter } from './utfSplitter'
  6. import { Subject, debounceTime } from 'rxjs'
  7. import { StringDecoder } from './stringDecoder'
  /**
   * Flow-controlled output queue between a PTY and its consumer.
   *
   * Buffers handed to `push()` are re-chunked to at most `maxChunk` bytes and
   * delivered through `onData`. `delta` tracks bytes emitted but not yet
   * acknowledged via `ack()`; when it exceeds `maxDelta` while data is queued,
   * the PTY is paused, and it is resumed once acknowledgements bring `delta`
   * back to `maxDelta` or below.
   *
   * Note: pausing only stops the PTY itself. Buffers that are already queued
   * keep draining on later `maybeEmit()` calls even while paused.
   */
  class PTYDataQueue {
      private buffers: Buffer[] = []
      private delta = 0
      private readonly maxChunk = 1024 * 100
      private readonly maxDelta = this.maxChunk * 5
      private flowPaused = false
      private decoder = new UTF8Splitter()
      private output$ = new Subject<Buffer>()

      /**
       * @param pty PTY whose flow is paused/resumed by this queue.
       * @param onData Receives every chunk that leaves the queue.
       */
      constructor (private pty: nodePTY.IPty, private onData: (data: Buffer) => void) {
          // After 500 ms without output, emit whatever the decoder still holds
          // (presumably a trailing incomplete UTF-8 sequence; see UTF8Splitter).
          this.output$.pipe(debounceTime(500)).subscribe(() => {
              const remainder = this.decoder.flush()
              if (remainder.length) {
                  this.onData(remainder)
              }
          })
      }

      /** Queues a buffer and tries to emit it. */
      push (data: Buffer): void {
          this.buffers.push(data)
          this.maybeEmit()
      }

      /**
       * Marks `length` previously emitted bytes as consumed.
       *
       * Non-finite values are ignored: subtracting NaN/Infinity would poison
       * `delta` and permanently disable both the pause and the resume checks.
       */
      ack (length: number): void {
          if (!Number.isFinite(length)) {
              return
          }
          this.delta -= length
          this.maybeEmit()
      }

      /** Applies flow control and emits at most one chunk of queued data. */
      private maybeEmit (): void {
          if (this.delta <= this.maxDelta && this.flowPaused) {
              // resume() calls maybeEmit() again once the flag is cleared
              this.resume()
              return
          }
          if (this.buffers.length === 0) {
              return
          }
          if (this.delta > this.maxDelta && !this.flowPaused) {
              this.pause()
              return
          }

          // Take whole buffers until at least maxChunk bytes are collected
          const chunks: Buffer[] = []
          let total = 0
          while (total < this.maxChunk) {
              const next = this.buffers.shift()
              if (next === undefined) {
                  break
              }
              chunks.push(next)
              total += next.length
          }

          let toSend = Buffer.concat(chunks)
          if (toSend.length > this.maxChunk) {
              // Put the overshoot back at the front of the queue
              this.buffers.unshift(toSend.subarray(this.maxChunk))
              toSend = toSend.subarray(0, this.maxChunk)
          }

          this.emitData(toSend)
          this.delta += toSend.length

          if (this.buffers.length) {
              // Continue draining on a later tick instead of looping here
              setImmediate(() => this.maybeEmit())
          }
      }

      /**
       * Runs a chunk through the decoder, delivers the result via `onData`
       * and signals `output$` so the debounced flush is rescheduled.
       */
      private emitData (data: Buffer): void {
          const validChunk = this.decoder.write(data)
          this.onData(validChunk)
          this.output$.next(validChunk)
      }

      /** Pauses the PTY and records that flow is paused. */
      private pause (): void {
          this.pty.pause()
          this.flowPaused = true
      }

      /** Resumes the PTY, clears the paused flag and retries emission. */
      private resume (): void {
          this.pty.resume()
          this.flowPaused = false
          this.maybeEmit()
      }
  }
  /**
   * One spawned node-pty process.
   *
   * Output is routed through a `PTYDataQueue` and, together with the
   * `close` / `exit` events, re-published through `app.broadcast` on
   * channels named `pty:<id>:<event>`.
   */
  export class PTY {
      private pty: nodePTY.IPty
      private outputQueue: PTYDataQueue
      private decoder = new StringDecoder()
      /** Set to true by the PTY's exit callback. */
      exited = false

      /**
       * @param id Identifier used in the broadcast channel names.
       * @param app Application used for broadcasting events.
       * @param args Passed through unchanged to node-pty's `spawn`.
       */
      constructor (private id: string, private app: Application, ...args: any[]) {
          this.pty = (nodePTY as any).spawn(...args)

          // Forward lifecycle events as-is
          const rawPTY = this.pty as any
          const forwardedEvents = ['close', 'exit']
          forwardedEvents.forEach(name => {
              rawPTY.on(name, (...payload: any[]) => this.emit(name, ...payload))
          })

          // Chunks leaving the queue are decoded and broadcast on a later tick
          const relay = (chunk: Buffer): void => {
              setImmediate(() => {
                  this.emit('data', this.decoder.write(chunk))
              })
          }
          this.outputQueue = new PTYDataQueue(this.pty, relay)

          this.pty.onData(raw => {
              this.outputQueue.push(Buffer.from(raw))
          })
          this.pty.onExit(() => {
              this.exited = true
          })
      }

      /** Returns the PID reported by the underlying PTY. */
      getPID (): number {
          const { pid } = this.pty
          return pid
      }

      /** Resizes the PTY; skipped when its `_writable` flag is falsy. */
      resize (columns: number, rows: number): void {
          const writable = (this.pty as any)._writable
          if (!writable) {
              return
          }
          this.pty.resize(columns, rows)
      }

      /** Writes input to the PTY; skipped when its `_writable` flag is falsy. */
      write (buffer: Buffer): void {
          const writable = (this.pty as any)._writable
          if (!writable) {
              return
          }
          this.pty.write(buffer as any)
      }

      /** Forwards a consumer acknowledgement to the output queue. */
      ackData (length: number): void {
          const queue = this.outputQueue
          queue.ack(length)
      }

      /** Kills the PTY process, optionally with the given signal. */
      kill (signal?: string): void {
          const target = this.pty
          target.kill(signal)
      }

      /** Broadcasts `event` for this PTY on its `pty:<id>:<event>` channel. */
      private emit (event: string, ...args: any[]) {
          const channel = `pty:${this.id}:${event}`
          this.app.broadcast(channel, ...args)
      }
  }
  /**
   * Registers the `pty:*` IPC handlers and keeps track of spawned PTYs by id.
   *
   * PTYs are stored in a `Map` rather than a plain object because ids arrive
   * over IPC: with an object, an id such as "constructor" would resolve to an
   * inherited `Object.prototype` member and either report a bogus PTY or throw
   * when a method is called on it.
   */
  export class PTYManager {
      private ptys = new Map<string, PTY>()

      /**
       * Installs all IPC listeners.
       *
       * @param app Application handed to every spawned `PTY` for broadcasting.
       */
      init (app: Application): void {
          // Synchronous: replies with the id of the newly created PTY
          ipcMain.on('pty:spawn', (event, ...options) => {
              const id = uuidv4().toString()
              // Reply before spawning so the id is returned even if spawn throws
              event.returnValue = id
              this.ptys.set(id, new PTY(id, app, ...options))
          })

          // Synchronous: always replies with a boolean
          ipcMain.on('pty:exists', (event, id) => {
              const pty = this.ptys.get(id)
              event.returnValue = pty !== undefined && !pty.exited
          })

          // Synchronous: replies with the PID, or undefined for an unknown id
          ipcMain.on('pty:get-pid', (event, id) => {
              event.returnValue = this.ptys.get(id)?.getPID()
          })

          // The remaining handlers silently ignore unknown ids
          ipcMain.on('pty:resize', (_event, id, columns, rows) => {
              this.ptys.get(id)?.resize(columns, rows)
          })

          ipcMain.on('pty:write', (_event, id, data) => {
              this.ptys.get(id)?.write(Buffer.from(data))
          })

          ipcMain.on('pty:kill', (_event, id, signal) => {
              this.ptys.get(id)?.kill(signal)
          })

          ipcMain.on('pty:ack-data', (_event, id, length) => {
              this.ptys.get(id)?.ackData(length)
          })
      }
  }