worker.ts 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152
  1. import { Installation } from "@/installation"
  2. import { Server } from "@/server/server"
  3. import { Log } from "@/util/log"
  4. import { Instance } from "@/project/instance"
  5. import { InstanceBootstrap } from "@/project/bootstrap"
  6. import { Rpc } from "@/util/rpc"
  7. import { upgrade } from "@/cli/upgrade"
  8. import { Config } from "@/config/config"
  9. import { GlobalBus } from "@/bus/global"
  10. import { createOpencodeClient, type Event } from "@opencode-ai/sdk/v2"
  11. import type { BunWebSocketData } from "hono/bun"
  12. import { Flag } from "@/flag/flag"
  13. await Log.init({
  14. print: process.argv.includes("--print-logs"),
  15. dev: Installation.isLocal(),
  16. level: (() => {
  17. if (Installation.isLocal()) return "DEBUG"
  18. return "INFO"
  19. })(),
  20. })
  21. process.on("unhandledRejection", (e) => {
  22. Log.Default.error("rejection", {
  23. e: e instanceof Error ? e.message : e,
  24. })
  25. })
  26. process.on("uncaughtException", (e) => {
  27. Log.Default.error("exception", {
  28. e: e instanceof Error ? e.message : e,
  29. })
  30. })
  31. // Subscribe to global events and forward them via RPC
  32. GlobalBus.on("event", (event) => {
  33. Rpc.emit("global.event", event)
  34. })
  35. let server: Bun.Server<BunWebSocketData> | undefined
  36. const eventStream = {
  37. abort: undefined as AbortController | undefined,
  38. }
  39. const startEventStream = (directory: string) => {
  40. if (eventStream.abort) eventStream.abort.abort()
  41. const abort = new AbortController()
  42. eventStream.abort = abort
  43. const signal = abort.signal
  44. const fetchFn = (async (input: RequestInfo | URL, init?: RequestInit) => {
  45. const request = new Request(input, init)
  46. const auth = getAuthorizationHeader()
  47. if (auth) request.headers.set("Authorization", auth)
  48. return Server.App().fetch(request)
  49. }) as typeof globalThis.fetch
  50. const sdk = createOpencodeClient({
  51. baseUrl: "http://opencode.internal",
  52. directory,
  53. fetch: fetchFn,
  54. signal,
  55. })
  56. ;(async () => {
  57. while (!signal.aborted) {
  58. const events = await Promise.resolve(
  59. sdk.event.subscribe(
  60. {},
  61. {
  62. signal,
  63. },
  64. ),
  65. ).catch(() => undefined)
  66. if (!events) {
  67. await Bun.sleep(250)
  68. continue
  69. }
  70. for await (const event of events.stream) {
  71. Rpc.emit("event", event as Event)
  72. }
  73. if (!signal.aborted) {
  74. await Bun.sleep(250)
  75. }
  76. }
  77. })().catch((error) => {
  78. Log.Default.error("event stream error", {
  79. error: error instanceof Error ? error.message : error,
  80. })
  81. })
  82. }
  83. startEventStream(process.cwd())
  84. export const rpc = {
  85. async fetch(input: { url: string; method: string; headers: Record<string, string>; body?: string }) {
  86. const headers = { ...input.headers }
  87. const auth = getAuthorizationHeader()
  88. if (auth && !headers["authorization"] && !headers["Authorization"]) {
  89. headers["Authorization"] = auth
  90. }
  91. const request = new Request(input.url, {
  92. method: input.method,
  93. headers,
  94. body: input.body,
  95. })
  96. const response = await Server.App().fetch(request)
  97. const body = await response.text()
  98. return {
  99. status: response.status,
  100. headers: Object.fromEntries(response.headers.entries()),
  101. body,
  102. }
  103. },
  104. async server(input: { port: number; hostname: string; mdns?: boolean; cors?: string[] }) {
  105. if (server) await server.stop(true)
  106. server = Server.listen(input)
  107. return { url: server.url.toString() }
  108. },
  109. async checkUpgrade(input: { directory: string }) {
  110. await Instance.provide({
  111. directory: input.directory,
  112. init: InstanceBootstrap,
  113. fn: async () => {
  114. await upgrade().catch(() => {})
  115. },
  116. })
  117. },
  118. async reload() {
  119. Config.global.reset()
  120. await Instance.disposeAll()
  121. },
  122. async shutdown() {
  123. Log.Default.info("worker shutting down")
  124. if (eventStream.abort) eventStream.abort.abort()
  125. await Instance.disposeAll()
  126. if (server) server.stop(true)
  127. },
  128. }
  129. Rpc.listen(rpc)
  130. function getAuthorizationHeader(): string | undefined {
  131. const password = Flag.OPENCODE_SERVER_PASSWORD
  132. if (!password) return undefined
  133. const username = Flag.OPENCODE_SERVER_USERNAME ?? "opencode"
  134. return `Basic ${btoa(`${username}:${password}`)}`
  135. }