ipc-server.ts 3.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137
  1. import EventEmitter from "node:events"
  2. import { Socket } from "node:net"
  3. import * as crypto from "node:crypto"
  4. import ipc from "node-ipc"
  5. import {
  6. type IpcServerEvents,
  7. type RooCodeIpcServer,
  8. IpcOrigin,
  9. IpcMessageType,
  10. type IpcMessage,
  11. ipcMessageSchema,
  12. } from "@roo-code/types"
  /**
   * IPC server built on the `node-ipc` singleton.
   *
   * Tracks every connected socket under a randomly generated client id,
   * validates incoming payloads against `ipcMessageSchema`, and re-emits
   * connection, disconnection and task-command activity as typed events.
   */
  export class IpcServer extends EventEmitter<IpcServerEvents> implements RooCodeIpcServer {
    private readonly _socketPath: string
    private readonly _log: (...args: unknown[]) => void
    /** Connected sockets keyed by the client id assigned in `onConnect`. */
    private readonly _clients: Map<string, Socket>
    private _isListening = false

    /**
     * @param socketPath Path handed to `ipc.serve` when `listen()` is called.
     * @param log Sink for diagnostic output; defaults to `console.log`.
     */
    constructor(socketPath: string, log = console.log) {
      super()
      this._socketPath = socketPath
      this._log = log
      this._clients = new Map()
    }

    /**
     * Configures `node-ipc`, registers the connection/message handlers and
     * starts the server.
     *
     * Repeated calls are ignored: `isListening` is never reset by this class,
     * so a second call would otherwise serve and start a second time.
     */
    public listen() {
      if (this._isListening) {
        this.log("[server#listen] already listening, ignoring repeated listen() call")
        return
      }

      this._isListening = true
      ipc.config.silent = true

      ipc.serve(this.socketPath, () => {
        ipc.server.on("connect", (socket) => this.onConnect(socket))
        ipc.server.on("socket.disconnected", (socket) => this.onDisconnect(socket))
        ipc.server.on("message", (data) => this.onMessage(data))
      })

      ipc.server.start()
    }

    /**
     * Registers a newly connected socket under a fresh 12-hex-character id,
     * acknowledges the client with that id plus this process's pid/ppid, and
     * emits a `Connect` event carrying the id.
     */
    private onConnect(socket: Socket) {
      const clientId = crypto.randomBytes(6).toString("hex")
      this._clients.set(clientId, socket)
      this.log(`[server#onConnect] clientId = ${clientId}, # clients = ${this._clients.size}`)

      this.send(socket, {
        type: IpcMessageType.Ack,
        origin: IpcOrigin.Server,
        data: { clientId, pid: process.pid, ppid: process.ppid },
      })

      this.emit(IpcMessageType.Connect, clientId)
    }

    /**
     * Forgets the client that owned `destroyedSocket` and emits a `Disconnect`
     * event for it. Sockets that were never registered are only logged.
     */
    private onDisconnect(destroyedSocket: Socket) {
      let disconnectedClientId: string | undefined

      // The map is keyed by id, so the socket has to be found by identity.
      for (const [clientId, socket] of this._clients.entries()) {
        if (socket === destroyedSocket) {
          disconnectedClientId = clientId
          this._clients.delete(clientId)
          break
        }
      }

      this.log(`[server#socket.disconnected] clientId = ${disconnectedClientId}, # clients = ${this._clients.size}`)

      if (disconnectedClientId) {
        this.emit(IpcMessageType.Disconnect, disconnectedClientId)
      }
    }

    /**
     * Validates an incoming payload and dispatches it.
     *
     * Only client-originated `TaskCommand` messages are re-emitted; other
     * client-originated types are logged as unhandled, and payloads with any
     * other origin are ignored.
     */
    private onMessage(data: unknown) {
      // `typeof null` is "object", so null has to be rejected explicitly.
      if (typeof data !== "object" || data === null) {
        this.log(`[server#onMessage] invalid data -> ${JSON.stringify(data)}`)
        return
      }

      const result = ipcMessageSchema.safeParse(data)

      if (!result.success) {
        this.log(
          `[server#onMessage] invalid payload -> ${JSON.stringify(result.error.issues)} -> ${JSON.stringify(data)}`,
        )
        return
      }

      const payload = result.data

      if (payload.origin === IpcOrigin.Client) {
        switch (payload.type) {
          case IpcMessageType.TaskCommand:
            this.emit(IpcMessageType.TaskCommand, payload.clientId, payload.data)
            break
          default:
            this.log(`[server#onMessage] unhandled payload: ${JSON.stringify(payload)}`)
            break
        }
      }
    }

    /** Forwards diagnostics to the logger supplied at construction time. */
    private log(...args: unknown[]) {
      this._log(...args)
    }

    /** Passes `message` to `ipc.server.broadcast` on the "message" channel. */
    public broadcast(message: IpcMessage) {
      // this.log("[server#broadcast] message =", message)
      ipc.server.broadcast("message", message)
    }

    /**
     * Sends `message` to a single client.
     *
     * @param client Either a client id assigned in `onConnect` or the socket itself.
     * @param message Payload emitted on the "message" channel.
     *
     * An id with no registered socket cannot be delivered to; the drop is
     * logged rather than passing silently.
     */
    public send(client: string | Socket, message: IpcMessage) {
      // this.log("[server#send] message =", message)
      if (typeof client === "string") {
        const socket = this._clients.get(client)

        if (!socket) {
          this.log(`[server#send] unknown clientId = ${client}, message dropped`)
          return
        }

        ipc.server.emit(socket, "message", message)
        return
      }

      ipc.server.emit(client, "message", message)
    }

    /** Socket path this server was constructed with. */
    public get socketPath() {
      return this._socketPath
    }

    /** `true` once `listen()` has been called. */
    public get isListening() {
      return this._isListening
    }
  }