소스 검색

fix(ripgrep): restore native rg backend (#22773)

Co-authored-by: LukeParkerDev <[email protected]>
Shoubhit Dash 18 시간 전
부모
커밋
889087c966
6개의 변경된 파일295개의 추가작업 그리고 503개의 파일을 삭제
  1. 0 3
      bun.lock
  2. 19 18
      nix/opencode.nix
  3. 0 1
      packages/opencode/package.json
  4. 1 9
      packages/opencode/script/build.ts
  5. 275 370
      packages/opencode/src/file/ripgrep.ts
  6. 0 102
      packages/opencode/src/file/ripgrep.worker.ts

+ 0 - 3
bun.lock

@@ -404,7 +404,6 @@
         "opentui-spinner": "0.0.6",
         "partial-json": "0.1.7",
         "remeda": "catalog:",
-        "ripgrep": "0.3.1",
         "semver": "^7.6.3",
         "solid-js": "catalog:",
         "strip-ansi": "7.1.2",
@@ -4482,8 +4481,6 @@
 
     "rimraf": ["[email protected]", "", { "dependencies": { "glob": "^7.1.3" }, "bin": { "rimraf": "./bin.js" } }, "sha512-mwqeW5XsA2qAejG46gYdENaxXjx9onRNCfn7L0duuP4hCuTIi/QO7PDK07KJfp1d+izWPrzEJDcSqBa0OZQriA=="],
 
-    "ripgrep": ["[email protected]", "", { "bin": { "rg": "lib/rg.mjs", "ripgrep": "lib/rg.mjs" } }, "sha512-6bDtNIBh1qPviVIU685/4uv0Ap5t8eS4wiJhy/tR2LdIeIey9CVasENlGS+ul3HnTmGANIp7AjnfsztsRmALfQ=="],
-
     "roarr": ["[email protected]", "", { "dependencies": { "boolean": "^3.0.1", "detect-node": "^2.0.4", "globalthis": "^1.0.1", "json-stringify-safe": "^5.0.1", "semver-compare": "^1.0.0", "sprintf-js": "^1.1.2" } }, "sha512-CHhPh+UNHD2GTXNYhPWLnU8ONHdI+5DI+4EYIAOaiD63rHeYlZvyh8P+in5999TTSFgUYuKUAjzRI4mdh/p+2A=="],
 
     "rollup": ["[email protected]", "", { "dependencies": { "@types/estree": "1.0.8" }, "optionalDependencies": { "@rollup/rollup-android-arm-eabi": "4.60.1", "@rollup/rollup-android-arm64": "4.60.1", "@rollup/rollup-darwin-arm64": "4.60.1", "@rollup/rollup-darwin-x64": "4.60.1", "@rollup/rollup-freebsd-arm64": "4.60.1", "@rollup/rollup-freebsd-x64": "4.60.1", "@rollup/rollup-linux-arm-gnueabihf": "4.60.1", "@rollup/rollup-linux-arm-musleabihf": "4.60.1", "@rollup/rollup-linux-arm64-gnu": "4.60.1", "@rollup/rollup-linux-arm64-musl": "4.60.1", "@rollup/rollup-linux-loong64-gnu": "4.60.1", "@rollup/rollup-linux-loong64-musl": "4.60.1", "@rollup/rollup-linux-ppc64-gnu": "4.60.1", "@rollup/rollup-linux-ppc64-musl": "4.60.1", "@rollup/rollup-linux-riscv64-gnu": "4.60.1", "@rollup/rollup-linux-riscv64-musl": "4.60.1", "@rollup/rollup-linux-s390x-gnu": "4.60.1", "@rollup/rollup-linux-x64-gnu": "4.60.1", "@rollup/rollup-linux-x64-musl": "4.60.1", "@rollup/rollup-openbsd-x64": "4.60.1", "@rollup/rollup-openharmony-arm64": "4.60.1", "@rollup/rollup-win32-arm64-msvc": "4.60.1", "@rollup/rollup-win32-ia32-msvc": "4.60.1", "@rollup/rollup-win32-x64-gnu": "4.60.1", "@rollup/rollup-win32-x64-msvc": "4.60.1", "fsevents": "~2.3.2" }, "bin": { "rollup": "dist/bin/rollup" } }, "sha512-VmtB2rFU/GroZ4oL8+ZqXgSA38O6GR8KSIvWmEFv63pQ0G6KaBH9s07PO8XTXP4vI+3UJUEypOfjkGfmSBBR0w=="],

+ 19 - 18
nix/opencode.nix

@@ -7,6 +7,7 @@
   sysctl,
   makeBinaryWrapper,
   models-dev,
+  ripgrep,
   installShellFiles,
   versionCheckHook,
   writableTmpDirAsHomeHook,
@@ -51,25 +52,25 @@ stdenvNoCC.mkDerivation (finalAttrs: {
     runHook postBuild
   '';
 
-  installPhase =
-    ''
-      runHook preInstall
-
-      install -Dm755 dist/opencode-*/bin/opencode $out/bin/opencode
-      install -Dm644 schema.json $out/share/opencode/schema.json
-    ''
-    # bun runs sysctl to detect if dunning on rosetta2
-    + lib.optionalString stdenvNoCC.hostPlatform.isDarwin ''
-      wrapProgram $out/bin/opencode \
-        --prefix PATH : ${
-          lib.makeBinPath [
-            sysctl
+  installPhase = ''
+    runHook preInstall
+
+    install -Dm755 dist/opencode-*/bin/opencode $out/bin/opencode
+    install -Dm644 schema.json $out/share/opencode/schema.json
+
+    wrapProgram $out/bin/opencode \
+      --prefix PATH : ${
+        lib.makeBinPath (
+          [
+            ripgrep
           ]
-        }
-    ''
-    + ''
-      runHook postInstall
-    '';
+          # bun runs sysctl to detect if dunning on rosetta2
+          ++ lib.optional stdenvNoCC.hostPlatform.isDarwin sysctl
+        )
+      }
+
+    runHook postInstall
+  '';
 
   postInstall = lib.optionalString (stdenvNoCC.buildPlatform.canExecute stdenvNoCC.hostPlatform) ''
     # trick yargs into also generating zsh completions

+ 0 - 1
packages/opencode/package.json

@@ -161,7 +161,6 @@
     "opentui-spinner": "0.0.6",
     "partial-json": "0.1.7",
     "remeda": "catalog:",
-    "ripgrep": "0.3.1",
     "semver": "^7.6.3",
     "solid-js": "catalog:",
     "strip-ansi": "7.1.2",

+ 1 - 9
packages/opencode/script/build.ts

@@ -187,7 +187,6 @@ for (const item of targets) {
   const rootPath = path.resolve(dir, "../../node_modules/@opentui/core/parser.worker.js")
   const parserWorker = fs.realpathSync(fs.existsSync(localPath) ? localPath : rootPath)
   const workerPath = "./src/cli/cmd/tui/worker.ts"
-  const rgPath = "./src/file/ripgrep.worker.ts"
 
   // Use platform-specific bunfs root path based on target OS
   const bunfsRoot = item.os === "win32" ? "B:/~BUN/root/" : "/$bunfs/root/"
@@ -212,19 +211,12 @@ for (const item of targets) {
       windows: {},
     },
     files: embeddedFileMap ? { "opencode-web-ui.gen.ts": embeddedFileMap } : {},
-    entrypoints: [
-      "./src/index.ts",
-      parserWorker,
-      workerPath,
-      rgPath,
-      ...(embeddedFileMap ? ["opencode-web-ui.gen.ts"] : []),
-    ],
+    entrypoints: ["./src/index.ts", parserWorker, workerPath, ...(embeddedFileMap ? ["opencode-web-ui.gen.ts"] : [])],
     define: {
       OPENCODE_VERSION: `'${Script.version}'`,
       OPENCODE_MIGRATIONS: JSON.stringify(migrations),
       OTUI_TREE_SITTER_WORKER_PATH: bunfsRoot + workerRelativePath,
       OPENCODE_WORKER_PATH: workerPath,
-      OPENCODE_RIPGREP_WORKER_PATH: rgPath,
       OPENCODE_CHANNEL: `'${Script.channel}'`,
       OPENCODE_LIBC: item.os === "linux" ? `'${item.abi ?? "glibc"}'` : "",
     },

+ 275 - 370
packages/opencode/src/file/ripgrep.ts

@@ -1,15 +1,28 @@
-import fs from "fs/promises"
 import path from "path"
-import { fileURLToPath } from "url"
 import z from "zod"
-import { Cause, Context, Effect, Layer, Queue, Stream } from "effect"
-import { ripgrep } from "ripgrep"
-
-import { Filesystem } from "@/util"
+import { AppFileSystem } from "@opencode-ai/shared/filesystem"
+import { Cause, Context, Effect, Fiber, Layer, Queue, Stream } from "effect"
+import type { PlatformError } from "effect/PlatformError"
+import { FetchHttpClient, HttpClient, HttpClientRequest } from "effect/unstable/http"
+import { ChildProcess } from "effect/unstable/process"
+import { ChildProcessSpawner } from "effect/unstable/process/ChildProcessSpawner"
+
+import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
+import { Global } from "@/global"
 import { Log } from "@/util"
 import { sanitizedProcessEnv } from "@/util/opencode-process"
+import { which } from "@/util/which"
 
 const log = Log.create({ service: "ripgrep" })
+const VERSION = "14.1.1"
+const PLATFORM = {
+  "arm64-darwin": { platform: "aarch64-apple-darwin", extension: "tar.gz" },
+  "arm64-linux": { platform: "aarch64-unknown-linux-gnu", extension: "tar.gz" },
+  "x64-darwin": { platform: "x86_64-apple-darwin", extension: "tar.gz" },
+  "x64-linux": { platform: "x86_64-unknown-linux-musl", extension: "tar.gz" },
+  "arm64-win32": { platform: "aarch64-pc-windows-msvc", extension: "zip" },
+  "x64-win32": { platform: "x86_64-pc-windows-msvc", extension: "zip" },
+} as const
 
 const Stats = z.object({
   elapsed: z.object({
@@ -121,62 +134,20 @@ export interface TreeInput {
 }
 
 export interface Interface {
-  readonly files: (input: FilesInput) => Stream.Stream<string, Error>
-  readonly tree: (input: TreeInput) => Effect.Effect<string, Error>
-  readonly search: (input: SearchInput) => Effect.Effect<SearchResult, Error>
+  readonly files: (input: FilesInput) => Stream.Stream<string, PlatformError | Error>
+  readonly tree: (input: TreeInput) => Effect.Effect<string, PlatformError | Error>
+  readonly search: (input: SearchInput) => Effect.Effect<SearchResult, PlatformError | Error>
 }
 
 export class Service extends Context.Service<Service, Interface>()("@opencode/Ripgrep") {}
 
-type Run = { kind: "files" | "search"; cwd: string; args: string[] }
-
-type WorkerResult = {
-  type: "result"
-  code: number
-  stdout: string
-  stderr: string
-}
-
-type WorkerLine = {
-  type: "line"
-  line: string
-}
-
-type WorkerDone = {
-  type: "done"
-  code: number
-  stderr: string
-}
-
-type WorkerError = {
-  type: "error"
-  error: {
-    message: string
-    name?: string
-    stack?: string
-  }
-}
-
 function env() {
   const env = sanitizedProcessEnv()
   delete env.RIPGREP_CONFIG_PATH
   return env
 }
 
-function text(input: unknown) {
-  if (typeof input === "string") return input
-  if (input instanceof ArrayBuffer) return Buffer.from(input).toString()
-  if (ArrayBuffer.isView(input)) return Buffer.from(input.buffer, input.byteOffset, input.byteLength).toString()
-  return String(input)
-}
-
-function toError(input: unknown) {
-  if (input instanceof Error) return input
-  if (typeof input === "string") return new Error(input)
-  return new Error(String(input))
-}
-
-function abort(signal?: AbortSignal) {
+function aborted(signal?: AbortSignal) {
   const err = signal?.reason
   if (err instanceof Error) return err
   const out = new Error("Aborted")
@@ -184,6 +155,16 @@ function abort(signal?: AbortSignal) {
   return out
 }
 
+function waitForAbort(signal?: AbortSignal) {
+  if (!signal) return Effect.never
+  if (signal.aborted) return Effect.fail(aborted(signal))
+  return Effect.callback<never, Error>((resume) => {
+    const onabort = () => resume(Effect.fail(aborted(signal)))
+    signal.addEventListener("abort", onabort, { once: true })
+    return Effect.sync(() => signal.removeEventListener("abort", onabort))
+  })
+}
+
 function error(stderr: string, code: number) {
   const err = new Error(stderr.trim() || `ripgrep failed with code ${code}`)
   err.name = "RipgrepError"
@@ -204,371 +185,295 @@ function row(data: Row): Row {
   }
 }
 
-function opts(cwd: string) {
-  return {
-    env: env(),
-    preopens: { ".": cwd },
-  }
+function parse(line: string) {
+  return Effect.try({
+    try: () => Result.parse(JSON.parse(line)),
+    catch: (cause) => new Error("invalid ripgrep output", { cause }),
+  })
 }
 
-function check(cwd: string) {
-  return Effect.tryPromise({
-    try: () => fs.stat(cwd).catch(() => undefined),
-    catch: toError,
-  }).pipe(
-    Effect.flatMap((stat) =>
-      stat?.isDirectory()
-        ? Effect.void
-        : Effect.fail(
-            Object.assign(new Error(`No such file or directory: '${cwd}'`), {
-              code: "ENOENT",
-              errno: -2,
-              path: cwd,
-            }),
-          ),
-    ),
-  )
+function fail(queue: Queue.Queue<string, PlatformError | Error | Cause.Done>, err: PlatformError | Error) {
+  Queue.failCauseUnsafe(queue, Cause.fail(err))
 }
 
 function filesArgs(input: FilesInput) {
-  const args = ["--files", "--glob=!.git/*"]
+  const args = ["--no-config", "--files", "--glob=!.git/*"]
   if (input.follow) args.push("--follow")
   if (input.hidden !== false) args.push("--hidden")
+  if (input.hidden === false) args.push("--glob=!.*")
   if (input.maxDepth !== undefined) args.push(`--max-depth=${input.maxDepth}`)
   if (input.glob) {
-    for (const glob of input.glob) {
-      args.push(`--glob=${glob}`)
-    }
+    for (const glob of input.glob) args.push(`--glob=${glob}`)
   }
   args.push(".")
   return args
 }
 
 function searchArgs(input: SearchInput) {
-  const args = ["--json", "--hidden", "--glob=!.git/*", "--no-messages"]
+  const args = ["--no-config", "--json", "--hidden", "--glob=!.git/*", "--no-messages"]
   if (input.follow) args.push("--follow")
   if (input.glob) {
-    for (const glob of input.glob) {
-      args.push(`--glob=${glob}`)
-    }
+    for (const glob of input.glob) args.push(`--glob=${glob}`)
   }
   if (input.limit) args.push(`--max-count=${input.limit}`)
   args.push("--", input.pattern, ...(input.file ?? ["."]))
   return args
 }
 
-function parse(stdout: string) {
-  return stdout
-    .trim()
-    .split(/\r?\n/)
-    .filter(Boolean)
-    .map((line) => Result.parse(JSON.parse(line)))
-    .flatMap((item) => (item.type === "match" ? [row(item.data)] : []))
+function raceAbort<A, E, R>(effect: Effect.Effect<A, E, R>, signal?: AbortSignal) {
+  return signal ? effect.pipe(Effect.raceFirst(waitForAbort(signal))) : effect
 }
 
-declare const OPENCODE_RIPGREP_WORKER_PATH: string
+export const layer: Layer.Layer<Service, never, AppFileSystem.Service | ChildProcessSpawner | HttpClient.HttpClient> =
+  Layer.effect(
+    Service,
+    Effect.gen(function* () {
+      const fs = yield* AppFileSystem.Service
+      const http = HttpClient.filterStatusOk(yield* HttpClient.HttpClient)
+      const spawner = yield* ChildProcessSpawner
+
+      const run = Effect.fnUntraced(function* (command: string, args: string[], opts?: { cwd?: string }) {
+        const handle = yield* spawner.spawn(
+          ChildProcess.make(command, args, { cwd: opts?.cwd, extendEnv: true, stdin: "ignore" }),
+        )
+        const [stdout, stderr, code] = yield* Effect.all(
+          [
+            Stream.mkString(Stream.decodeText(handle.stdout)),
+            Stream.mkString(Stream.decodeText(handle.stderr)),
+            handle.exitCode,
+          ],
+          { concurrency: "unbounded" },
+        )
+        return { stdout, stderr, code }
+      }, Effect.scoped)
+
+      const extract = Effect.fnUntraced(function* (archive: string, config: (typeof PLATFORM)[keyof typeof PLATFORM]) {
+        const dir = yield* fs.makeTempDirectoryScoped({ directory: Global.Path.bin, prefix: "ripgrep-" })
+
+        if (config.extension === "zip") {
+          const shell = (yield* Effect.sync(() => which("powershell.exe") ?? which("pwsh.exe"))) ?? "powershell.exe"
+          const result = yield* run(shell, [
+            "-NoProfile",
+            "-Command",
+            "Expand-Archive -LiteralPath $args[0] -DestinationPath $args[1] -Force",
+            archive,
+            dir,
+          ])
+          if (result.code !== 0) {
+            return yield* Effect.fail(error(result.stderr || result.stdout, result.code))
+          }
+        }
 
-function target(): Effect.Effect<string | URL, Error> {
-  if (typeof OPENCODE_RIPGREP_WORKER_PATH !== "undefined") {
-    return Effect.succeed(OPENCODE_RIPGREP_WORKER_PATH)
-  }
-  const js = new URL("./ripgrep.worker.js", import.meta.url)
-  return Effect.tryPromise({
-    try: () => Filesystem.exists(fileURLToPath(js)),
-    catch: toError,
-  }).pipe(Effect.map((exists) => (exists ? js : new URL("./ripgrep.worker.ts", import.meta.url))))
-}
+        if (config.extension === "tar.gz") {
+          const result = yield* run("tar", ["-xzf", archive, "-C", dir])
+          if (result.code !== 0) {
+            return yield* Effect.fail(error(result.stderr || result.stdout, result.code))
+          }
+        }
 
-function worker() {
-  return target().pipe(Effect.flatMap((file) => Effect.sync(() => new Worker(file, { env: env() }))))
-}
+        return path.join(dir, `ripgrep-${VERSION}-${config.platform}`, process.platform === "win32" ? "rg.exe" : "rg")
+      }, Effect.scoped)
 
-function drain(buf: string, chunk: unknown, push: (line: string) => void) {
-  const lines = (buf + text(chunk)).split(/\r?\n/)
-  buf = lines.pop() || ""
-  for (const line of lines) {
-    if (line) push(line)
-  }
-  return buf
-}
+      const filepath = yield* Effect.cached(
+        Effect.gen(function* () {
+          const system = yield* Effect.sync(() => which("rg"))
+          if (system && (yield* fs.isFile(system).pipe(Effect.orDie))) return system
 
-function fail(queue: Queue.Queue<string, Error | Cause.Done>, err: Error) {
-  Queue.failCauseUnsafe(queue, Cause.fail(err))
-}
+          const target = path.join(Global.Path.bin, `rg${process.platform === "win32" ? ".exe" : ""}`)
+          if (yield* fs.isFile(target).pipe(Effect.orDie)) return target
 
-function searchDirect(input: SearchInput) {
-  return Effect.tryPromise({
-    try: () =>
-      ripgrep(searchArgs(input), {
-        buffer: true,
-        ...opts(input.cwd),
-      }),
-    catch: toError,
-  }).pipe(
-    Effect.flatMap((ret) => {
-      const out = ret.stdout ?? ""
-      if (ret.code !== 0 && ret.code !== 1 && ret.code !== 2) {
-        return Effect.fail(error(ret.stderr ?? "", ret.code ?? 1))
-      }
-      return Effect.sync(() => ({
-        items: ret.code === 1 ? [] : parse(out),
-        partial: ret.code === 2,
-      }))
-    }),
-  )
-}
+          const platformKey = `${process.arch}-${process.platform}` as keyof typeof PLATFORM
+          const config = PLATFORM[platformKey]
+          if (!config) {
+            return yield* Effect.fail(new Error(`unsupported platform for ripgrep: ${platformKey}`))
+          }
 
-function searchWorker(input: SearchInput) {
-  if (input.signal?.aborted) return Effect.fail(abort(input.signal))
-
-  return Effect.acquireUseRelease(
-    worker(),
-    (w) =>
-      Effect.callback<SearchResult, Error>((resume, signal) => {
-        let open = true
-        const done = (effect: Effect.Effect<SearchResult, Error>) => {
-          if (!open) return
-          open = false
-          resume(effect)
-        }
-        const onabort = () => done(Effect.fail(abort(input.signal)))
+          const filename = `ripgrep-${VERSION}-${config.platform}.${config.extension}`
+          const url = `https://github.com/BurntSushi/ripgrep/releases/download/${VERSION}/${filename}`
+          const archive = path.join(Global.Path.bin, filename)
 
-        w.onerror = (evt) => {
-          done(Effect.fail(toError(evt.error ?? evt.message)))
-        }
-        w.onmessage = (evt: MessageEvent<WorkerResult | WorkerError>) => {
-          const msg = evt.data
-          if (msg.type === "error") {
-            done(Effect.fail(Object.assign(new Error(msg.error.message), msg.error)))
-            return
+          log.info("downloading ripgrep", { url })
+          yield* fs.ensureDir(Global.Path.bin).pipe(Effect.orDie)
+
+          const bytes = yield* HttpClientRequest.get(url).pipe(
+            http.execute,
+            Effect.flatMap((response) => response.arrayBuffer),
+            Effect.mapError((cause) => (cause instanceof Error ? cause : new Error(String(cause)))),
+          )
+          if (bytes.byteLength === 0) {
+            return yield* Effect.fail(new Error(`failed to download ripgrep from ${url}`))
           }
-          if (msg.code === 1) {
-            done(Effect.succeed({ items: [], partial: false }))
-            return
+
+          yield* fs.writeWithDirs(archive, new Uint8Array(bytes)).pipe(Effect.orDie)
+          const extracted = yield* extract(archive, config)
+          const exists = yield* fs.exists(extracted).pipe(Effect.orDie)
+          if (!exists) {
+            return yield* Effect.fail(new Error(`ripgrep archive did not contain executable: ${extracted}`))
           }
-          if (msg.code !== 0 && msg.code !== 1 && msg.code !== 2) {
-            done(Effect.fail(error(msg.stderr, msg.code)))
-            return
+
+          yield* fs.copyFile(extracted, target).pipe(Effect.orDie)
+          if (process.platform !== "win32") {
+            yield* fs.chmod(target, 0o755).pipe(Effect.orDie)
           }
-          done(
-            Effect.sync(() => ({
-              items: parse(msg.stdout),
-              partial: msg.code === 2,
-            })),
-          )
-        }
+          yield* fs.remove(archive, { force: true }).pipe(Effect.ignore)
+          return target
+        }),
+      )
 
-        input.signal?.addEventListener("abort", onabort, { once: true })
-        signal.addEventListener("abort", onabort, { once: true })
-        w.postMessage({
-          kind: "search",
-          cwd: input.cwd,
-          args: searchArgs(input),
-        } satisfies Run)
-
-        return Effect.sync(() => {
-          input.signal?.removeEventListener("abort", onabort)
-          signal.removeEventListener("abort", onabort)
-          w.onerror = null
-          w.onmessage = null
+      const check = Effect.fnUntraced(function* (cwd: string) {
+        if (yield* fs.isDir(cwd).pipe(Effect.orDie)) return
+        return yield* Effect.fail(
+          Object.assign(new Error(`No such file or directory: '${cwd}'`), {
+            code: "ENOENT",
+            errno: -2,
+            path: cwd,
+          }),
+        )
+      })
+
+      const command = Effect.fnUntraced(function* (cwd: string, args: string[]) {
+        const binary = yield* filepath
+        return ChildProcess.make(binary, args, {
+          cwd,
+          env: env(),
+          extendEnv: true,
+          stdin: "ignore",
         })
-      }),
-    (w) => Effect.sync(() => w.terminate()),
-  )
-}
+      })
+
+      const files: Interface["files"] = (input) =>
+        Stream.callback<string, PlatformError | Error>((queue) =>
+          Effect.gen(function* () {
+            yield* Effect.forkScoped(
+              Effect.gen(function* () {
+                yield* check(input.cwd)
+                const handle = yield* spawner.spawn(yield* command(input.cwd, filesArgs(input)))
+                const stderr = yield* Stream.mkString(Stream.decodeText(handle.stderr)).pipe(Effect.forkScoped)
+                const stdout = yield* Stream.decodeText(handle.stdout).pipe(
+                  Stream.splitLines,
+                  Stream.filter((line) => line.length > 0),
+                  Stream.runForEach((line) => Effect.sync(() => Queue.offerUnsafe(queue, clean(line)))),
+                  Effect.forkScoped,
+                )
+                const code = yield* raceAbort(handle.exitCode, input.signal)
+                yield* Fiber.join(stdout)
+                if (code === 0 || code === 1) {
+                  Queue.endUnsafe(queue)
+                  return
+                }
+                fail(queue, error(yield* Fiber.join(stderr), code))
+              }).pipe(
+                Effect.catch((err) =>
+                  Effect.sync(() => {
+                    fail(queue, err)
+                  }),
+                ),
+              ),
+            )
+          }),
+        )
 
-function filesDirect(input: FilesInput) {
-  return Stream.callback<string, Error>(
-    Effect.fnUntraced(function* (queue: Queue.Queue<string, Error | Cause.Done>) {
-      let buf = ""
-      let err = ""
-
-      const out = {
-        write(chunk: unknown) {
-          buf = drain(buf, chunk, (line) => {
-            Queue.offerUnsafe(queue, clean(line))
-          })
-        },
-      }
-
-      const stderr = {
-        write(chunk: unknown) {
-          err += text(chunk)
-        },
-      }
-
-      yield* Effect.forkScoped(
-        Effect.gen(function* () {
-          yield* check(input.cwd)
-          const ret = yield* Effect.tryPromise({
-            try: () =>
-              ripgrep(filesArgs(input), {
-                stdout: out,
-                stderr,
-                ...opts(input.cwd),
-              }),
-            catch: toError,
-          })
-          if (buf) Queue.offerUnsafe(queue, clean(buf))
-          if (ret.code === 0 || ret.code === 1) {
-            Queue.endUnsafe(queue)
-            return
-          }
-          fail(queue, error(err, ret.code ?? 1))
-        }).pipe(
-          Effect.catch((err) =>
-            Effect.sync(() => {
-              fail(queue, err)
-            }),
-          ),
-        ),
-      )
-    }),
-  )
-}
+      const search: Interface["search"] = Effect.fn("Ripgrep.search")(function* (input: SearchInput) {
+        yield* check(input.cwd)
+
+        const program = Effect.scoped(
+          Effect.gen(function* () {
+            const handle = yield* spawner.spawn(yield* command(input.cwd, searchArgs(input)))
+
+            const [items, stderr, code] = yield* Effect.all(
+              [
+                Stream.decodeText(handle.stdout).pipe(
+                  Stream.splitLines,
+                  Stream.filter((line) => line.length > 0),
+                  Stream.mapEffect(parse),
+                  Stream.filter((item): item is Match => item.type === "match"),
+                  Stream.map((item) => row(item.data)),
+                  Stream.runCollect,
+                  Effect.map((chunk) => [...chunk]),
+                ),
+                Stream.mkString(Stream.decodeText(handle.stderr)),
+                handle.exitCode,
+              ],
+              { concurrency: "unbounded" },
+            )
+
+            if (code !== 0 && code !== 1 && code !== 2) {
+              return yield* Effect.fail(error(stderr, code))
+            }
+
+            return {
+              items: code === 1 ? [] : items,
+              partial: code === 2,
+            }
+          }),
+        )
+
+        return yield* raceAbort(program, input.signal)
+      })
 
-function filesWorker(input: FilesInput) {
-  return Stream.callback<string, Error>(
-    Effect.fnUntraced(function* (queue: Queue.Queue<string, Error | Cause.Done>) {
-      if (input.signal?.aborted) {
-        fail(queue, abort(input.signal))
-        return
-      }
-
-      const w = yield* Effect.acquireRelease(worker(), (w) => Effect.sync(() => w.terminate()))
-      let open = true
-      const close = () => {
-        if (!open) return false
-        open = false
-        return true
-      }
-      const onabort = () => {
-        if (!close()) return
-        fail(queue, abort(input.signal))
-      }
-
-      w.onerror = (evt) => {
-        if (!close()) return
-        fail(queue, toError(evt.error ?? evt.message))
-      }
-      w.onmessage = (evt: MessageEvent<WorkerLine | WorkerDone | WorkerError>) => {
-        const msg = evt.data
-        if (msg.type === "line") {
-          if (open) Queue.offerUnsafe(queue, msg.line)
-          return
+      const tree: Interface["tree"] = Effect.fn("Ripgrep.tree")(function* (input: TreeInput) {
+        log.info("tree", input)
+        const list = Array.from(yield* files({ cwd: input.cwd, signal: input.signal }).pipe(Stream.runCollect))
+
+        interface Node {
+          name: string
+          children: Map<string, Node>
         }
-        if (!close()) return
-        if (msg.type === "error") {
-          fail(queue, Object.assign(new Error(msg.error.message), msg.error))
-          return
+
+        function child(node: Node, name: string) {
+          const item = node.children.get(name)
+          if (item) return item
+          const next = { name, children: new Map() }
+          node.children.set(name, next)
+          return next
         }
-        if (msg.code === 0 || msg.code === 1) {
-          Queue.endUnsafe(queue)
-          return
+
+        function count(node: Node): number {
+          return Array.from(node.children.values()).reduce((sum, child) => sum + 1 + count(child), 0)
         }
-        fail(queue, error(msg.stderr, msg.code))
-      }
-
-      yield* Effect.acquireRelease(
-        Effect.sync(() => {
-          input.signal?.addEventListener("abort", onabort, { once: true })
-          w.postMessage({
-            kind: "files",
-            cwd: input.cwd,
-            args: filesArgs(input),
-          } satisfies Run)
-        }),
-        () =>
-          Effect.sync(() => {
-            input.signal?.removeEventListener("abort", onabort)
-            w.onerror = null
-            w.onmessage = null
-          }),
-      )
-    }),
-  )
-}
 
-export const layer = Layer.effect(
-  Service,
-  Effect.gen(function* () {
-    const source = (input: FilesInput) => {
-      const useWorker = !!input.signal && typeof Worker !== "undefined"
-      if (!useWorker && input.signal) {
-        log.warn("worker unavailable, ripgrep abort disabled")
-      }
-      return useWorker ? filesWorker(input) : filesDirect(input)
-    }
-
-    const files: Interface["files"] = (input) => source(input)
-
-    const tree: Interface["tree"] = Effect.fn("Ripgrep.tree")(function* (input: TreeInput) {
-      log.info("tree", input)
-      const list = Array.from(yield* source({ cwd: input.cwd, signal: input.signal }).pipe(Stream.runCollect))
-
-      interface Node {
-        name: string
-        children: Map<string, Node>
-      }
-
-      function child(node: Node, name: string) {
-        const item = node.children.get(name)
-        if (item) return item
-        const next = { name, children: new Map() }
-        node.children.set(name, next)
-        return next
-      }
-
-      function count(node: Node): number {
-        return Array.from(node.children.values()).reduce((sum, child) => sum + 1 + count(child), 0)
-      }
-
-      const root: Node = { name: "", children: new Map() }
-      for (const file of list) {
-        if (file.includes(".opencode")) continue
-        const parts = file.split(path.sep)
-        if (parts.length < 2) continue
-        let node = root
-        for (const part of parts.slice(0, -1)) {
-          node = child(node, part)
+        const root: Node = { name: "", children: new Map() }
+        for (const file of list) {
+          if (file.includes(".opencode")) continue
+          const parts = file.split(path.sep)
+          if (parts.length < 2) continue
+          let node = root
+          for (const part of parts.slice(0, -1)) {
+            node = child(node, part)
+          }
+        }
+
+        const total = count(root)
+        const limit = input.limit ?? total
+        const lines: string[] = []
+        const queue: Array<{ node: Node; path: string }> = Array.from(root.children.values())
+          .sort((a, b) => a.name.localeCompare(b.name))
+          .map((node) => ({ node, path: node.name }))
+
+        let used = 0
+        for (let i = 0; i < queue.length && used < limit; i++) {
+          const item = queue[i]
+          lines.push(item.path)
+          used++
+          queue.push(
+            ...Array.from(item.node.children.values())
+              .sort((a, b) => a.name.localeCompare(b.name))
+              .map((node) => ({ node, path: `${item.path}/${node.name}` })),
+          )
         }
-      }
-
-      const total = count(root)
-      const limit = input.limit ?? total
-      const lines: string[] = []
-      const queue: Array<{ node: Node; path: string }> = Array.from(root.children.values())
-        .sort((a, b) => a.name.localeCompare(b.name))
-        .map((node) => ({ node, path: node.name }))
-
-      let used = 0
-      for (let i = 0; i < queue.length && used < limit; i++) {
-        const item = queue[i]
-        lines.push(item.path)
-        used++
-        queue.push(
-          ...Array.from(item.node.children.values())
-            .sort((a, b) => a.name.localeCompare(b.name))
-            .map((node) => ({ node, path: `${item.path}/${node.name}` })),
-        )
-      }
 
-      if (total > used) lines.push(`[${total - used} truncated]`)
-      return lines.join("\n")
-    })
+        if (total > used) lines.push(`[${total - used} truncated]`)
+        return lines.join("\n")
+      })
 
-    const search: Interface["search"] = Effect.fn("Ripgrep.search")(function* (input: SearchInput) {
-      const useWorker = !!input.signal && typeof Worker !== "undefined"
-      if (!useWorker && input.signal) {
-        log.warn("worker unavailable, ripgrep abort disabled")
-      }
-      return yield* useWorker ? searchWorker(input) : searchDirect(input)
-    })
+      return Service.of({ files, tree, search })
+    }),
+  )
 
-    return Service.of({ files, tree, search })
-  }),
+export const defaultLayer = layer.pipe(
+  Layer.provide(FetchHttpClient.layer),
+  Layer.provide(AppFileSystem.defaultLayer),
+  Layer.provide(CrossSpawnSpawner.defaultLayer),
 )
 
-export const defaultLayer = layer
-
 export * as Ripgrep from "./ripgrep"

+ 0 - 102
packages/opencode/src/file/ripgrep.worker.ts

@@ -1,102 +0,0 @@
-import { ripgrep } from "ripgrep"
-import { sanitizedProcessEnv } from "@/util/opencode-process"
-
-function env() {
-  const env = sanitizedProcessEnv()
-  delete env.RIPGREP_CONFIG_PATH
-  return env
-}
-
-function opts(cwd: string) {
-  return {
-    env: env(),
-    preopens: { ".": cwd },
-  }
-}
-
-type Run = {
-  kind: "files" | "search"
-  cwd: string
-  args: string[]
-}
-
-function text(input: unknown) {
-  if (typeof input === "string") return input
-  if (input instanceof ArrayBuffer) return Buffer.from(input).toString()
-  if (ArrayBuffer.isView(input)) return Buffer.from(input.buffer, input.byteOffset, input.byteLength).toString()
-  return String(input)
-}
-
-function error(input: unknown) {
-  if (input instanceof Error) {
-    return {
-      message: input.message,
-      name: input.name,
-      stack: input.stack,
-    }
-  }
-
-  return {
-    message: String(input),
-  }
-}
-
-function clean(file: string) {
-  return file.replace(/^\.[\\/]/, "")
-}
-
-onmessage = async (evt: MessageEvent<Run>) => {
-  const msg = evt.data
-
-  try {
-    if (msg.kind === "search") {
-      const ret = await ripgrep(msg.args, {
-        buffer: true,
-        ...opts(msg.cwd),
-      })
-      postMessage({
-        type: "result",
-        code: ret.code ?? 0,
-        stdout: ret.stdout ?? "",
-        stderr: ret.stderr ?? "",
-      })
-      return
-    }
-
-    let buf = ""
-    let err = ""
-    const out = {
-      write(chunk: unknown) {
-        buf += text(chunk)
-        const lines = buf.split(/\r?\n/)
-        buf = lines.pop() || ""
-        for (const line of lines) {
-          if (line) postMessage({ type: "line", line: clean(line) })
-        }
-      },
-    }
-    const stderr = {
-      write(chunk: unknown) {
-        err += text(chunk)
-      },
-    }
-
-    const ret = await ripgrep(msg.args, {
-      stdout: out,
-      stderr,
-      ...opts(msg.cwd),
-    })
-
-    if (buf) postMessage({ type: "line", line: clean(buf) })
-    postMessage({
-      type: "done",
-      code: ret.code ?? 0,
-      stderr: err,
-    })
-  } catch (err) {
-    postMessage({
-      type: "error",
-      error: error(err),
-    })
-  }
-}