Просмотр исходного кода

fix(ripgrep): restore native rg backend

Shoubhit Dash 1 день назад
Родитель
Сommit
1fd8ce57a2

+ 0 - 3
bun.lock

@@ -404,7 +404,6 @@
         "opentui-spinner": "0.0.6",
         "partial-json": "0.1.7",
         "remeda": "catalog:",
-        "ripgrep": "0.3.1",
         "semver": "^7.6.3",
         "solid-js": "catalog:",
         "strip-ansi": "7.1.2",
@@ -4479,8 +4478,6 @@
 
     "rimraf": ["[email protected]", "", { "dependencies": { "glob": "^7.1.3" }, "bin": { "rimraf": "./bin.js" } }, "sha512-mwqeW5XsA2qAejG46gYdENaxXjx9onRNCfn7L0duuP4hCuTIi/QO7PDK07KJfp1d+izWPrzEJDcSqBa0OZQriA=="],
 
-    "ripgrep": ["[email protected]", "", { "bin": { "rg": "lib/rg.mjs", "ripgrep": "lib/rg.mjs" } }, "sha512-6bDtNIBh1qPviVIU685/4uv0Ap5t8eS4wiJhy/tR2LdIeIey9CVasENlGS+ul3HnTmGANIp7AjnfsztsRmALfQ=="],
-
     "roarr": ["[email protected]", "", { "dependencies": { "boolean": "^3.0.1", "detect-node": "^2.0.4", "globalthis": "^1.0.1", "json-stringify-safe": "^5.0.1", "semver-compare": "^1.0.0", "sprintf-js": "^1.1.2" } }, "sha512-CHhPh+UNHD2GTXNYhPWLnU8ONHdI+5DI+4EYIAOaiD63rHeYlZvyh8P+in5999TTSFgUYuKUAjzRI4mdh/p+2A=="],
 
     "rollup": ["[email protected]", "", { "dependencies": { "@types/estree": "1.0.8" }, "optionalDependencies": { "@rollup/rollup-android-arm-eabi": "4.60.1", "@rollup/rollup-android-arm64": "4.60.1", "@rollup/rollup-darwin-arm64": "4.60.1", "@rollup/rollup-darwin-x64": "4.60.1", "@rollup/rollup-freebsd-arm64": "4.60.1", "@rollup/rollup-freebsd-x64": "4.60.1", "@rollup/rollup-linux-arm-gnueabihf": "4.60.1", "@rollup/rollup-linux-arm-musleabihf": "4.60.1", "@rollup/rollup-linux-arm64-gnu": "4.60.1", "@rollup/rollup-linux-arm64-musl": "4.60.1", "@rollup/rollup-linux-loong64-gnu": "4.60.1", "@rollup/rollup-linux-loong64-musl": "4.60.1", "@rollup/rollup-linux-ppc64-gnu": "4.60.1", "@rollup/rollup-linux-ppc64-musl": "4.60.1", "@rollup/rollup-linux-riscv64-gnu": "4.60.1", "@rollup/rollup-linux-riscv64-musl": "4.60.1", "@rollup/rollup-linux-s390x-gnu": "4.60.1", "@rollup/rollup-linux-x64-gnu": "4.60.1", "@rollup/rollup-linux-x64-musl": "4.60.1", "@rollup/rollup-openbsd-x64": "4.60.1", "@rollup/rollup-openharmony-arm64": "4.60.1", "@rollup/rollup-win32-arm64-msvc": "4.60.1", "@rollup/rollup-win32-ia32-msvc": "4.60.1", "@rollup/rollup-win32-x64-gnu": "4.60.1", "@rollup/rollup-win32-x64-msvc": "4.60.1", "fsevents": "~2.3.2" }, "bin": { "rollup": "dist/bin/rollup" } }, "sha512-VmtB2rFU/GroZ4oL8+ZqXgSA38O6GR8KSIvWmEFv63pQ0G6KaBH9s07PO8XTXP4vI+3UJUEypOfjkGfmSBBR0w=="],

+ 0 - 1
packages/opencode/package.json

@@ -161,7 +161,6 @@
     "opentui-spinner": "0.0.6",
     "partial-json": "0.1.7",
     "remeda": "catalog:",
-    "ripgrep": "0.3.1",
     "semver": "^7.6.3",
     "solid-js": "catalog:",
     "strip-ansi": "7.1.2",

+ 1 - 9
packages/opencode/script/build.ts

@@ -187,7 +187,6 @@ for (const item of targets) {
   const rootPath = path.resolve(dir, "../../node_modules/@opentui/core/parser.worker.js")
   const parserWorker = fs.realpathSync(fs.existsSync(localPath) ? localPath : rootPath)
   const workerPath = "./src/cli/cmd/tui/worker.ts"
-  const rgPath = "./src/file/ripgrep.worker.ts"
 
   // Use platform-specific bunfs root path based on target OS
   const bunfsRoot = item.os === "win32" ? "B:/~BUN/root/" : "/$bunfs/root/"
@@ -212,19 +211,12 @@ for (const item of targets) {
       windows: {},
     },
     files: embeddedFileMap ? { "opencode-web-ui.gen.ts": embeddedFileMap } : {},
-    entrypoints: [
-      "./src/index.ts",
-      parserWorker,
-      workerPath,
-      rgPath,
-      ...(embeddedFileMap ? ["opencode-web-ui.gen.ts"] : []),
-    ],
+    entrypoints: ["./src/index.ts", parserWorker, workerPath, ...(embeddedFileMap ? ["opencode-web-ui.gen.ts"] : [])],
     define: {
       OPENCODE_VERSION: `'${Script.version}'`,
       OPENCODE_MIGRATIONS: JSON.stringify(migrations),
       OTUI_TREE_SITTER_WORKER_PATH: bunfsRoot + workerRelativePath,
       OPENCODE_WORKER_PATH: workerPath,
-      OPENCODE_RIPGREP_WORKER_PATH: rgPath,
       OPENCODE_CHANNEL: `'${Script.channel}'`,
       OPENCODE_LIBC: item.os === "linux" ? `'${item.abi ?? "glibc"}'` : "",
     },

+ 281 - 373
packages/opencode/src/file/ripgrep.ts

@@ -1,15 +1,28 @@
-import fs from "fs/promises"
 import path from "path"
-import { fileURLToPath } from "url"
 import z from "zod"
-import { Cause, Context, Effect, Layer, Queue, Stream } from "effect"
-import { ripgrep } from "ripgrep"
-
-import { Filesystem } from "@/util"
+import { AppFileSystem } from "@opencode-ai/shared/filesystem"
+import { Cause, Context, Effect, Fiber, Layer, Queue, Stream } from "effect"
+import type { PlatformError } from "effect/PlatformError"
+import { FetchHttpClient, HttpClient, HttpClientRequest } from "effect/unstable/http"
+import { ChildProcess } from "effect/unstable/process"
+import { ChildProcessSpawner } from "effect/unstable/process/ChildProcessSpawner"
+
+import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
+import { Global } from "@/global"
 import { Log } from "@/util"
+import { which } from "@/util/which"
 
 export namespace Ripgrep {
   const log = Log.create({ service: "ripgrep" })
+  const VERSION = "14.1.1"
+  const PLATFORM = {
+    "arm64-darwin": { platform: "aarch64-apple-darwin", extension: "tar.gz" },
+    "arm64-linux": { platform: "aarch64-unknown-linux-gnu", extension: "tar.gz" },
+    "x64-darwin": { platform: "x86_64-apple-darwin", extension: "tar.gz" },
+    "x64-linux": { platform: "x86_64-unknown-linux-musl", extension: "tar.gz" },
+    "arm64-win32": { platform: "aarch64-pc-windows-msvc", extension: "zip" },
+    "x64-win32": { platform: "x86_64-pc-windows-msvc", extension: "zip" },
+  } as const
 
   const Stats = z.object({
     elapsed: z.object({
@@ -121,64 +134,22 @@ export namespace Ripgrep {
   }
 
   export interface Interface {
-    readonly files: (input: FilesInput) => Stream.Stream<string, Error>
-    readonly tree: (input: TreeInput) => Effect.Effect<string, Error>
-    readonly search: (input: SearchInput) => Effect.Effect<SearchResult, Error>
+    readonly files: (input: FilesInput) => Stream.Stream<string, PlatformError | Error>
+    readonly tree: (input: TreeInput) => Effect.Effect<string, PlatformError | Error>
+    readonly search: (input: SearchInput) => Effect.Effect<SearchResult, PlatformError | Error>
   }
 
   export class Service extends Context.Service<Service, Interface>()("@opencode/Ripgrep") {}
 
-  type Run = { kind: "files" | "search"; cwd: string; args: string[] }
-
-  type WorkerResult = {
-    type: "result"
-    code: number
-    stdout: string
-    stderr: string
-  }
-
-  type WorkerLine = {
-    type: "line"
-    line: string
-  }
-
-  type WorkerDone = {
-    type: "done"
-    code: number
-    stderr: string
-  }
-
-  type WorkerError = {
-    type: "error"
-    error: {
-      message: string
-      name?: string
-      stack?: string
-    }
-  }
-
   function env() {
-    const env = Object.fromEntries(
+    const out = Object.fromEntries(
       Object.entries(process.env).filter((item): item is [string, string] => item[1] !== undefined),
     )
-    delete env.RIPGREP_CONFIG_PATH
-    return env
-  }
-
-  function text(input: unknown) {
-    if (typeof input === "string") return input
-    if (input instanceof ArrayBuffer) return Buffer.from(input).toString()
-    if (ArrayBuffer.isView(input)) return Buffer.from(input.buffer, input.byteOffset, input.byteLength).toString()
-    return String(input)
-  }
-
-  function toError(input: unknown) {
-    if (input instanceof Error) return input
-    if (typeof input === "string") return new Error(input)
-    return new Error(String(input))
+    delete out.RIPGREP_CONFIG_PATH
+    return out
   }
 
-  function abort(signal?: AbortSignal) {
+  function aborted(signal?: AbortSignal) {
     const err = signal?.reason
     if (err instanceof Error) return err
     const out = new Error("Aborted")
@@ -186,6 +157,16 @@ export namespace Ripgrep {
     return out
   }
 
+  function waitForAbort(signal?: AbortSignal) {
+    if (!signal) return Effect.never
+    if (signal.aborted) return Effect.fail(aborted(signal))
+    return Effect.callback<never, Error>((resume) => {
+      const onabort = () => resume(Effect.fail(aborted(signal)))
+      signal.addEventListener("abort", onabort, { once: true })
+      return Effect.sync(() => signal.removeEventListener("abort", onabort))
+    })
+  }
+
   function error(stderr: string, code: number) {
     const err = new Error(stderr.trim() || `ripgrep failed with code ${code}`)
     err.name = "RipgrepError"
@@ -206,370 +187,297 @@ export namespace Ripgrep {
     }
   }
 
-  function opts(cwd: string) {
-    return {
-      env: env(),
-      preopens: { ".": cwd },
-    }
+  function parse(line: string) {
+    return Effect.try({
+      try: () => Result.parse(JSON.parse(line)),
+      catch: (cause) => new Error("invalid ripgrep output", { cause }),
+    })
   }
 
-  function check(cwd: string) {
-    return Effect.tryPromise({
-      try: () => fs.stat(cwd).catch(() => undefined),
-      catch: toError,
-    }).pipe(
-      Effect.flatMap((stat) =>
-        stat?.isDirectory()
-          ? Effect.void
-          : Effect.fail(
-              Object.assign(new Error(`No such file or directory: '${cwd}'`), {
-                code: "ENOENT",
-                errno: -2,
-                path: cwd,
-              }),
-            ),
-      ),
-    )
+  function fail(queue: Queue.Queue<string, PlatformError | Error | Cause.Done>, err: PlatformError | Error) {
+    Queue.failCauseUnsafe(queue, Cause.fail(err))
   }
 
   function filesArgs(input: FilesInput) {
-    const args = ["--files", "--glob=!.git/*"]
+    const args = ["--no-config", "--files", "--glob=!.git/*"]
     if (input.follow) args.push("--follow")
     if (input.hidden !== false) args.push("--hidden")
+    if (input.hidden === false) args.push("--glob=!.*")
     if (input.maxDepth !== undefined) args.push(`--max-depth=${input.maxDepth}`)
     if (input.glob) {
-      for (const glob of input.glob) {
-        args.push(`--glob=${glob}`)
-      }
+      for (const glob of input.glob) args.push(`--glob=${glob}`)
     }
     args.push(".")
     return args
   }
 
   function searchArgs(input: SearchInput) {
-    const args = ["--json", "--hidden", "--glob=!.git/*", "--no-messages"]
+    const args = ["--no-config", "--json", "--hidden", "--glob=!.git/*", "--no-messages"]
     if (input.follow) args.push("--follow")
     if (input.glob) {
-      for (const glob of input.glob) {
-        args.push(`--glob=${glob}`)
-      }
+      for (const glob of input.glob) args.push(`--glob=${glob}`)
     }
     if (input.limit) args.push(`--max-count=${input.limit}`)
     args.push("--", input.pattern, ...(input.file ?? ["."]))
     return args
   }
 
-  function parse(stdout: string) {
-    return stdout
-      .trim()
-      .split(/\r?\n/)
-      .filter(Boolean)
-      .map((line) => Result.parse(JSON.parse(line)))
-      .flatMap((item) => (item.type === "match" ? [row(item.data)] : []))
+  function raceAbort<A, E, R>(effect: Effect.Effect<A, E, R>, signal?: AbortSignal) {
+    return signal ? effect.pipe(Effect.raceFirst(waitForAbort(signal))) : effect
   }
 
-  declare const OPENCODE_RIPGREP_WORKER_PATH: string
+  export const layer: Layer.Layer<Service, never, AppFileSystem.Service | ChildProcessSpawner | HttpClient.HttpClient> =
+    Layer.effect(
+      Service,
+      Effect.gen(function* () {
+        const fs = yield* AppFileSystem.Service
+        const http = HttpClient.filterStatusOk(yield* HttpClient.HttpClient)
+        const spawner = yield* ChildProcessSpawner
+
+        const run = Effect.fnUntraced(function* (command: string, args: string[], opts?: { cwd?: string }) {
+          const handle = yield* spawner.spawn(
+            ChildProcess.make(command, args, { cwd: opts?.cwd, extendEnv: true, stdin: "ignore" }),
+          )
+          const [stdout, stderr, code] = yield* Effect.all(
+            [
+              Stream.mkString(Stream.decodeText(handle.stdout)),
+              Stream.mkString(Stream.decodeText(handle.stderr)),
+              handle.exitCode,
+            ],
+            { concurrency: "unbounded" },
+          )
+          return { stdout, stderr, code }
+        }, Effect.scoped)
+
+        const extract = Effect.fnUntraced(function* (
+          archive: string,
+          config: (typeof PLATFORM)[keyof typeof PLATFORM],
+        ) {
+          const dir = yield* fs.makeTempDirectoryScoped({ directory: Global.Path.bin, prefix: "ripgrep-" })
+
+          if (config.extension === "zip") {
+            const shell = (yield* Effect.sync(() => which("powershell.exe") ?? which("pwsh.exe"))) ?? "powershell.exe"
+            const result = yield* run(shell, [
+              "-NoProfile",
+              "-Command",
+              "Expand-Archive -LiteralPath $args[0] -DestinationPath $args[1] -Force",
+              archive,
+              dir,
+            ])
+            if (result.code !== 0) {
+              return yield* Effect.fail(error(result.stderr || result.stdout, result.code))
+            }
+          }
 
-  function target(): Effect.Effect<string | URL, Error> {
-    if (typeof OPENCODE_RIPGREP_WORKER_PATH !== "undefined") {
-      return Effect.succeed(OPENCODE_RIPGREP_WORKER_PATH)
-    }
-    const js = new URL("./ripgrep.worker.js", import.meta.url)
-    return Effect.tryPromise({
-      try: () => Filesystem.exists(fileURLToPath(js)),
-      catch: toError,
-    }).pipe(Effect.map((exists) => (exists ? js : new URL("./ripgrep.worker.ts", import.meta.url))))
-  }
+          if (config.extension === "tar.gz") {
+            const result = yield* run("tar", ["-xzf", archive, "-C", dir])
+            if (result.code !== 0) {
+              return yield* Effect.fail(error(result.stderr || result.stdout, result.code))
+            }
+          }
 
-  function worker() {
-    return target().pipe(Effect.flatMap((file) => Effect.sync(() => new Worker(file, { env: env() }))))
-  }
+          return path.join(dir, `ripgrep-${VERSION}-${config.platform}`, process.platform === "win32" ? "rg.exe" : "rg")
+        }, Effect.scoped)
 
-  function drain(buf: string, chunk: unknown, push: (line: string) => void) {
-    const lines = (buf + text(chunk)).split(/\r?\n/)
-    buf = lines.pop() || ""
-    for (const line of lines) {
-      if (line) push(line)
-    }
-    return buf
-  }
+        const filepath = yield* Effect.cached(
+          Effect.gen(function* () {
+            const system = yield* Effect.sync(() => which("rg"))
+            if (system && (yield* fs.isFile(system).pipe(Effect.orDie))) return system
 
-  function fail(queue: Queue.Queue<string, Error | Cause.Done>, err: Error) {
-    Queue.failCauseUnsafe(queue, Cause.fail(err))
-  }
+            const target = path.join(Global.Path.bin, `rg${process.platform === "win32" ? ".exe" : ""}`)
+            if (yield* fs.isFile(target).pipe(Effect.orDie)) return target
 
-  function searchDirect(input: SearchInput) {
-    return Effect.tryPromise({
-      try: () =>
-        ripgrep(searchArgs(input), {
-          buffer: true,
-          ...opts(input.cwd),
-        }),
-      catch: toError,
-    }).pipe(
-      Effect.flatMap((ret) => {
-        const out = ret.stdout ?? ""
-        if (ret.code !== 0 && ret.code !== 1 && ret.code !== 2) {
-          return Effect.fail(error(ret.stderr ?? "", ret.code ?? 1))
-        }
-        return Effect.sync(() => ({
-          items: ret.code === 1 ? [] : parse(out),
-          partial: ret.code === 2,
-        }))
-      }),
-    )
-  }
+            const platformKey = `${process.arch}-${process.platform}` as keyof typeof PLATFORM
+            const config = PLATFORM[platformKey]
+            if (!config) {
+              return yield* Effect.fail(new Error(`unsupported platform for ripgrep: ${platformKey}`))
+            }
 
-  function searchWorker(input: SearchInput) {
-    if (input.signal?.aborted) return Effect.fail(abort(input.signal))
-
-    return Effect.acquireUseRelease(
-      worker(),
-      (w) =>
-        Effect.callback<SearchResult, Error>((resume, signal) => {
-          let open = true
-          const done = (effect: Effect.Effect<SearchResult, Error>) => {
-            if (!open) return
-            open = false
-            resume(effect)
-          }
-          const onabort = () => done(Effect.fail(abort(input.signal)))
+            const filename = `ripgrep-${VERSION}-${config.platform}.${config.extension}`
+            const url = `https://github.com/BurntSushi/ripgrep/releases/download/${VERSION}/${filename}`
+            const archive = path.join(Global.Path.bin, filename)
 
-          w.onerror = (evt) => {
-            done(Effect.fail(toError(evt.error ?? evt.message)))
-          }
-          w.onmessage = (evt: MessageEvent<WorkerResult | WorkerError>) => {
-            const msg = evt.data
-            if (msg.type === "error") {
-              done(Effect.fail(Object.assign(new Error(msg.error.message), msg.error)))
-              return
+            log.info("downloading ripgrep", { url })
+            yield* fs.ensureDir(Global.Path.bin).pipe(Effect.orDie)
+
+            const bytes = yield* HttpClientRequest.get(url).pipe(
+              http.execute,
+              Effect.flatMap((response) => response.arrayBuffer),
+              Effect.mapError((cause) => (cause instanceof Error ? cause : new Error(String(cause)))),
+            )
+            if (bytes.byteLength === 0) {
+              return yield* Effect.fail(new Error(`failed to download ripgrep from ${url}`))
             }
-            if (msg.code === 1) {
-              done(Effect.succeed({ items: [], partial: false }))
-              return
+
+            yield* fs.writeWithDirs(archive, new Uint8Array(bytes)).pipe(Effect.orDie)
+            const extracted = yield* extract(archive, config)
+            const exists = yield* fs.exists(extracted).pipe(Effect.orDie)
+            if (!exists) {
+              return yield* Effect.fail(new Error(`ripgrep archive did not contain executable: ${extracted}`))
             }
-            if (msg.code !== 0 && msg.code !== 1 && msg.code !== 2) {
-              done(Effect.fail(error(msg.stderr, msg.code)))
-              return
+
+            yield* fs.copyFile(extracted, target).pipe(Effect.orDie)
+            if (process.platform !== "win32") {
+              yield* fs.chmod(target, 0o755).pipe(Effect.orDie)
             }
-            done(
-              Effect.sync(() => ({
-                items: parse(msg.stdout),
-                partial: msg.code === 2,
-              })),
-            )
-          }
+            yield* fs.remove(archive, { force: true }).pipe(Effect.ignore)
+            return target
+          }),
+        )
 
-          input.signal?.addEventListener("abort", onabort, { once: true })
-          signal.addEventListener("abort", onabort, { once: true })
-          w.postMessage({
-            kind: "search",
-            cwd: input.cwd,
-            args: searchArgs(input),
-          } satisfies Run)
-
-          return Effect.sync(() => {
-            input.signal?.removeEventListener("abort", onabort)
-            signal.removeEventListener("abort", onabort)
-            w.onerror = null
-            w.onmessage = null
+        const check = Effect.fnUntraced(function* (cwd: string) {
+          if (yield* fs.isDir(cwd).pipe(Effect.orDie)) return
+          return yield* Effect.fail(
+            Object.assign(new Error(`No such file or directory: '${cwd}'`), {
+              code: "ENOENT",
+              errno: -2,
+              path: cwd,
+            }),
+          )
+        })
+
+        const command = Effect.fnUntraced(function* (cwd: string, args: string[]) {
+          const binary = yield* filepath
+          return ChildProcess.make(binary, args, {
+            cwd,
+            env: env(),
+            extendEnv: true,
+            stdin: "ignore",
           })
-        }),
-      (w) => Effect.sync(() => w.terminate()),
-    )
-  }
+        })
+
+        const files: Interface["files"] = (input) =>
+          Stream.callback<string, PlatformError | Error>((queue) =>
+            Effect.gen(function* () {
+              yield* Effect.forkScoped(
+                Effect.gen(function* () {
+                  yield* check(input.cwd)
+                  const handle = yield* spawner.spawn(yield* command(input.cwd, filesArgs(input)))
+                  const stderr = yield* Stream.mkString(Stream.decodeText(handle.stderr)).pipe(Effect.forkScoped)
+                  const stdout = yield* Stream.decodeText(handle.stdout).pipe(
+                    Stream.splitLines,
+                    Stream.filter((line) => line.length > 0),
+                    Stream.runForEach((line) => Effect.sync(() => Queue.offerUnsafe(queue, clean(line)))),
+                    Effect.forkScoped,
+                  )
+                  const code = yield* raceAbort(handle.exitCode, input.signal)
+                  yield* Fiber.join(stdout)
+                  if (code === 0 || code === 1) {
+                    Queue.endUnsafe(queue)
+                    return
+                  }
+                  fail(queue, error(yield* Fiber.join(stderr), code))
+                }).pipe(
+                  Effect.catch((err) =>
+                    Effect.sync(() => {
+                      fail(queue, err)
+                    }),
+                  ),
+                ),
+              )
+            }),
+          )
 
-  function filesDirect(input: FilesInput) {
-    return Stream.callback<string, Error>(
-      Effect.fnUntraced(function* (queue: Queue.Queue<string, Error | Cause.Done>) {
-        let buf = ""
-        let err = ""
-
-        const out = {
-          write(chunk: unknown) {
-            buf = drain(buf, chunk, (line) => {
-              Queue.offerUnsafe(queue, clean(line))
-            })
-          },
-        }
-
-        const stderr = {
-          write(chunk: unknown) {
-            err += text(chunk)
-          },
-        }
-
-        yield* Effect.forkScoped(
-          Effect.gen(function* () {
-            yield* check(input.cwd)
-            const ret = yield* Effect.tryPromise({
-              try: () =>
-                ripgrep(filesArgs(input), {
-                  stdout: out,
-                  stderr,
-                  ...opts(input.cwd),
-                }),
-              catch: toError,
-            })
-            if (buf) Queue.offerUnsafe(queue, clean(buf))
-            if (ret.code === 0 || ret.code === 1) {
-              Queue.endUnsafe(queue)
-              return
-            }
-            fail(queue, error(err, ret.code ?? 1))
-          }).pipe(
-            Effect.catch((err) =>
-              Effect.sync(() => {
-                fail(queue, err)
-              }),
-            ),
-          ),
-        )
-      }),
-    )
-  }
+        const search: Interface["search"] = Effect.fn("Ripgrep.search")(function* (input: SearchInput) {
+          yield* check(input.cwd)
+
+          const program = Effect.scoped(
+            Effect.gen(function* () {
+              const handle = yield* spawner.spawn(yield* command(input.cwd, searchArgs(input)))
+
+              const [items, stderr, code] = yield* Effect.all(
+                [
+                  Stream.decodeText(handle.stdout).pipe(
+                    Stream.splitLines,
+                    Stream.filter((line) => line.length > 0),
+                    Stream.mapEffect(parse),
+                    Stream.filter((item): item is Match => item.type === "match"),
+                    Stream.map((item) => row(item.data)),
+                    Stream.runCollect,
+                    Effect.map((chunk) => [...chunk]),
+                  ),
+                  Stream.mkString(Stream.decodeText(handle.stderr)),
+                  handle.exitCode,
+                ],
+                { concurrency: "unbounded" },
+              )
+
+              if (code !== 0 && code !== 1 && code !== 2) {
+                return yield* Effect.fail(error(stderr, code))
+              }
+
+              return {
+                items: code === 1 ? [] : items,
+                partial: code === 2,
+              }
+            }),
+          )
+
+          return yield* raceAbort(program, input.signal)
+        })
 
-  function filesWorker(input: FilesInput) {
-    return Stream.callback<string, Error>(
-      Effect.fnUntraced(function* (queue: Queue.Queue<string, Error | Cause.Done>) {
-        if (input.signal?.aborted) {
-          fail(queue, abort(input.signal))
-          return
-        }
-
-        const w = yield* Effect.acquireRelease(worker(), (w) => Effect.sync(() => w.terminate()))
-        let open = true
-        const close = () => {
-          if (!open) return false
-          open = false
-          return true
-        }
-        const onabort = () => {
-          if (!close()) return
-          fail(queue, abort(input.signal))
-        }
-
-        w.onerror = (evt) => {
-          if (!close()) return
-          fail(queue, toError(evt.error ?? evt.message))
-        }
-        w.onmessage = (evt: MessageEvent<WorkerLine | WorkerDone | WorkerError>) => {
-          const msg = evt.data
-          if (msg.type === "line") {
-            if (open) Queue.offerUnsafe(queue, msg.line)
-            return
+        const tree: Interface["tree"] = Effect.fn("Ripgrep.tree")(function* (input: TreeInput) {
+          log.info("tree", input)
+          const list = Array.from(yield* files({ cwd: input.cwd, signal: input.signal }).pipe(Stream.runCollect))
+
+          interface Node {
+            name: string
+            children: Map<string, Node>
           }
-          if (!close()) return
-          if (msg.type === "error") {
-            fail(queue, Object.assign(new Error(msg.error.message), msg.error))
-            return
+
+          function child(node: Node, name: string) {
+            const item = node.children.get(name)
+            if (item) return item
+            const next = { name, children: new Map() }
+            node.children.set(name, next)
+            return next
           }
-          if (msg.code === 0 || msg.code === 1) {
-            Queue.endUnsafe(queue)
-            return
+
+          function count(node: Node): number {
+            return Array.from(node.children.values()).reduce((sum, child) => sum + 1 + count(child), 0)
           }
-          fail(queue, error(msg.stderr, msg.code))
-        }
-
-        yield* Effect.acquireRelease(
-          Effect.sync(() => {
-            input.signal?.addEventListener("abort", onabort, { once: true })
-            w.postMessage({
-              kind: "files",
-              cwd: input.cwd,
-              args: filesArgs(input),
-            } satisfies Run)
-          }),
-          () =>
-            Effect.sync(() => {
-              input.signal?.removeEventListener("abort", onabort)
-              w.onerror = null
-              w.onmessage = null
-            }),
-        )
-      }),
-    )
-  }
 
-  export const layer = Layer.effect(
-    Service,
-    Effect.gen(function* () {
-      const source = (input: FilesInput) => {
-        const useWorker = !!input.signal && typeof Worker !== "undefined"
-        if (!useWorker && input.signal) {
-          log.warn("worker unavailable, ripgrep abort disabled")
-        }
-        return useWorker ? filesWorker(input) : filesDirect(input)
-      }
-
-      const files: Interface["files"] = (input) => source(input)
-
-      const tree: Interface["tree"] = Effect.fn("Ripgrep.tree")(function* (input: TreeInput) {
-        log.info("tree", input)
-        const list = Array.from(yield* source({ cwd: input.cwd, signal: input.signal }).pipe(Stream.runCollect))
-
-        interface Node {
-          name: string
-          children: Map<string, Node>
-        }
-
-        function child(node: Node, name: string) {
-          const item = node.children.get(name)
-          if (item) return item
-          const next = { name, children: new Map() }
-          node.children.set(name, next)
-          return next
-        }
-
-        function count(node: Node): number {
-          return Array.from(node.children.values()).reduce((sum, child) => sum + 1 + count(child), 0)
-        }
-
-        const root: Node = { name: "", children: new Map() }
-        for (const file of list) {
-          if (file.includes(".opencode")) continue
-          const parts = file.split(path.sep)
-          if (parts.length < 2) continue
-          let node = root
-          for (const part of parts.slice(0, -1)) {
-            node = child(node, part)
+          const root: Node = { name: "", children: new Map() }
+          for (const file of list) {
+            if (file.includes(".opencode")) continue
+            const parts = file.split(path.sep)
+            if (parts.length < 2) continue
+            let node = root
+            for (const part of parts.slice(0, -1)) {
+              node = child(node, part)
+            }
+          }
+
+          const total = count(root)
+          const limit = input.limit ?? total
+          const lines: string[] = []
+          const queue: Array<{ node: Node; path: string }> = Array.from(root.children.values())
+            .sort((a, b) => a.name.localeCompare(b.name))
+            .map((node) => ({ node, path: node.name }))
+
+          let used = 0
+          for (let i = 0; i < queue.length && used < limit; i++) {
+            const item = queue[i]
+            lines.push(item.path)
+            used++
+            queue.push(
+              ...Array.from(item.node.children.values())
+                .sort((a, b) => a.name.localeCompare(b.name))
+                .map((node) => ({ node, path: `${item.path}/${node.name}` })),
+            )
           }
-        }
-
-        const total = count(root)
-        const limit = input.limit ?? total
-        const lines: string[] = []
-        const queue: Array<{ node: Node; path: string }> = Array.from(root.children.values())
-          .sort((a, b) => a.name.localeCompare(b.name))
-          .map((node) => ({ node, path: node.name }))
-
-        let used = 0
-        for (let i = 0; i < queue.length && used < limit; i++) {
-          const item = queue[i]
-          lines.push(item.path)
-          used++
-          queue.push(
-            ...Array.from(item.node.children.values())
-              .sort((a, b) => a.name.localeCompare(b.name))
-              .map((node) => ({ node, path: `${item.path}/${node.name}` })),
-          )
-        }
 
-        if (total > used) lines.push(`[${total - used} truncated]`)
-        return lines.join("\n")
-      })
+          if (total > used) lines.push(`[${total - used} truncated]`)
+          return lines.join("\n")
+        })
 
-      const search: Interface["search"] = Effect.fn("Ripgrep.search")(function* (input: SearchInput) {
-        const useWorker = !!input.signal && typeof Worker !== "undefined"
-        if (!useWorker && input.signal) {
-          log.warn("worker unavailable, ripgrep abort disabled")
-        }
-        return yield* useWorker ? searchWorker(input) : searchDirect(input)
-      })
+        return Service.of({ files, tree, search })
+      }),
+    )
 
-      return Service.of({ files, tree, search })
-    }),
+  export const defaultLayer = layer.pipe(
+    Layer.provide(FetchHttpClient.layer),
+    Layer.provide(AppFileSystem.defaultLayer),
+    Layer.provide(CrossSpawnSpawner.defaultLayer),
   )
-
-  export const defaultLayer = layer
 }

+ 0 - 103
packages/opencode/src/file/ripgrep.worker.ts

@@ -1,103 +0,0 @@
-import { ripgrep } from "ripgrep"
-
-function env() {
-  const env = Object.fromEntries(
-    Object.entries(process.env).filter((item): item is [string, string] => item[1] !== undefined),
-  )
-  delete env.RIPGREP_CONFIG_PATH
-  return env
-}
-
-function opts(cwd: string) {
-  return {
-    env: env(),
-    preopens: { ".": cwd },
-  }
-}
-
-type Run = {
-  kind: "files" | "search"
-  cwd: string
-  args: string[]
-}
-
-function text(input: unknown) {
-  if (typeof input === "string") return input
-  if (input instanceof ArrayBuffer) return Buffer.from(input).toString()
-  if (ArrayBuffer.isView(input)) return Buffer.from(input.buffer, input.byteOffset, input.byteLength).toString()
-  return String(input)
-}
-
-function error(input: unknown) {
-  if (input instanceof Error) {
-    return {
-      message: input.message,
-      name: input.name,
-      stack: input.stack,
-    }
-  }
-
-  return {
-    message: String(input),
-  }
-}
-
-function clean(file: string) {
-  return file.replace(/^\.[\\/]/, "")
-}
-
-onmessage = async (evt: MessageEvent<Run>) => {
-  const msg = evt.data
-
-  try {
-    if (msg.kind === "search") {
-      const ret = await ripgrep(msg.args, {
-        buffer: true,
-        ...opts(msg.cwd),
-      })
-      postMessage({
-        type: "result",
-        code: ret.code ?? 0,
-        stdout: ret.stdout ?? "",
-        stderr: ret.stderr ?? "",
-      })
-      return
-    }
-
-    let buf = ""
-    let err = ""
-    const out = {
-      write(chunk: unknown) {
-        buf += text(chunk)
-        const lines = buf.split(/\r?\n/)
-        buf = lines.pop() || ""
-        for (const line of lines) {
-          if (line) postMessage({ type: "line", line: clean(line) })
-        }
-      },
-    }
-    const stderr = {
-      write(chunk: unknown) {
-        err += text(chunk)
-      },
-    }
-
-    const ret = await ripgrep(msg.args, {
-      stdout: out,
-      stderr,
-      ...opts(msg.cwd),
-    })
-
-    if (buf) postMessage({ type: "line", line: clean(buf) })
-    postMessage({
-      type: "done",
-      code: ret.code ?? 0,
-      stderr: err,
-    })
-  } catch (err) {
-    postMessage({
-      type: "error",
-      error: error(err),
-    })
-  }
-}