Przeglądaj źródła

fix: restore cross-spawn behavior for effect child processes (#18798)

Kit Langton 3 tygodni temu
rodzic
commit
41c77ccb33

+ 476 - 0
packages/opencode/src/effect/cross-spawn-spawner.ts

@@ -0,0 +1,476 @@
+import type * as Arr from "effect/Array"
+import { NodeSink, NodeStream } from "@effect/platform-node"
+import * as Deferred from "effect/Deferred"
+import * as Effect from "effect/Effect"
+import * as Exit from "effect/Exit"
+import * as FileSystem from "effect/FileSystem"
+import * as Layer from "effect/Layer"
+import * as Path from "effect/Path"
+import * as PlatformError from "effect/PlatformError"
+import * as Predicate from "effect/Predicate"
+import type * as Scope from "effect/Scope"
+import * as Sink from "effect/Sink"
+import * as Stream from "effect/Stream"
+import * as ChildProcess from "effect/unstable/process/ChildProcess"
+import type { ChildProcessHandle } from "effect/unstable/process/ChildProcessSpawner"
+import {
+  ChildProcessSpawner,
+  ExitCode,
+  make as makeSpawner,
+  makeHandle,
+  ProcessId,
+} from "effect/unstable/process/ChildProcessSpawner"
+import * as NodeChildProcess from "node:child_process"
+import { PassThrough } from "node:stream"
+import launch from "cross-spawn"
+
+const toError = (err: unknown): Error => (err instanceof globalThis.Error ? err : new globalThis.Error(String(err)))
+
+const toTag = (err: NodeJS.ErrnoException): PlatformError.SystemErrorTag => {
+  switch (err.code) {
+    case "ENOENT":
+      return "NotFound"
+    case "EACCES":
+      return "PermissionDenied"
+    case "EEXIST":
+      return "AlreadyExists"
+    case "EISDIR":
+      return "BadResource"
+    case "ENOTDIR":
+      return "BadResource"
+    case "EBUSY":
+      return "Busy"
+    case "ELOOP":
+      return "BadResource"
+    default:
+      return "Unknown"
+  }
+}
+
+const flatten = (command: ChildProcess.Command) => {
+  const commands: Array<ChildProcess.StandardCommand> = []
+  const opts: Array<ChildProcess.PipeOptions> = []
+
+  const walk = (cmd: ChildProcess.Command): void => {
+    switch (cmd._tag) {
+      case "StandardCommand":
+        commands.push(cmd)
+        return
+      case "PipedCommand":
+        walk(cmd.left)
+        opts.push(cmd.options)
+        walk(cmd.right)
+        return
+    }
+  }
+
+  walk(command)
+  if (commands.length === 0) throw new Error("flatten produced empty commands array")
+  const [head, ...tail] = commands
+  return {
+    commands: [head, ...tail] as Arr.NonEmptyReadonlyArray<ChildProcess.StandardCommand>,
+    opts,
+  }
+}
+
+const toPlatformError = (
+  method: string,
+  err: NodeJS.ErrnoException,
+  command: ChildProcess.Command,
+): PlatformError.PlatformError => {
+  const cmd = flatten(command)
+    .commands.map((x) => `${x.command} ${x.args.join(" ")}`)
+    .join(" | ")
+  return PlatformError.systemError({
+    _tag: toTag(err),
+    module: "ChildProcess",
+    method,
+    pathOrDescriptor: cmd,
+    syscall: err.syscall,
+    cause: err,
+  })
+}
+
+type ExitSignal = Deferred.Deferred<readonly [code: number | null, signal: NodeJS.Signals | null]>
+
+export const make = Effect.gen(function* () {
+  const fs = yield* FileSystem.FileSystem
+  const path = yield* Path.Path
+
+  const cwd = Effect.fnUntraced(function* (opts: ChildProcess.CommandOptions) {
+    if (Predicate.isUndefined(opts.cwd)) return undefined
+    yield* fs.access(opts.cwd)
+    return path.resolve(opts.cwd)
+  })
+
+  const env = (opts: ChildProcess.CommandOptions) =>
+    opts.extendEnv ? { ...globalThis.process.env, ...opts.env } : opts.env
+
+  const input = (x: ChildProcess.CommandInput | undefined): NodeChildProcess.IOType | undefined =>
+    Stream.isStream(x) ? "pipe" : x
+
+  const output = (x: ChildProcess.CommandOutput | undefined): NodeChildProcess.IOType | undefined =>
+    Sink.isSink(x) ? "pipe" : x
+
+  const stdin = (opts: ChildProcess.CommandOptions): ChildProcess.StdinConfig => {
+    const cfg: ChildProcess.StdinConfig = { stream: "pipe", encoding: "utf-8", endOnDone: true }
+    if (Predicate.isUndefined(opts.stdin)) return cfg
+    if (typeof opts.stdin === "string") return { ...cfg, stream: opts.stdin }
+    if (Stream.isStream(opts.stdin)) return { ...cfg, stream: opts.stdin }
+    return {
+      stream: opts.stdin.stream,
+      encoding: opts.stdin.encoding ?? cfg.encoding,
+      endOnDone: opts.stdin.endOnDone ?? cfg.endOnDone,
+    }
+  }
+
+  const stdio = (opts: ChildProcess.CommandOptions, key: "stdout" | "stderr"): ChildProcess.StdoutConfig => {
+    const cfg = opts[key]
+    if (Predicate.isUndefined(cfg)) return { stream: "pipe" }
+    if (typeof cfg === "string") return { stream: cfg }
+    if (Sink.isSink(cfg)) return { stream: cfg }
+    return { stream: cfg.stream }
+  }
+
+  const fds = (opts: ChildProcess.CommandOptions) => {
+    if (Predicate.isUndefined(opts.additionalFds)) return []
+    return Object.entries(opts.additionalFds)
+      .flatMap(([name, config]) => {
+        const fd = ChildProcess.parseFdName(name)
+        return Predicate.isUndefined(fd) ? [] : [{ fd, config }]
+      })
+      .toSorted((a, b) => a.fd - b.fd)
+  }
+
+  const stdios = (
+    sin: ChildProcess.StdinConfig,
+    sout: ChildProcess.StdoutConfig,
+    serr: ChildProcess.StderrConfig,
+    extra: ReadonlyArray<{ fd: number; config: ChildProcess.AdditionalFdConfig }>,
+  ): NodeChildProcess.StdioOptions => {
+    const pipe = (x: NodeChildProcess.IOType | undefined) =>
+      process.platform === "win32" && x === "pipe" ? "overlapped" : x
+    const arr: Array<NodeChildProcess.IOType | undefined> = [
+      pipe(input(sin.stream)),
+      pipe(output(sout.stream)),
+      pipe(output(serr.stream)),
+    ]
+    if (extra.length === 0) return arr as NodeChildProcess.StdioOptions
+    const max = extra.reduce((acc, x) => Math.max(acc, x.fd), 2)
+    for (let i = 3; i <= max; i++) arr[i] = "ignore"
+    for (const x of extra) arr[x.fd] = pipe("pipe")
+    return arr as NodeChildProcess.StdioOptions
+  }
+
+  const setupFds = Effect.fnUntraced(function* (
+    command: ChildProcess.StandardCommand,
+    proc: NodeChildProcess.ChildProcess,
+    extra: ReadonlyArray<{ fd: number; config: ChildProcess.AdditionalFdConfig }>,
+  ) {
+    if (extra.length === 0) {
+      return {
+        getInputFd: () => Sink.drain,
+        getOutputFd: () => Stream.empty,
+      }
+    }
+
+    const ins = new Map<number, Sink.Sink<void, Uint8Array, never, PlatformError.PlatformError>>()
+    const outs = new Map<number, Stream.Stream<Uint8Array, PlatformError.PlatformError>>()
+
+    for (const x of extra) {
+      const node = proc.stdio[x.fd]
+      switch (x.config.type) {
+        case "input": {
+          let sink: Sink.Sink<void, Uint8Array, never, PlatformError.PlatformError> = Sink.drain
+          if (node && "write" in node) {
+            sink = NodeSink.fromWritable({
+              evaluate: () => node,
+              onError: (err) => toPlatformError(`fromWritable(fd${x.fd})`, toError(err), command),
+              endOnDone: true,
+            })
+          }
+          if (x.config.stream) yield* Effect.forkScoped(Stream.run(x.config.stream, sink))
+          ins.set(x.fd, sink)
+          break
+        }
+        case "output": {
+          let stream: Stream.Stream<Uint8Array, PlatformError.PlatformError> = Stream.empty
+          if (node && "read" in node) {
+            const tap = new PassThrough()
+            node.on("error", (err) => tap.destroy(toError(err)))
+            node.pipe(tap)
+            stream = NodeStream.fromReadable({
+              evaluate: () => tap,
+              onError: (err) => toPlatformError(`fromReadable(fd${x.fd})`, toError(err), command),
+            })
+          }
+          if (x.config.sink) stream = Stream.transduce(stream, x.config.sink)
+          outs.set(x.fd, stream)
+          break
+        }
+      }
+    }
+
+    return {
+      getInputFd: (fd: number) => ins.get(fd) ?? Sink.drain,
+      getOutputFd: (fd: number) => outs.get(fd) ?? Stream.empty,
+    }
+  })
+
+  const setupStdin = (
+    command: ChildProcess.StandardCommand,
+    proc: NodeChildProcess.ChildProcess,
+    cfg: ChildProcess.StdinConfig,
+  ) =>
+    Effect.suspend(() => {
+      let sink: Sink.Sink<void, unknown, never, PlatformError.PlatformError> = Sink.drain
+      if (Predicate.isNotNull(proc.stdin)) {
+        sink = NodeSink.fromWritable({
+          evaluate: () => proc.stdin!,
+          onError: (err) => toPlatformError("fromWritable(stdin)", toError(err), command),
+          endOnDone: cfg.endOnDone,
+          encoding: cfg.encoding,
+        })
+      }
+      if (Stream.isStream(cfg.stream)) return Effect.as(Effect.forkScoped(Stream.run(cfg.stream, sink)), sink)
+      return Effect.succeed(sink)
+    })
+
+  const setupOutput = (
+    command: ChildProcess.StandardCommand,
+    proc: NodeChildProcess.ChildProcess,
+    out: ChildProcess.StdoutConfig,
+    err: ChildProcess.StderrConfig,
+  ) => {
+    let stdout = proc.stdout
+      ? NodeStream.fromReadable({
+          evaluate: () => proc.stdout!,
+          onError: (cause) => toPlatformError("fromReadable(stdout)", toError(cause), command),
+        })
+      : Stream.empty
+    let stderr = proc.stderr
+      ? NodeStream.fromReadable({
+          evaluate: () => proc.stderr!,
+          onError: (cause) => toPlatformError("fromReadable(stderr)", toError(cause), command),
+        })
+      : Stream.empty
+
+    if (Sink.isSink(out.stream)) stdout = Stream.transduce(stdout, out.stream)
+    if (Sink.isSink(err.stream)) stderr = Stream.transduce(stderr, err.stream)
+
+    return { stdout, stderr, all: Stream.merge(stdout, stderr) }
+  }
+
+  const spawn = (command: ChildProcess.StandardCommand, opts: NodeChildProcess.SpawnOptions) =>
+    Effect.callback<readonly [NodeChildProcess.ChildProcess, ExitSignal], PlatformError.PlatformError>((resume) => {
+      const signal = Deferred.makeUnsafe<readonly [code: number | null, signal: NodeJS.Signals | null]>()
+      const proc = launch(command.command, command.args, opts)
+      let end = false
+      let exit: readonly [code: number | null, signal: NodeJS.Signals | null] | undefined
+      proc.on("error", (err) => {
+        resume(Effect.fail(toPlatformError("spawn", err, command)))
+      })
+      proc.on("exit", (...args) => {
+        exit = args
+      })
+      proc.on("close", (...args) => {
+        if (end) return
+        end = true
+        Deferred.doneUnsafe(signal, Exit.succeed(exit ?? args))
+      })
+      proc.on("spawn", () => {
+        resume(Effect.succeed([proc, signal]))
+      })
+      return Effect.sync(() => {
+        proc.kill("SIGTERM")
+      })
+    })
+
+  const killGroup = (
+    command: ChildProcess.StandardCommand,
+    proc: NodeChildProcess.ChildProcess,
+    signal: NodeJS.Signals,
+  ) => {
+    if (globalThis.process.platform === "win32") {
+      return Effect.callback<void, PlatformError.PlatformError>((resume) => {
+        NodeChildProcess.exec(`taskkill /pid ${proc.pid} /T /F`, { windowsHide: true }, (err) => {
+          if (err) return resume(Effect.fail(toPlatformError("kill", toError(err), command)))
+          resume(Effect.void)
+        })
+      })
+    }
+
+    return Effect.try({
+      try: () => {
+        globalThis.process.kill(-proc.pid!, signal)
+      },
+      catch: (err) => toPlatformError("kill", toError(err), command),
+    })
+  }
+
+  const killOne = (
+    command: ChildProcess.StandardCommand,
+    proc: NodeChildProcess.ChildProcess,
+    signal: NodeJS.Signals,
+  ) =>
+    Effect.suspend(() => {
+      if (proc.kill(signal)) return Effect.void
+      return Effect.fail(toPlatformError("kill", new Error("Failed to kill child process"), command))
+    })
+
+  const timeout =
+    (
+      proc: NodeChildProcess.ChildProcess,
+      command: ChildProcess.StandardCommand,
+      opts: ChildProcess.KillOptions | undefined,
+    ) =>
+    <A, E, R>(
+      f: (
+        command: ChildProcess.StandardCommand,
+        proc: NodeChildProcess.ChildProcess,
+        signal: NodeJS.Signals,
+      ) => Effect.Effect<A, E, R>,
+    ) => {
+      const signal = opts?.killSignal ?? "SIGTERM"
+      if (Predicate.isUndefined(opts?.forceKillAfter)) return f(command, proc, signal)
+      return Effect.timeoutOrElse(f(command, proc, signal), {
+        duration: opts.forceKillAfter,
+        onTimeout: () => f(command, proc, "SIGKILL"),
+      })
+    }
+
+  const source = (handle: ChildProcessHandle, from: ChildProcess.PipeFromOption | undefined) => {
+    const opt = from ?? "stdout"
+    switch (opt) {
+      case "stdout":
+        return handle.stdout
+      case "stderr":
+        return handle.stderr
+      case "all":
+        return handle.all
+      default: {
+        const fd = ChildProcess.parseFdName(opt)
+        return Predicate.isNotUndefined(fd) ? handle.getOutputFd(fd) : handle.stdout
+      }
+    }
+  }
+
+  const spawnCommand: (
+    command: ChildProcess.Command,
+  ) => Effect.Effect<ChildProcessHandle, PlatformError.PlatformError, Scope.Scope> = Effect.fnUntraced(
+    function* (command) {
+      switch (command._tag) {
+        case "StandardCommand": {
+          const sin = stdin(command.options)
+          const sout = stdio(command.options, "stdout")
+          const serr = stdio(command.options, "stderr")
+          const extra = fds(command.options)
+          const dir = yield* cwd(command.options)
+
+          const [proc, signal] = yield* Effect.acquireRelease(
+            spawn(command, {
+              cwd: dir,
+              env: env(command.options),
+              stdio: stdios(sin, sout, serr, extra),
+              detached: command.options.detached ?? process.platform !== "win32",
+              shell: command.options.shell,
+              windowsHide: process.platform === "win32",
+            }),
+            Effect.fnUntraced(function* ([proc, signal]) {
+              const done = yield* Deferred.isDone(signal)
+              const kill = timeout(proc, command, command.options)
+              if (done) {
+                const [code] = yield* Deferred.await(signal)
+                if (process.platform === "win32") return yield* Effect.void
+                if (code !== 0 && Predicate.isNotNull(code)) return yield* Effect.ignore(kill(killGroup))
+                return yield* Effect.void
+              }
+              return yield* kill((command, proc, signal) =>
+                Effect.catch(killGroup(command, proc, signal), () => killOne(command, proc, signal)),
+              ).pipe(Effect.andThen(Deferred.await(signal)), Effect.ignore)
+            }),
+          )
+
+          const fd = yield* setupFds(command, proc, extra)
+          const out = setupOutput(command, proc, sout, serr)
+          return makeHandle({
+            pid: ProcessId(proc.pid!),
+            stdin: yield* setupStdin(command, proc, sin),
+            stdout: out.stdout,
+            stderr: out.stderr,
+            all: out.all,
+            getInputFd: fd.getInputFd,
+            getOutputFd: fd.getOutputFd,
+            isRunning: Effect.map(Deferred.isDone(signal), (done) => !done),
+            exitCode: Effect.flatMap(Deferred.await(signal), ([code, signal]) => {
+              if (Predicate.isNotNull(code)) return Effect.succeed(ExitCode(code))
+              return Effect.fail(
+                toPlatformError(
+                  "exitCode",
+                  new Error(`Process interrupted due to receipt of signal: '${signal}'`),
+                  command,
+                ),
+              )
+            }),
+            kill: (opts?: ChildProcess.KillOptions) =>
+              timeout(
+                proc,
+                command,
+                opts,
+              )((command, proc, signal) =>
+                Effect.catch(killGroup(command, proc, signal), () => killOne(command, proc, signal)),
+              ).pipe(Effect.andThen(Deferred.await(signal)), Effect.asVoid),
+          })
+        }
+        case "PipedCommand": {
+          const flat = flatten(command)
+          const [head, ...tail] = flat.commands
+          let handle = spawnCommand(head)
+          for (let i = 0; i < tail.length; i++) {
+            const next = tail[i]
+            const opts = flat.opts[i] ?? {}
+            const sin = stdin(next.options)
+            const stream = Stream.unwrap(Effect.map(handle, (x) => source(x, opts.from)))
+            const to = opts.to ?? "stdin"
+            if (to === "stdin") {
+              handle = spawnCommand(
+                ChildProcess.make(next.command, next.args, {
+                  ...next.options,
+                  stdin: { ...sin, stream },
+                }),
+              )
+              continue
+            }
+            const fd = ChildProcess.parseFdName(to)
+            if (Predicate.isUndefined(fd)) {
+              handle = spawnCommand(
+                ChildProcess.make(next.command, next.args, {
+                  ...next.options,
+                  stdin: { ...sin, stream },
+                }),
+              )
+              continue
+            }
+            handle = spawnCommand(
+              ChildProcess.make(next.command, next.args, {
+                ...next.options,
+                additionalFds: {
+                  ...next.options.additionalFds,
+                  [ChildProcess.fdName(fd) as `fd${number}`]: { type: "input", stream },
+                },
+              }),
+            )
+          }
+          return yield* handle
+        }
+      }
+    },
+  )
+
+  return makeSpawner(spawnCommand)
+})
+
+export const layer: Layer.Layer<ChildProcessSpawner, never, FileSystem.FileSystem | Path.Path> = Layer.effect(
+  ChildProcessSpawner,
+  make,
+)

+ 3 - 2
packages/opencode/src/installation/index.ts

@@ -1,6 +1,7 @@
-import { NodeChildProcessSpawner, NodeFileSystem, NodePath } from "@effect/platform-node"
+import { NodeFileSystem, NodePath } from "@effect/platform-node"
 import { Effect, Layer, Schema, ServiceMap, Stream } from "effect"
 import { FetchHttpClient, HttpClient, HttpClientRequest, HttpClientResponse } from "effect/unstable/http"
+import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
 import { makeRunPromise } from "@/effect/run-service"
 import { withTransientReadRetry } from "@/util/effect-http-client"
 import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
@@ -340,7 +341,7 @@ export namespace Installation {
 
   export const defaultLayer = layer.pipe(
     Layer.provide(FetchHttpClient.layer),
-    Layer.provide(NodeChildProcessSpawner.layer),
+    Layer.provide(CrossSpawnSpawner.layer),
     Layer.provide(NodeFileSystem.layer),
     Layer.provide(NodePath.layer),
   )

+ 4 - 3
packages/opencode/src/snapshot/index.ts

@@ -1,8 +1,9 @@
-import { NodeChildProcessSpawner, NodeFileSystem, NodePath } from "@effect/platform-node"
+import { NodeFileSystem, NodePath } from "@effect/platform-node"
 import { Cause, Duration, Effect, Layer, Schedule, ServiceMap, Stream } from "effect"
 import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
 import path from "path"
 import z from "zod"
+import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
 import { InstanceState } from "@/effect/instance-state"
 import { makeRunPromise } from "@/effect/run-service"
 import { AppFileSystem } from "@/filesystem"
@@ -354,9 +355,9 @@ export namespace Snapshot {
     )
 
   export const defaultLayer = layer.pipe(
-    Layer.provide(NodeChildProcessSpawner.layer),
+    Layer.provide(CrossSpawnSpawner.layer),
     Layer.provide(AppFileSystem.defaultLayer),
-    Layer.provide(NodeFileSystem.layer), // needed by NodeChildProcessSpawner
+    Layer.provide(NodeFileSystem.layer), // needed by CrossSpawnSpawner
     Layer.provide(NodePath.layer),
   )
 

+ 518 - 0
packages/opencode/test/effect/cross-spawn-spawner.test.ts

@@ -0,0 +1,518 @@
+import { NodeFileSystem, NodePath } from "@effect/platform-node"
+import { describe, expect } from "bun:test"
+import fs from "node:fs/promises"
+import path from "node:path"
+import { Effect, Exit, Layer, Stream } from "effect"
+import type * as PlatformError from "effect/PlatformError"
+import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
+import * as CrossSpawnSpawner from "../../src/effect/cross-spawn-spawner"
+import { tmpdir } from "../fixture/fixture"
+import { testEffect } from "../lib/effect"
+
+const live = CrossSpawnSpawner.layer.pipe(Layer.provide(NodeFileSystem.layer), Layer.provide(NodePath.layer))
+const fx = testEffect(live)
+
+function js(code: string, opts?: ChildProcess.CommandOptions) {
+  return ChildProcess.make("node", ["-e", code], opts)
+}
+
+function decodeByteStream(stream: Stream.Stream<Uint8Array, PlatformError.PlatformError>) {
+  return Stream.runCollect(stream).pipe(
+    Effect.map((chunks) => {
+      const total = chunks.reduce((acc, x) => acc + x.length, 0)
+      const out = new Uint8Array(total)
+      let off = 0
+      for (const chunk of chunks) {
+        out.set(chunk, off)
+        off += chunk.length
+      }
+      return new TextDecoder("utf-8").decode(out).trim()
+    }),
+  )
+}
+
+function alive(pid: number) {
+  try {
+    process.kill(pid, 0)
+    return true
+  } catch {
+    return false
+  }
+}
+
+async function gone(pid: number, timeout = 5_000) {
+  const end = Date.now() + timeout
+  while (Date.now() < end) {
+    if (!alive(pid)) return true
+    await Bun.sleep(50)
+  }
+  return !alive(pid)
+}
+
+describe("cross-spawn spawner", () => {
+  describe("basic spawning", () => {
+    fx.effect(
+      "captures stdout",
+      Effect.gen(function* () {
+        const out = yield* ChildProcessSpawner.ChildProcessSpawner.use((svc) =>
+          svc.string(ChildProcess.make(process.execPath, ["-e", 'process.stdout.write("ok")'])),
+        )
+        expect(out).toBe("ok")
+      }),
+    )
+
+    fx.effect(
+      "captures multiple lines",
+      Effect.gen(function* () {
+        const handle = yield* js('console.log("line1"); console.log("line2"); console.log("line3")')
+        const out = yield* decodeByteStream(handle.stdout)
+        expect(out).toBe("line1\nline2\nline3")
+      }),
+    )
+
+    fx.effect(
+      "returns exit code",
+      Effect.gen(function* () {
+        const handle = yield* js("process.exit(0)")
+        const code = yield* handle.exitCode
+        expect(code).toBe(ChildProcessSpawner.ExitCode(0))
+      }),
+    )
+
+    fx.effect(
+      "returns non-zero exit code",
+      Effect.gen(function* () {
+        const handle = yield* js("process.exit(42)")
+        const code = yield* handle.exitCode
+        expect(code).toBe(ChildProcessSpawner.ExitCode(42))
+      }),
+    )
+  })
+
+  describe("cwd option", () => {
+    fx.effect(
+      "uses cwd when spawning commands",
+      Effect.gen(function* () {
+        const tmp = yield* Effect.acquireRelease(
+          Effect.promise(() => tmpdir()),
+          (tmp) => Effect.promise(() => tmp[Symbol.asyncDispose]()),
+        )
+        const out = yield* ChildProcessSpawner.ChildProcessSpawner.use((svc) =>
+          svc.string(
+            ChildProcess.make(process.execPath, ["-e", "process.stdout.write(process.cwd())"], { cwd: tmp.path }),
+          ),
+        )
+        expect(out).toBe(tmp.path)
+      }),
+    )
+
+    fx.effect(
+      "fails for invalid cwd",
+      Effect.gen(function* () {
+        const exit = yield* Effect.exit(
+          ChildProcess.make("echo", ["test"], { cwd: "/nonexistent/directory/path" }).asEffect(),
+        )
+        expect(Exit.isFailure(exit)).toBe(true)
+      }),
+    )
+  })
+
+  describe("env option", () => {
+    fx.effect(
+      "passes environment variables with extendEnv",
+      Effect.gen(function* () {
+        const handle = yield* js('process.stdout.write(process.env.TEST_VAR ?? "")', {
+          env: { TEST_VAR: "test_value" },
+          extendEnv: true,
+        })
+        const out = yield* decodeByteStream(handle.stdout)
+        expect(out).toBe("test_value")
+      }),
+    )
+
+    fx.effect(
+      "passes multiple environment variables",
+      Effect.gen(function* () {
+        const handle = yield* js(
+          "process.stdout.write(`${process.env.VAR1}-${process.env.VAR2}-${process.env.VAR3}`)",
+          {
+            env: { VAR1: "one", VAR2: "two", VAR3: "three" },
+            extendEnv: true,
+          },
+        )
+        const out = yield* decodeByteStream(handle.stdout)
+        expect(out).toBe("one-two-three")
+      }),
+    )
+  })
+
+  describe("stderr", () => {
+    fx.effect(
+      "captures stderr output",
+      Effect.gen(function* () {
+        const handle = yield* js('process.stderr.write("error message")')
+        const err = yield* decodeByteStream(handle.stderr)
+        expect(err).toBe("error message")
+      }),
+    )
+
+    fx.effect(
+      "captures both stdout and stderr",
+      Effect.gen(function* () {
+        const handle = yield* js('process.stdout.write("stdout\\n"); process.stderr.write("stderr\\n")')
+        const [stdout, stderr] = yield* Effect.all([decodeByteStream(handle.stdout), decodeByteStream(handle.stderr)])
+        expect(stdout).toBe("stdout")
+        expect(stderr).toBe("stderr")
+      }),
+    )
+  })
+
+  describe("combined output (all)", () => {
+    fx.effect(
+      "captures stdout via .all when no stderr",
+      Effect.gen(function* () {
+        const handle = yield* ChildProcess.make("echo", ["hello from stdout"])
+        const all = yield* decodeByteStream(handle.all)
+        expect(all).toBe("hello from stdout")
+      }),
+    )
+
+    fx.effect(
+      "captures stderr via .all when no stdout",
+      Effect.gen(function* () {
+        const handle = yield* js('process.stderr.write("hello from stderr")')
+        const all = yield* decodeByteStream(handle.all)
+        expect(all).toBe("hello from stderr")
+      }),
+    )
+  })
+
+  describe("stdin", () => {
+    fx.effect(
+      "allows providing standard input to a command",
+      Effect.gen(function* () {
+        const input = "a b c"
+        const stdin = Stream.make(Buffer.from(input, "utf-8"))
+        const handle = yield* js(
+          'process.stdin.setEncoding("utf8"); let out = ""; process.stdin.on("data", (chunk) => out += chunk); process.stdin.on("end", () => process.stdout.write(out))',
+          { stdin },
+        )
+        const out = yield* decodeByteStream(handle.stdout)
+        yield* handle.exitCode
+        expect(out).toBe("a b c")
+      }),
+    )
+  })
+
+  describe("process control", () => {
+    fx.effect(
+      "kills a running process",
+      Effect.gen(function* () {
+        const exit = yield* Effect.exit(
+          Effect.gen(function* () {
+            const handle = yield* js("setTimeout(() => {}, 10_000)")
+            yield* handle.kill()
+            return yield* handle.exitCode
+          }),
+        )
+        expect(Exit.isFailure(exit) ? true : exit.value !== ChildProcessSpawner.ExitCode(0)).toBe(true)
+      }),
+    )
+
+    fx.effect(
+      "kills a child when scope exits",
+      Effect.gen(function* () {
+        const pid = yield* Effect.scoped(
+          Effect.gen(function* () {
+            const handle = yield* js("setInterval(() => {}, 10_000)")
+            return Number(handle.pid)
+          }),
+        )
+        const done = yield* Effect.promise(() => gone(pid))
+        expect(done).toBe(true)
+      }),
+    )
+
+    fx.effect(
+      "forceKillAfter escalates for stubborn processes",
+      Effect.gen(function* () {
+        if (process.platform === "win32") return
+
+        const started = Date.now()
+        const exit = yield* Effect.exit(
+          Effect.gen(function* () {
+            const handle = yield* js('process.on("SIGTERM", () => {}); setInterval(() => {}, 10_000)')
+            yield* handle.kill({ forceKillAfter: 100 })
+            return yield* handle.exitCode
+          }),
+        )
+
+        expect(Date.now() - started).toBeLessThan(1_000)
+        expect(Exit.isFailure(exit) ? true : exit.value !== ChildProcessSpawner.ExitCode(0)).toBe(true)
+      }),
+    )
+
+    fx.effect(
+      "isRunning reflects process state",
+      Effect.gen(function* () {
+        const handle = yield* js('process.stdout.write("done")')
+        yield* handle.exitCode
+        const running = yield* handle.isRunning
+        expect(running).toBe(false)
+      }),
+    )
+  })
+
+  describe("error handling", () => {
+    fx.effect(
+      "fails for invalid command",
+      Effect.gen(function* () {
+        const exit = yield* Effect.exit(
+          Effect.gen(function* () {
+            const handle = yield* ChildProcess.make("nonexistent-command-12345")
+            return yield* handle.exitCode
+          }),
+        )
+        expect(Exit.isFailure(exit) ? true : exit.value !== ChildProcessSpawner.ExitCode(0)).toBe(true)
+      }),
+    )
+  })
+
+  describe("pipeline", () => {
+    fx.effect(
+      "pipes stdout of one command to stdin of another",
+      Effect.gen(function* () {
+        const handle = yield* js('process.stdout.write("hello world")').pipe(
+          ChildProcess.pipeTo(
+            js(
+              'process.stdin.setEncoding("utf8"); let out = ""; process.stdin.on("data", (chunk) => out += chunk); process.stdin.on("end", () => process.stdout.write(out.toUpperCase()))',
+            ),
+          ),
+        )
+        const out = yield* decodeByteStream(handle.stdout)
+        yield* handle.exitCode
+        expect(out).toBe("HELLO WORLD")
+      }),
+    )
+
+    fx.effect(
+      "three-stage pipeline",
+      Effect.gen(function* () {
+        const handle = yield* js('process.stdout.write("hello world")').pipe(
+          ChildProcess.pipeTo(
+            js(
+              'process.stdin.setEncoding("utf8"); let out = ""; process.stdin.on("data", (chunk) => out += chunk); process.stdin.on("end", () => process.stdout.write(out.toUpperCase()))',
+            ),
+          ),
+          ChildProcess.pipeTo(
+            js(
+              'process.stdin.setEncoding("utf8"); let out = ""; process.stdin.on("data", (chunk) => out += chunk); process.stdin.on("end", () => process.stdout.write(out.replaceAll(" ", "-")))',
+            ),
+          ),
+        )
+        const out = yield* decodeByteStream(handle.stdout)
+        yield* handle.exitCode
+        expect(out).toBe("HELLO-WORLD")
+      }),
+    )
+
+    fx.effect(
+      "pipes stderr with { from: 'stderr' }",
+      Effect.gen(function* () {
+        const handle = yield* js('process.stderr.write("error")').pipe(
+          ChildProcess.pipeTo(
+            js(
+              'process.stdin.setEncoding("utf8"); let out = ""; process.stdin.on("data", (chunk) => out += chunk); process.stdin.on("end", () => process.stdout.write(out))',
+            ),
+            { from: "stderr" },
+          ),
+        )
+        const out = yield* decodeByteStream(handle.stdout)
+        yield* handle.exitCode
+        expect(out).toBe("error")
+      }),
+    )
+
+    fx.effect(
+      "pipes combined output with { from: 'all' }",
+      Effect.gen(function* () {
+        const handle = yield* js('process.stdout.write("stdout\\n"); process.stderr.write("stderr\\n")').pipe(
+          ChildProcess.pipeTo(
+            js(
+              'process.stdin.setEncoding("utf8"); let out = ""; process.stdin.on("data", (chunk) => out += chunk); process.stdin.on("end", () => process.stdout.write(out))',
+            ),
+            { from: "all" },
+          ),
+        )
+        const out = yield* decodeByteStream(handle.stdout)
+        yield* handle.exitCode
+        expect(out).toContain("stdout")
+        expect(out).toContain("stderr")
+      }),
+    )
+
+    fx.effect(
+      "pipes output fd3 with { from: 'fd3' }",
+      Effect.gen(function* () {
+        const handle = yield* js('require("node:fs").writeSync(3, "hello from fd3\\n")', {
+          additionalFds: { fd3: { type: "output" } },
+        }).pipe(
+          ChildProcess.pipeTo(
+            js(
+              'process.stdin.setEncoding("utf8"); let out = ""; process.stdin.on("data", (chunk) => out += chunk); process.stdin.on("end", () => process.stdout.write(out))',
+            ),
+            { from: "fd3" },
+          ),
+        )
+        const out = yield* decodeByteStream(handle.stdout)
+        yield* handle.exitCode
+        expect(out).toBe("hello from fd3")
+      }),
+    )
+
+    fx.effect(
+      "pipes stdout to fd3",
+      Effect.gen(function* () {
+        if (process.platform === "win32") return
+
+        const handle = yield* js('process.stdout.write("hello from stdout")').pipe(
+          ChildProcess.pipeTo(js('process.stdout.write(require("node:fs").readFileSync(3, "utf8"))'), { to: "fd3" }),
+        )
+        const out = yield* decodeByteStream(handle.stdout)
+        yield* handle.exitCode
+        expect(out).toBe("hello from stdout")
+      }),
+    )
+  })
+
+  describe("additional fds", () => {
+    fx.effect(
+      "reads data from output fd3",
+      Effect.gen(function* () {
+        const handle = yield* js('require("node:fs").writeSync(3, "hello from fd3\\n")', {
+          additionalFds: { fd3: { type: "output" } },
+        })
+        const out = yield* decodeByteStream(handle.getOutputFd(3))
+        yield* handle.exitCode
+        expect(out).toBe("hello from fd3")
+      }),
+    )
+
+    fx.effect(
+      "writes data to input fd3",
+      Effect.gen(function* () {
+        if (process.platform === "win32") return
+
+        const input = Stream.make(new TextEncoder().encode("data from parent"))
+        const handle = yield* js('process.stdout.write(require("node:fs").readFileSync(3, "utf8"))', {
+          additionalFds: { fd3: { type: "input", stream: input } },
+        })
+        const out = yield* decodeByteStream(handle.stdout)
+        yield* handle.exitCode
+        expect(out).toBe("data from parent")
+      }),
+    )
+
+    fx.effect(
+      "returns empty stream for unconfigured fd",
+      Effect.gen(function* () {
+        const handle =
+          process.platform === "win32"
+            ? yield* js('process.stdout.write("test")')
+            : yield* ChildProcess.make("echo", ["test"])
+        const out = yield* decodeByteStream(handle.getOutputFd(3))
+        yield* handle.exitCode
+        expect(out).toBe("")
+      }),
+    )
+
+    fx.effect(
+      "works alongside normal stdout and stderr",
+      Effect.gen(function* () {
+        const handle = yield* js(
+          'require("node:fs").writeSync(3, "fd3\\n"); process.stdout.write("stdout\\n"); process.stderr.write("stderr\\n")',
+          {
+            additionalFds: { fd3: { type: "output" } },
+          },
+        )
+        const stdout = yield* decodeByteStream(handle.stdout)
+        const stderr = yield* decodeByteStream(handle.stderr)
+        const fd3 = yield* decodeByteStream(handle.getOutputFd(3))
+        yield* handle.exitCode
+        expect(stdout).toBe("stdout")
+        expect(stderr).toBe("stderr")
+        expect(fd3).toBe("fd3")
+      }),
+    )
+  })
+
+  describe("large output", () => {
+    fx.effect(
+      "does not deadlock on large stdout",
+      Effect.gen(function* () {
+        const handle = yield* js("for (let i = 1; i <= 100000; i++) process.stdout.write(`${i}\\n`)")
+        const out = yield* handle.stdout.pipe(
+          Stream.decodeText(),
+          Stream.runFold(
+            () => "",
+            (acc, chunk) => acc + chunk,
+          ),
+        )
+        yield* handle.exitCode
+        const lines = out.trim().split("\n")
+        expect(lines.length).toBe(100000)
+        expect(lines[0]).toBe("1")
+        expect(lines[99999]).toBe("100000")
+      }),
+      { timeout: 10_000 },
+    )
+  })
+
+  describe("Windows-specific", () => {
+    fx.effect(
+      "uses shell routing on Windows",
+      Effect.gen(function* () {
+        if (process.platform !== "win32") return
+
+        const out = yield* ChildProcessSpawner.ChildProcessSpawner.use((svc) =>
+          svc.string(
+            ChildProcess.make("set", ["OPENCODE_TEST_SHELL"], {
+              shell: true,
+              extendEnv: true,
+              env: { OPENCODE_TEST_SHELL: "ok" },
+            }),
+          ),
+        )
+        expect(out).toContain("OPENCODE_TEST_SHELL=ok")
+      }),
+    )
+
+    fx.effect(
+      "runs cmd scripts with spaces on Windows without shell",
+      Effect.gen(function* () {
+        if (process.platform !== "win32") return
+
+        const tmp = yield* Effect.acquireRelease(
+          Effect.promise(() => tmpdir()),
+          (tmp) => Effect.promise(() => tmp[Symbol.asyncDispose]()),
+        )
+        const dir = path.join(tmp.path, "with space")
+        const file = path.join(dir, "echo cmd.cmd")
+
+        yield* Effect.promise(() => fs.mkdir(dir, { recursive: true }))
+        yield* Effect.promise(() => Bun.write(file, "@echo off\r\nif %~1==--stdio exit /b 0\r\nexit /b 7\r\n"))
+
+        const code = yield* ChildProcessSpawner.ChildProcessSpawner.use((svc) =>
+          svc.exitCode(
+            ChildProcess.make(file, ["--stdio"], {
+              stdin: "pipe",
+              stdout: "pipe",
+              stderr: "pipe",
+            }),
+          ),
+        )
+        expect(code).toBe(ChildProcessSpawner.ExitCode(0))
+      }),
+    )
+  })
+})