소스 검색

effectify Bus service: migrate to Effect PubSub + InstanceState (#18579)

Kit Langton 3 주 전
부모
커밋
97c15a087d
42개의 변경된 파일1120개의 추가작업 그리고 529개의 파일을 삭제
  1. 8 6
      packages/opencode/AGENTS.md
  2. 27 22
      packages/opencode/specs/effect-migration.md
  3. 2 2
      packages/opencode/src/account/index.ts
  4. 2 2
      packages/opencode/src/agent/agent.ts
  5. 2 2
      packages/opencode/src/auth/index.ts
  6. 154 75
      packages/opencode/src/bus/index.ts
  7. 2 2
      packages/opencode/src/command/index.ts
  8. 0 14
      packages/opencode/src/effect/instance-context.ts
  9. 8 4
      packages/opencode/src/effect/run-service.ts
  10. 2 2
      packages/opencode/src/file/index.ts
  11. 2 2
      packages/opencode/src/file/time.ts
  12. 2 2
      packages/opencode/src/file/watcher.ts
  13. 48 47
      packages/opencode/src/format/index.ts
  14. 2 2
      packages/opencode/src/git/index.ts
  15. 2 2
      packages/opencode/src/installation/index.ts
  16. 2 2
      packages/opencode/src/permission/index.ts
  17. 19 14
      packages/opencode/src/plugin/index.ts
  18. 2 2
      packages/opencode/src/project/project.ts
  19. 24 19
      packages/opencode/src/project/vcs.ts
  20. 2 2
      packages/opencode/src/provider/auth.ts
  21. 2 2
      packages/opencode/src/pty/index.ts
  22. 2 2
      packages/opencode/src/question/index.ts
  23. 7 4
      packages/opencode/src/session/status.ts
  24. 2 2
      packages/opencode/src/skill/index.ts
  25. 2 2
      packages/opencode/src/snapshot/index.ts
  26. 3 3
      packages/opencode/src/tool/apply_patch.ts
  27. 5 6
      packages/opencode/src/tool/edit.ts
  28. 2 2
      packages/opencode/src/tool/registry.ts
  29. 2 2
      packages/opencode/src/tool/truncate.ts
  30. 3 3
      packages/opencode/src/tool/write.ts
  31. 2 2
      packages/opencode/src/worktree/index.ts
  32. 164 0
      packages/opencode/test/bus/bus-effect.test.ts
  33. 87 0
      packages/opencode/test/bus/bus-integration.test.ts
  34. 219 0
      packages/opencode/test/bus/bus.test.ts
  35. 4 4
      packages/opencode/test/effect/run-service.test.ts
  36. 22 10
      packages/opencode/test/file/watcher.test.ts
  37. 68 0
      packages/opencode/test/fixture/fixture.ts
  38. 0 51
      packages/opencode/test/fixture/instance.ts
  39. 171 161
      packages/opencode/test/format/format.test.ts
  40. 34 38
      packages/opencode/test/project/vcs.test.ts
  41. 7 3
      packages/opencode/test/sync/index.test.ts
  42. 0 7
      packages/opencode/test/tool/edit.test.ts

+ 8 - 6
packages/opencode/AGENTS.md

@@ -31,12 +31,14 @@ See `specs/effect-migration.md` for the compact pattern reference and examples.
 - Use `Schema.Defect` instead of `unknown` for defect-like causes.
 - In `Effect.gen` / `Effect.fn`, prefer `yield* new MyError(...)` over `yield* Effect.fail(new MyError(...))` for direct early-failure branches.
 
-## Runtime vs Instances
+## Runtime vs InstanceState
 
-- Use the shared runtime for process-wide services with one lifecycle for the whole app.
-- Use `src/effect/instances.ts` for per-directory or per-project services that need `InstanceContext`, per-instance state, or per-instance cleanup.
-- If two open directories should not share one copy of the service, it belongs in `Instances`.
-- Instance-scoped services should read context from `InstanceContext`, not `Instance.*` globals.
+- Use `makeRuntime` (from `src/effect/run-service.ts`) for all services. It returns `{ runPromise, runFork, runCallback }` backed by a shared `memoMap` that deduplicates layers.
+- Use `InstanceState` (from `src/effect/instance-state.ts`) for per-directory or per-project state that needs per-instance cleanup. It uses `ScopedCache` keyed by directory — each open project gets its own state, automatically cleaned up on disposal.
+- If two open directories should not share one copy of the service, it needs `InstanceState`.
+- Do the work directly in the `InstanceState.make` closure — `ScopedCache` handles run-once semantics. Don't add fibers, `ensure()` callbacks, or `started` flags on top.
+- Use `Effect.addFinalizer` or `Effect.acquireRelease` inside the `InstanceState.make` closure for cleanup (subscriptions, process teardown, etc.).
+- Use `Effect.forkScoped` inside the closure for background stream consumers — the fiber is interrupted when the instance is disposed.
 
 ## Preferred Effect services
 
@@ -51,7 +53,7 @@ See `specs/effect-migration.md` for the compact pattern reference and examples.
 
 `Instance.bind(fn)` captures the current Instance AsyncLocalStorage context and restores it synchronously when called.
 
-Use it for native addon callbacks (`@parcel/watcher`, `node-pty`, native `fs.watch`, etc.) that need to call `Bus.publish`, `Instance.state()`, or anything that reads `Instance.directory`.
+Use it for native addon callbacks (`@parcel/watcher`, `node-pty`, native `fs.watch`, etc.) that need to call `Bus.publish` or anything that reads `Instance.directory`.
 
 You do not need it for `setTimeout`, `Promise.then`, `EventEmitter.on`, or Effect fibers.
 

+ 27 - 22
packages/opencode/specs/effect-migration.md

@@ -6,7 +6,7 @@ Practical reference for new and migrated Effect code in `packages/opencode`.
 
 Use `InstanceState` (from `src/effect/instance-state.ts`) for services that need per-directory state, per-instance cleanup, or project-bound background work. InstanceState uses a `ScopedCache` keyed by directory, so each open project gets its own copy of the state that is automatically cleaned up on disposal.
 
-Use `makeRunPromise` (from `src/effect/run-service.ts`) to create a per-service `ManagedRuntime` that lazily initializes and shares layers via a global `memoMap`.
+Use `makeRuntime` (from `src/effect/run-service.ts`) to create a per-service `ManagedRuntime` that lazily initializes and shares layers via a global `memoMap`. Returns `{ runPromise, runFork, runCallback }`.
 
 - Global services (no per-directory state): Account, Auth, Installation, Truncate
 - Instance-scoped (per-directory state via InstanceState): File, FileTime, FileWatcher, Format, Permission, Question, Skill, Snapshot, Vcs, ProviderAuth
@@ -46,7 +46,7 @@ export namespace Foo {
   export const defaultLayer = layer.pipe(Layer.provide(FooDep.layer))
 
   // Per-service runtime (inside the namespace)
-  const runPromise = makeRunPromise(Service, defaultLayer)
+  const { runPromise } = makeRuntime(Service, defaultLayer)
 
   // Async facade functions
   export async function get(id: FooID) {
@@ -79,29 +79,34 @@ See `Auth.ZodInfo` for the canonical example.
 
 The `InstanceState.make` init callback receives a `Scope`, so you can use `Effect.acquireRelease`, `Effect.addFinalizer`, and `Effect.forkScoped` inside it. Resources acquired this way are automatically cleaned up when the instance is disposed or invalidated by `ScopedCache`. This makes it the right place for:
 
-- **Subscriptions**: Use `Effect.acquireRelease` to subscribe and auto-unsubscribe:
+- **Subscriptions**: Yield `Bus.Service` at the layer level, then use `Stream` + `forkScoped` inside the init closure. The fiber is automatically interrupted when the instance scope closes:
 
 ```ts
-const cache =
-  yield *
-  InstanceState.make<State>(
-    Effect.fn("Foo.state")(function* (ctx) {
-      // ... load state ...
-
-      yield* Effect.acquireRelease(
-        Effect.sync(() =>
-          Bus.subscribeAll((event) => {
-            /* handle */
-          }),
-        ),
-        (unsub) => Effect.sync(unsub),
+const bus = yield* Bus.Service
+
+const cache = yield* InstanceState.make<State>(
+  Effect.fn("Foo.state")(function* (ctx) {
+    // ... load state ...
+
+    yield* bus
+      .subscribeAll()
+      .pipe(
+        Stream.runForEach((event) => Effect.sync(() => { /* handle */ })),
+        Effect.forkScoped,
       )
 
-      return {
-        /* state */
-      }
-    }),
-  )
+    return { /* state */ }
+  }),
+)
+```
+
+- **Resource cleanup**: Use `Effect.acquireRelease` or `Effect.addFinalizer` for resources that need teardown (native watchers, process handles, etc.):
+
+```ts
+yield* Effect.acquireRelease(
+  Effect.sync(() => nativeAddon.watch(dir)),
+  (watcher) => Effect.sync(() => watcher.close()),
+)
 ```
 
 - **Background fibers**: Use `Effect.forkScoped` — the fiber is interrupted on disposal.
@@ -165,7 +170,7 @@ Still open and likely worth migrating:
 - [x] `ToolRegistry`
 - [ ] `Pty`
 - [x] `Worktree`
-- [ ] `Bus`
+- [x] `Bus`
 - [x] `Command`
 - [ ] `Config`
 - [ ] `Session`

+ 2 - 2
packages/opencode/src/account/index.ts

@@ -1,7 +1,7 @@
 import { Clock, Duration, Effect, Layer, Option, Schema, SchemaGetter, ServiceMap } from "effect"
 import { FetchHttpClient, HttpClient, HttpClientRequest, HttpClientResponse } from "effect/unstable/http"
 
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
 import { withTransientReadRetry } from "@/util/effect-http-client"
 import { AccountRepo, type AccountRow } from "./repo"
 import {
@@ -379,7 +379,7 @@ export namespace Account {
 
   export const defaultLayer = layer.pipe(Layer.provide(AccountRepo.layer), Layer.provide(FetchHttpClient.layer))
 
-  export const runPromise = makeRunPromise(Service, defaultLayer)
+  export const { runPromise } = makeRuntime(Service, defaultLayer)
 
   export async function active(): Promise<Info | undefined> {
     return Option.getOrUndefined(await runPromise((service) => service.active()))

+ 2 - 2
packages/opencode/src/agent/agent.ts

@@ -21,7 +21,7 @@ import { Plugin } from "@/plugin"
 import { Skill } from "../skill"
 import { Effect, ServiceMap, Layer } from "effect"
 import { InstanceState } from "@/effect/instance-state"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
 
 export namespace Agent {
   export const Info = z
@@ -393,7 +393,7 @@ export namespace Agent {
 
   export const defaultLayer = layer.pipe(Layer.provide(Auth.layer))
 
-  const runPromise = makeRunPromise(Service, defaultLayer)
+  const { runPromise } = makeRuntime(Service, defaultLayer)
 
   export async function get(agent: string) {
     return runPromise((svc) => svc.get(agent))

+ 2 - 2
packages/opencode/src/auth/index.ts

@@ -1,6 +1,6 @@
 import path from "path"
 import { Effect, Layer, Record, Result, Schema, ServiceMap } from "effect"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
 import { zod } from "@/util/effect-zod"
 import { Global } from "../global"
 import { Filesystem } from "../util/filesystem"
@@ -95,7 +95,7 @@ export namespace Auth {
     }),
   )
 
-  const runPromise = makeRunPromise(Service, layer)
+  const { runPromise } = makeRuntime(Service, layer)
 
   export async function get(providerID: string) {
     return runPromise((service) => service.get(providerID))

+ 154 - 75
packages/opencode/src/bus/index.ts

@@ -1,12 +1,14 @@
 import z from "zod"
+import { Effect, Exit, Layer, PubSub, Scope, ServiceMap, Stream } from "effect"
 import { Log } from "../util/log"
 import { Instance } from "../project/instance"
 import { BusEvent } from "./bus-event"
 import { GlobalBus } from "./global"
+import { InstanceState } from "@/effect/instance-state"
+import { makeRuntime } from "@/effect/run-service"
 
 export namespace Bus {
   const log = Log.create({ service: "bus" })
-  type Subscription = (event: any) => void
 
   export const InstanceDisposed = BusEvent.define(
     "server.instance.disposed",
@@ -15,91 +17,168 @@ export namespace Bus {
     }),
   )
 
-  const state = Instance.state(
-    () => {
-      const subscriptions = new Map<any, Subscription[]>()
+  type Payload<D extends BusEvent.Definition = BusEvent.Definition> = {
+    type: D["type"]
+    properties: z.infer<D["properties"]>
+  }
+
+  type State = {
+    wildcard: PubSub.PubSub<Payload>
+    typed: Map<string, PubSub.PubSub<Payload>>
+  }
+
+  export interface Interface {
+    readonly publish: <D extends BusEvent.Definition>(
+      def: D,
+      properties: z.output<D["properties"]>,
+    ) => Effect.Effect<void>
+    readonly subscribe: <D extends BusEvent.Definition>(def: D) => Stream.Stream<Payload<D>>
+    readonly subscribeAll: () => Stream.Stream<Payload>
+    readonly subscribeCallback: <D extends BusEvent.Definition>(
+      def: D,
+      callback: (event: Payload<D>) => unknown,
+    ) => Effect.Effect<() => void>
+    readonly subscribeAllCallback: (callback: (event: any) => unknown) => Effect.Effect<() => void>
+  }
+
+  export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/Bus") {}
+
+  export const layer = Layer.effect(
+    Service,
+    Effect.gen(function* () {
+      const cache = yield* InstanceState.make<State>(
+        Effect.fn("Bus.state")(function* (ctx) {
+          const wildcard = yield* PubSub.unbounded<Payload>()
+          const typed = new Map<string, PubSub.PubSub<Payload>>()
+
+          yield* Effect.addFinalizer(() =>
+            Effect.gen(function* () {
+              // Publish InstanceDisposed before shutting down so subscribers see it
+              yield* PubSub.publish(wildcard, {
+                type: InstanceDisposed.type,
+                properties: { directory: ctx.directory },
+              })
+              yield* PubSub.shutdown(wildcard)
+              for (const ps of typed.values()) {
+                yield* PubSub.shutdown(ps)
+              }
+            }),
+          )
 
-      return {
-        subscriptions,
+          return { wildcard, typed }
+        }),
+      )
+
+      function getOrCreate<D extends BusEvent.Definition>(state: State, def: D) {
+        return Effect.gen(function* () {
+          let ps = state.typed.get(def.type)
+          if (!ps) {
+            ps = yield* PubSub.unbounded<Payload>()
+            state.typed.set(def.type, ps)
+          }
+          return ps as unknown as PubSub.PubSub<Payload<D>>
+        })
       }
-    },
-    async (entry) => {
-      const wildcard = entry.subscriptions.get("*")
-      if (!wildcard) return
-      const event = {
-        type: InstanceDisposed.type,
-        properties: {
-          directory: Instance.directory,
-        },
+
+      function publish<D extends BusEvent.Definition>(def: D, properties: z.output<D["properties"]>) {
+        return Effect.gen(function* () {
+          const state = yield* InstanceState.get(cache)
+          const payload: Payload = { type: def.type, properties }
+          log.info("publishing", { type: def.type })
+
+          const ps = state.typed.get(def.type)
+          if (ps) yield* PubSub.publish(ps, payload)
+          yield* PubSub.publish(state.wildcard, payload)
+
+          GlobalBus.emit("event", {
+            directory: Instance.directory,
+            payload,
+          })
+        })
       }
-      for (const sub of [...wildcard]) {
-        sub(event)
+
+      function subscribe<D extends BusEvent.Definition>(def: D): Stream.Stream<Payload<D>> {
+        log.info("subscribing", { type: def.type })
+        return Stream.unwrap(
+          Effect.gen(function* () {
+            const state = yield* InstanceState.get(cache)
+            const ps = yield* getOrCreate(state, def)
+            return Stream.fromPubSub(ps)
+          }),
+        ).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: def.type }))))
       }
-    },
-  )
 
-  export async function publish<Definition extends BusEvent.Definition>(
-    def: Definition,
-    properties: z.output<Definition["properties"]>,
-  ) {
-    const payload = {
-      type: def.type,
-      properties,
-    }
-    log.info("publishing", {
-      type: def.type,
-    })
-    const pending = []
-    for (const key of [def.type, "*"]) {
-      const match = [...(state().subscriptions.get(key) ?? [])]
-      for (const sub of match) {
-        pending.push(sub(payload))
+      function subscribeAll(): Stream.Stream<Payload> {
+        log.info("subscribing", { type: "*" })
+        return Stream.unwrap(
+          Effect.gen(function* () {
+            const state = yield* InstanceState.get(cache)
+            return Stream.fromPubSub(state.wildcard)
+          }),
+        ).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: "*" }))))
       }
-    }
-    GlobalBus.emit("event", {
-      directory: Instance.directory,
-      payload,
-    })
-    return Promise.all(pending)
-  }
 
-  export function subscribe<Definition extends BusEvent.Definition>(
-    def: Definition,
-    callback: (event: { type: Definition["type"]; properties: z.infer<Definition["properties"]> }) => void,
-  ) {
-    return raw(def.type, callback)
-  }
+      function on<T>(pubsub: PubSub.PubSub<T>, type: string, callback: (event: T) => unknown) {
+        return Effect.gen(function* () {
+          log.info("subscribing", { type })
+          const scope = yield* Scope.make()
+          const subscription = yield* Scope.provide(scope)(PubSub.subscribe(pubsub))
 
-  export function once<Definition extends BusEvent.Definition>(
-    def: Definition,
-    callback: (event: {
-      type: Definition["type"]
-      properties: z.infer<Definition["properties"]>
-    }) => "done" | undefined,
-  ) {
-    const unsub = subscribe(def, (event) => {
-      if (callback(event)) unsub()
-    })
+          yield* Scope.provide(scope)(
+            Stream.fromSubscription(subscription).pipe(
+              Stream.runForEach((msg) =>
+                Effect.tryPromise({
+                  try: () => Promise.resolve().then(() => callback(msg)),
+                  catch: (cause) => {
+                    log.error("subscriber failed", { type, cause })
+                  },
+                }).pipe(Effect.ignore),
+              ),
+              Effect.forkScoped,
+            ),
+          )
+
+          return () => {
+            log.info("unsubscribing", { type })
+            Effect.runFork(Scope.close(scope, Exit.void))
+          }
+        })
+      }
+
+      const subscribeCallback = Effect.fn("Bus.subscribeCallback")(function* <D extends BusEvent.Definition>(
+        def: D,
+        callback: (event: Payload<D>) => unknown,
+      ) {
+        const state = yield* InstanceState.get(cache)
+        const ps = yield* getOrCreate(state, def)
+        return yield* on(ps, def.type, callback)
+      })
+
+      const subscribeAllCallback = Effect.fn("Bus.subscribeAllCallback")(function* (callback: (event: any) => unknown) {
+        const state = yield* InstanceState.get(cache)
+        return yield* on(state.wildcard, "*", callback)
+      })
+
+      return Service.of({ publish, subscribe, subscribeAll, subscribeCallback, subscribeAllCallback })
+    }),
+  )
+
+  const { runPromise, runSync } = makeRuntime(Service, layer)
+
+  // runSync is safe here because the subscribe chain (InstanceState.get, PubSub.subscribe,
+  // Scope.make, Effect.forkScoped) is entirely synchronous. If any step becomes async, this will throw.
+  export async function publish<D extends BusEvent.Definition>(def: D, properties: z.output<D["properties"]>) {
+    return runPromise((svc) => svc.publish(def, properties))
   }
 
-  export function subscribeAll(callback: (event: any) => void) {
-    return raw("*", callback)
+  export function subscribe<D extends BusEvent.Definition>(
+    def: D,
+    callback: (event: { type: D["type"]; properties: z.infer<D["properties"]> }) => unknown,
+  ) {
+    return runSync((svc) => svc.subscribeCallback(def, callback))
   }
 
-  function raw(type: string, callback: (event: any) => void) {
-    log.info("subscribing", { type })
-    const subscriptions = state().subscriptions
-    let match = subscriptions.get(type) ?? []
-    match.push(callback)
-    subscriptions.set(type, match)
-
-    return () => {
-      log.info("unsubscribing", { type })
-      const match = subscriptions.get(type)
-      if (!match) return
-      const index = match.indexOf(callback)
-      if (index === -1) return
-      match.splice(index, 1)
-    }
+  export function subscribeAll(callback: (event: any) => unknown) {
+    return runSync((svc) => svc.subscribeAllCallback(callback))
   }
 }

+ 2 - 2
packages/opencode/src/command/index.ts

@@ -1,6 +1,6 @@
 import { BusEvent } from "@/bus/bus-event"
 import { InstanceState } from "@/effect/instance-state"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
 import { SessionID, MessageID } from "@/session/schema"
 import { Effect, Layer, ServiceMap } from "effect"
 import z from "zod"
@@ -173,7 +173,7 @@ export namespace Command {
     }),
   )
 
-  const runPromise = makeRunPromise(Service, layer)
+  const { runPromise } = makeRuntime(Service, layer)
 
   export async function get(name: string) {
     return runPromise((svc) => svc.get(name))

+ 0 - 14
packages/opencode/src/effect/instance-context.ts

@@ -1,14 +0,0 @@
-import { ServiceMap } from "effect"
-import type { Project } from "@/project/project"
-
-export declare namespace InstanceContext {
-  export interface Shape {
-    readonly directory: string
-    readonly worktree: string
-    readonly project: Project.Info
-  }
-}
-
-export class InstanceContext extends ServiceMap.Service<InstanceContext, InstanceContext.Shape>()(
-  "opencode/InstanceContext",
-) {}

+ 8 - 4
packages/opencode/src/effect/run-service.ts

@@ -3,11 +3,15 @@ import * as ServiceMap from "effect/ServiceMap"
 
 export const memoMap = Layer.makeMemoMapUnsafe()
 
-export function makeRunPromise<I, S, E>(service: ServiceMap.Service<I, S>, layer: Layer.Layer<I, E>) {
+export function makeRuntime<I, S, E>(service: ServiceMap.Service<I, S>, layer: Layer.Layer<I, E>) {
   let rt: ManagedRuntime.ManagedRuntime<I, E> | undefined
+  const getRuntime = () => (rt ??= ManagedRuntime.make(layer, { memoMap }))
 
-  return <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>, options?: Effect.RunOptions) => {
-    rt ??= ManagedRuntime.make(layer, { memoMap })
-    return rt.runPromise(service.use(fn), options)
+  return {
+    runSync: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runSync(service.use(fn)),
+    runPromise: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>, options?: Effect.RunOptions) =>
+      getRuntime().runPromise(service.use(fn), options),
+    runFork: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runFork(service.use(fn)),
+    runCallback: <A, Err>(fn: (svc: S) => Effect.Effect<A, Err, I>) => getRuntime().runCallback(service.use(fn)),
   }
 }

+ 2 - 2
packages/opencode/src/file/index.ts

@@ -1,6 +1,6 @@
 import { BusEvent } from "@/bus/bus-event"
 import { InstanceState } from "@/effect/instance-state"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
 import { Git } from "@/git"
 import { Effect, Fiber, Layer, Scope, ServiceMap } from "effect"
 import { formatPatch, structuredPatch } from "diff"
@@ -688,7 +688,7 @@ export namespace File {
     }),
   )
 
-  const runPromise = makeRunPromise(Service, layer)
+  const { runPromise } = makeRuntime(Service, layer)
 
   export function init() {
     return runPromise((svc) => svc.init())

+ 2 - 2
packages/opencode/src/file/time.ts

@@ -1,6 +1,6 @@
 import { DateTime, Effect, Layer, Semaphore, ServiceMap } from "effect"
 import { InstanceState } from "@/effect/instance-state"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
 import { Flag } from "@/flag/flag"
 import type { SessionID } from "@/session/schema"
 import { Filesystem } from "../util/filesystem"
@@ -108,7 +108,7 @@ export namespace FileTime {
     }),
   ).pipe(Layer.orDie)
 
-  const runPromise = makeRunPromise(Service, layer)
+  const { runPromise } = makeRuntime(Service, layer)
 
   export function read(sessionID: SessionID, file: string) {
     return runPromise((s) => s.read(sessionID, file))

+ 2 - 2
packages/opencode/src/file/watcher.ts

@@ -8,7 +8,7 @@ import z from "zod"
 import { Bus } from "@/bus"
 import { BusEvent } from "@/bus/bus-event"
 import { InstanceState } from "@/effect/instance-state"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
 import { Flag } from "@/flag/flag"
 import { Git } from "@/git"
 import { Instance } from "@/project/instance"
@@ -159,7 +159,7 @@ export namespace FileWatcher {
     }),
   )
 
-  const runPromise = makeRunPromise(Service, layer)
+  const { runPromise } = makeRuntime(Service, layer)
 
   export function init() {
     return runPromise((svc) => svc.init())

+ 48 - 47
packages/opencode/src/format/index.ts

@@ -1,12 +1,10 @@
 import { Effect, Layer, ServiceMap } from "effect"
 import { InstanceState } from "@/effect/instance-state"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
 import path from "path"
 import { mergeDeep } from "remeda"
 import z from "zod"
-import { Bus } from "../bus"
 import { Config } from "../config/config"
-import { File } from "../file"
 import { Instance } from "../project/instance"
 import { Process } from "../util/process"
 import { Log } from "../util/log"
@@ -29,6 +27,7 @@ export namespace Format {
   export interface Interface {
     readonly init: () => Effect.Effect<void>
     readonly status: () => Effect.Effect<Status[]>
+    readonly file: (filepath: string) => Effect.Effect<void>
   }
 
   export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/Format") {}
@@ -97,53 +96,46 @@ export namespace Format {
             return checks.filter((x) => x.enabled).map((x) => x.item)
           }
 
-          yield* Effect.acquireRelease(
-            Effect.sync(() =>
-              Bus.subscribe(
-                File.Event.Edited,
-                Instance.bind(async (payload) => {
-                  const file = payload.properties.file
-                  log.info("formatting", { file })
-                  const ext = path.extname(file)
-
-                  for (const item of await getFormatter(ext)) {
-                    log.info("running", { command: item.command })
-                    try {
-                      const proc = Process.spawn(
-                        item.command.map((x) => x.replace("$FILE", file)),
-                        {
-                          cwd: Instance.directory,
-                          env: { ...process.env, ...item.environment },
-                          stdout: "ignore",
-                          stderr: "ignore",
-                        },
-                      )
-                      const exit = await proc.exited
-                      if (exit !== 0) {
-                        log.error("failed", {
-                          command: item.command,
-                          ...item.environment,
-                        })
-                      }
-                    } catch (error) {
-                      log.error("failed to format file", {
-                        error,
-                        command: item.command,
-                        ...item.environment,
-                        file,
-                      })
-                    }
-                  }
-                }),
-              ),
-            ),
-            (unsubscribe) => Effect.sync(unsubscribe),
-          )
+          async function formatFile(filepath: string) {
+            log.info("formatting", { file: filepath })
+            const ext = path.extname(filepath)
+
+            for (const item of await getFormatter(ext)) {
+              log.info("running", { command: item.command })
+              try {
+                const proc = Process.spawn(
+                  item.command.map((x) => x.replace("$FILE", filepath)),
+                  {
+                    cwd: Instance.directory,
+                    env: { ...process.env, ...item.environment },
+                    stdout: "ignore",
+                    stderr: "ignore",
+                  },
+                )
+                const exit = await proc.exited
+                if (exit !== 0) {
+                  log.error("failed", {
+                    command: item.command,
+                    ...item.environment,
+                  })
+                }
+              } catch (error) {
+                log.error("failed to format file", {
+                  error,
+                  command: item.command,
+                  ...item.environment,
+                  file: filepath,
+                })
+              }
+            }
+          }
+
           log.info("init")
 
           return {
             formatters,
             isEnabled,
+            formatFile,
           }
         }),
       )
@@ -166,11 +158,16 @@ export namespace Format {
         return result
       })
 
-      return Service.of({ init, status })
+      const file = Effect.fn("Format.file")(function* (filepath: string) {
+        const { formatFile } = yield* InstanceState.get(state)
+        yield* Effect.promise(() => formatFile(filepath))
+      })
+
+      return Service.of({ init, status, file })
     }),
   )
 
-  const runPromise = makeRunPromise(Service, layer)
+  const { runPromise } = makeRuntime(Service, layer)
 
   export async function init() {
     return runPromise((s) => s.init())
@@ -179,4 +176,8 @@ export namespace Format {
   export async function status() {
     return runPromise((s) => s.status())
   }
+
+  export async function file(filepath: string) {
+    return runPromise((s) => s.file(filepath))
+  }
 }

+ 2 - 2
packages/opencode/src/git/index.ts

@@ -2,7 +2,7 @@ import { NodeFileSystem, NodePath } from "@effect/platform-node"
 import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
 import { Effect, Layer, ServiceMap, Stream } from "effect"
 import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
 
 export namespace Git {
   const cfg = [
@@ -264,7 +264,7 @@ export namespace Git {
     Layer.provide(NodePath.layer),
   )
 
-  const runPromise = makeRunPromise(Service, defaultLayer)
+  const { runPromise } = makeRuntime(Service, defaultLayer)
 
   export function run(args: string[], opts: Options) {
     return runPromise((git) => git.run(args, opts))

+ 2 - 2
packages/opencode/src/installation/index.ts

@@ -2,7 +2,7 @@ import { NodeFileSystem, NodePath } from "@effect/platform-node"
 import { Effect, Layer, Schema, ServiceMap, Stream } from "effect"
 import { FetchHttpClient, HttpClient, HttpClientRequest, HttpClientResponse } from "effect/unstable/http"
 import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
 import { withTransientReadRetry } from "@/util/effect-http-client"
 import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
 import path from "path"
@@ -346,7 +346,7 @@ export namespace Installation {
     Layer.provide(NodePath.layer),
   )
 
-  const runPromise = makeRunPromise(Service, defaultLayer)
+  const { runPromise } = makeRuntime(Service, defaultLayer)
 
   export async function info(): Promise<Info> {
     return runPromise((svc) => svc.info())

+ 2 - 2
packages/opencode/src/permission/index.ts

@@ -2,7 +2,7 @@ import { Bus } from "@/bus"
 import { BusEvent } from "@/bus/bus-event"
 import { Config } from "@/config/config"
 import { InstanceState } from "@/effect/instance-state"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
 import { ProjectID } from "@/project/schema"
 import { Instance } from "@/project/instance"
 import { MessageID, SessionID } from "@/session/schema"
@@ -306,7 +306,7 @@ export namespace Permission {
     return result
   }
 
-  export const runPromise = makeRunPromise(Service, layer)
+  export const { runPromise } = makeRuntime(Service, layer)
 
   export async function ask(input: z.infer<typeof AskInput>) {
     return runPromise((s) => s.ask(input))

+ 19 - 14
packages/opencode/src/plugin/index.ts

@@ -11,9 +11,9 @@ import { NamedError } from "@opencode-ai/util/error"
 import { CopilotAuthPlugin } from "./copilot"
 import { gitlabAuthPlugin as GitlabAuthPlugin } from "opencode-gitlab-auth"
 import { PoeAuthPlugin } from "opencode-poe-auth"
-import { Effect, Layer, ServiceMap } from "effect"
+import { Effect, Layer, ServiceMap, Stream } from "effect"
 import { InstanceState } from "@/effect/instance-state"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
 
 export namespace Plugin {
   const log = Log.create({ service: "plugin" })
@@ -52,6 +52,8 @@ export namespace Plugin {
   export const layer = Layer.effect(
     Service,
     Effect.gen(function* () {
+      const bus = yield* Bus.Service
+
       const cache = yield* InstanceState.make<State>(
         Effect.fn("Plugin.state")(function* (ctx) {
           const hooks: Hooks[] = []
@@ -146,17 +148,19 @@ export namespace Plugin {
             }
           })
 
-          // Subscribe to bus events, clean up when scope is closed
-          yield* Effect.acquireRelease(
-            Effect.sync(() =>
-              Bus.subscribeAll(async (input) => {
-                for (const hook of hooks) {
-                  hook["event"]?.({ event: input })
-                }
-              }),
-            ),
-            (unsub) => Effect.sync(unsub),
-          )
+          // Subscribe to bus events, fiber interrupted when scope closes
+          yield* bus
+            .subscribeAll()
+            .pipe(
+              Stream.runForEach((input) =>
+                Effect.sync(() => {
+                  for (const hook of hooks) {
+                    hook["event"]?.({ event: input as any })
+                  }
+                }),
+              ),
+              Effect.forkScoped,
+            )
 
           return { hooks }
         }),
@@ -192,7 +196,8 @@ export namespace Plugin {
     }),
   )
 
-  const runPromise = makeRunPromise(Service, layer)
+  const defaultLayer = layer.pipe(Layer.provide(Bus.layer))
+  const { runPromise } = makeRuntime(Service, defaultLayer)
 
   export async function trigger<
     Name extends TriggerName,

+ 2 - 2
packages/opencode/src/project/project.ts

@@ -11,7 +11,7 @@ import { ProjectID } from "./schema"
 import { Effect, Layer, Path, Scope, ServiceMap, Stream } from "effect"
 import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
 import { NodeFileSystem, NodePath } from "@effect/platform-node"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
 import { AppFileSystem } from "@/filesystem"
 import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
 
@@ -462,7 +462,7 @@ export namespace Project {
     Layer.provide(NodeFileSystem.layer),
     Layer.provide(NodePath.layer),
   )
-  const runPromise = makeRunPromise(Service, defaultLayer)
+  const { runPromise } = makeRuntime(Service, defaultLayer)
 
   // ---------------------------------------------------------------------------
   // Promise-based API (delegates to Effect service via runPromise)

+ 24 - 19
packages/opencode/src/project/vcs.ts

@@ -1,9 +1,9 @@
-import { Effect, Layer, ServiceMap } from "effect"
+import { Effect, Layer, ServiceMap, Stream } from "effect"
 import path from "path"
 import { Bus } from "@/bus"
 import { BusEvent } from "@/bus/bus-event"
 import { InstanceState } from "@/effect/instance-state"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
 import { AppFileSystem } from "@/filesystem"
 import { FileWatcher } from "@/file/watcher"
 import { Git } from "@/git"
@@ -139,11 +139,12 @@ export namespace Vcs {
 
   export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/Vcs") {}
 
-  export const layer: Layer.Layer<Service, never, AppFileSystem.Service | Git.Service> = Layer.effect(
+  export const layer: Layer.Layer<Service, never, AppFileSystem.Service | Git.Service | Bus.Service> = Layer.effect(
     Service,
     Effect.gen(function* () {
       const fs = yield* AppFileSystem.Service
       const git = yield* Git.Service
+      const bus = yield* Bus.Service
       const state = yield* InstanceState.make<State>(
         Effect.fn("Vcs.state")((ctx) =>
           Effect.gen(function* () {
@@ -158,22 +159,22 @@ export namespace Vcs {
             const value = { current, root }
             log.info("initialized", { branch: value.current, default_branch: value.root?.name })
 
-            yield* Effect.acquireRelease(
-              Effect.sync(() =>
-                Bus.subscribe(
-                  FileWatcher.Event.Updated,
-                  Instance.bind(async (evt) => {
-                    if (!evt.properties.file.endsWith("HEAD")) return
-                    const next = await get()
-                    if (next === value.current) return
-                    log.info("branch changed", { from: value.current, to: next })
-                    value.current = next
-                    Bus.publish(Event.BranchUpdated, { branch: next })
+            yield* bus
+              .subscribe(FileWatcher.Event.Updated)
+              .pipe(
+                Stream.filter((evt) => evt.properties.file.endsWith("HEAD")),
+                Stream.runForEach((_evt) =>
+                  Effect.gen(function* () {
+                    const next = yield* Effect.promise(() => get())
+                    if (next !== value.current) {
+                      log.info("branch changed", { from: value.current, to: next })
+                      value.current = next
+                      yield* bus.publish(Event.BranchUpdated, { branch: next })
+                    }
                   }),
                 ),
-              ),
-              (unsubscribe) => Effect.sync(unsubscribe),
-            )
+                Effect.forkScoped,
+              )
 
             return value
           }),
@@ -212,9 +213,13 @@ export namespace Vcs {
     }),
   )
 
-  export const defaultLayer = layer.pipe(Layer.provide(Git.defaultLayer), Layer.provide(AppFileSystem.defaultLayer))
+  export const defaultLayer = layer.pipe(
+    Layer.provide(Git.defaultLayer),
+    Layer.provide(AppFileSystem.defaultLayer),
+    Layer.provide(Bus.layer),
+  )
 
-  const runPromise = makeRunPromise(Service, defaultLayer)
+  const { runPromise } = makeRuntime(Service, defaultLayer)
 
   export function init() {
     return runPromise((svc) => svc.init())

+ 2 - 2
packages/opencode/src/provider/auth.ts

@@ -2,7 +2,7 @@ import type { AuthOuathResult, Hooks } from "@opencode-ai/plugin"
 import { NamedError } from "@opencode-ai/util/error"
 import { Auth } from "@/auth"
 import { InstanceState } from "@/effect/instance-state"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
 import { Plugin } from "../plugin"
 import { ProviderID } from "./schema"
 import { Array as Arr, Effect, Layer, Record, Result, ServiceMap } from "effect"
@@ -231,7 +231,7 @@ export namespace ProviderAuth {
 
   export const defaultLayer = layer.pipe(Layer.provide(Auth.layer))
 
-  const runPromise = makeRunPromise(Service, defaultLayer)
+  const { runPromise } = makeRuntime(Service, defaultLayer)
 
   export async function methods() {
     return runPromise((svc) => svc.methods())

+ 2 - 2
packages/opencode/src/pty/index.ts

@@ -1,7 +1,7 @@
 import { BusEvent } from "@/bus/bus-event"
 import { Bus } from "@/bus"
 import { InstanceState } from "@/effect/instance-state"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
 import { Instance } from "@/project/instance"
 import { type IPty } from "bun-pty"
 import z from "zod"
@@ -361,7 +361,7 @@ export namespace Pty {
     }),
   )
 
-  const runPromise = makeRunPromise(Service, layer)
+  const { runPromise } = makeRuntime(Service, layer)
 
   export async function list() {
     return runPromise((svc) => svc.list())

+ 2 - 2
packages/opencode/src/question/index.ts

@@ -2,7 +2,7 @@ import { Deferred, Effect, Layer, Schema, ServiceMap } from "effect"
 import { Bus } from "@/bus"
 import { BusEvent } from "@/bus/bus-event"
 import { InstanceState } from "@/effect/instance-state"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
 import { SessionID, MessageID } from "@/session/schema"
 import { Log } from "@/util/log"
 import z from "zod"
@@ -197,7 +197,7 @@ export namespace Question {
     }),
   )
 
-  const runPromise = makeRunPromise(Service, layer)
+  const { runPromise } = makeRuntime(Service, layer)
 
   export async function ask(input: {
     sessionID: SessionID

+ 7 - 4
packages/opencode/src/session/status.ts

@@ -1,7 +1,7 @@
 import { BusEvent } from "@/bus/bus-event"
 import { Bus } from "@/bus"
 import { InstanceState } from "@/effect/instance-state"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
 import { SessionID } from "./schema"
 import { Effect, Layer, ServiceMap } from "effect"
 import z from "zod"
@@ -55,6 +55,8 @@ export namespace SessionStatus {
   export const layer = Layer.effect(
     Service,
     Effect.gen(function* () {
+      const bus = yield* Bus.Service
+
       const state = yield* InstanceState.make(
         Effect.fn("SessionStatus.state")(() => Effect.succeed(new Map<SessionID, Info>())),
       )
@@ -70,9 +72,9 @@ export namespace SessionStatus {
 
       const set = Effect.fn("SessionStatus.set")(function* (sessionID: SessionID, status: Info) {
         const data = yield* InstanceState.get(state)
-        yield* Effect.promise(() => Bus.publish(Event.Status, { sessionID, status }))
+        yield* bus.publish(Event.Status, { sessionID, status })
         if (status.type === "idle") {
-          yield* Effect.promise(() => Bus.publish(Event.Idle, { sessionID }))
+          yield* bus.publish(Event.Idle, { sessionID })
           data.delete(sessionID)
           return
         }
@@ -83,7 +85,8 @@ export namespace SessionStatus {
     }),
   )
 
-  const runPromise = makeRunPromise(Service, layer)
+  const defaultLayer = layer.pipe(Layer.provide(Bus.layer))
+  const { runPromise } = makeRuntime(Service, defaultLayer)
 
   export async function get(sessionID: SessionID) {
     return runPromise((svc) => svc.get(sessionID))

+ 2 - 2
packages/opencode/src/skill/index.ts

@@ -7,7 +7,7 @@ import { NamedError } from "@opencode-ai/util/error"
 import type { Agent } from "@/agent/agent"
 import { Bus } from "@/bus"
 import { InstanceState } from "@/effect/instance-state"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
 import { Flag } from "@/flag/flag"
 import { Global } from "@/global"
 import { Permission } from "@/permission"
@@ -242,7 +242,7 @@ export namespace Skill {
     return ["## Available Skills", ...list.map((skill) => `- **${skill.name}**: ${skill.description}`)].join("\n")
   }
 
-  const runPromise = makeRunPromise(Service, defaultLayer)
+  const { runPromise } = makeRuntime(Service, defaultLayer)
 
   export async function get(name: string) {
     return runPromise((skill) => skill.get(name))

+ 2 - 2
packages/opencode/src/snapshot/index.ts

@@ -5,7 +5,7 @@ import path from "path"
 import z from "zod"
 import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
 import { InstanceState } from "@/effect/instance-state"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
 import { AppFileSystem } from "@/filesystem"
 import { Hash } from "@/util/hash"
 import { Config } from "../config/config"
@@ -459,7 +459,7 @@ export namespace Snapshot {
     Layer.provide(NodePath.layer),
   )
 
-  const runPromise = makeRunPromise(Service, defaultLayer)
+  const { runPromise } = makeRuntime(Service, defaultLayer)
 
   export async function init() {
     return runPromise((svc) => svc.init())

+ 3 - 3
packages/opencode/src/tool/apply_patch.ts

@@ -13,6 +13,7 @@ import { LSP } from "../lsp"
 import { Filesystem } from "../util/filesystem"
 import DESCRIPTION from "./apply_patch.txt"
 import { File } from "../file"
+import { Format } from "../format"
 
 const PatchParams = z.object({
   patchText: z.string().describe("The full patch text that describes all changes to be made"),
@@ -220,9 +221,8 @@ export const ApplyPatchTool = Tool.define("apply_patch", {
       }
 
       if (edited) {
-        await Bus.publish(File.Event.Edited, {
-          file: edited,
-        })
+        await Format.file(edited)
+        Bus.publish(File.Event.Edited, { file: edited })
       }
     }
 

+ 5 - 6
packages/opencode/src/tool/edit.ts

@@ -12,6 +12,7 @@ import DESCRIPTION from "./edit.txt"
 import { File } from "../file"
 import { FileWatcher } from "../file/watcher"
 import { Bus } from "../bus"
+import { Format } from "../format"
 import { FileTime } from "../file/time"
 import { Filesystem } from "../util/filesystem"
 import { Instance } from "../project/instance"
@@ -71,9 +72,8 @@ export const EditTool = Tool.define("edit", {
           },
         })
         await Filesystem.write(filePath, params.newString)
-        await Bus.publish(File.Event.Edited, {
-          file: filePath,
-        })
+        await Format.file(filePath)
+        Bus.publish(File.Event.Edited, { file: filePath })
         await Bus.publish(FileWatcher.Event.Updated, {
           file: filePath,
           event: existed ? "change" : "add",
@@ -108,9 +108,8 @@ export const EditTool = Tool.define("edit", {
       })
 
       await Filesystem.write(filePath, contentNew)
-      await Bus.publish(File.Event.Edited, {
-        file: filePath,
-      })
+      await Format.file(filePath)
+      Bus.publish(File.Event.Edited, { file: filePath })
       await Bus.publish(FileWatcher.Event.Updated, {
         file: filePath,
         event: "change",

+ 2 - 2
packages/opencode/src/tool/registry.ts

@@ -31,7 +31,7 @@ import { Glob } from "../util/glob"
 import { pathToFileURL } from "url"
 import { Effect, Layer, ServiceMap } from "effect"
 import { InstanceState } from "@/effect/instance-state"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
 
 export namespace ToolRegistry {
   const log = Log.create({ service: "tool.registry" })
@@ -198,7 +198,7 @@ export namespace ToolRegistry {
     }),
   )
 
-  const runPromise = makeRunPromise(Service, layer)
+  const { runPromise } = makeRuntime(Service, layer)
 
   export async function register(tool: Tool.Info) {
     return runPromise((svc) => svc.register(tool))

+ 2 - 2
packages/opencode/src/tool/truncate.ts

@@ -2,7 +2,7 @@ import { NodePath } from "@effect/platform-node"
 import { Cause, Duration, Effect, Layer, Schedule, ServiceMap } from "effect"
 import path from "path"
 import type { Agent } from "../agent/agent"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
 import { AppFileSystem } from "@/filesystem"
 import { evaluate } from "@/permission/evaluate"
 import { Identifier } from "../id/id"
@@ -136,7 +136,7 @@ export namespace Truncate {
 
   export const defaultLayer = layer.pipe(Layer.provide(AppFileSystem.defaultLayer), Layer.provide(NodePath.layer))
 
-  const runPromise = makeRunPromise(Service, defaultLayer)
+  const { runPromise } = makeRuntime(Service, defaultLayer)
 
   export async function output(text: string, options: Options = {}, agent?: Agent.Info): Promise<Result> {
     return runPromise((s) => s.output(text, options, agent))

+ 3 - 3
packages/opencode/src/tool/write.ts

@@ -7,6 +7,7 @@ import DESCRIPTION from "./write.txt"
 import { Bus } from "../bus"
 import { File } from "../file"
 import { FileWatcher } from "../file/watcher"
+import { Format } from "../format"
 import { FileTime } from "../file/time"
 import { Filesystem } from "../util/filesystem"
 import { Instance } from "../project/instance"
@@ -42,9 +43,8 @@ export const WriteTool = Tool.define("write", {
     })
 
     await Filesystem.write(filepath, params.content)
-    await Bus.publish(File.Event.Edited, {
-      file: filepath,
-    })
+    await Format.file(filepath)
+    Bus.publish(File.Event.Edited, { file: filepath })
     await Bus.publish(FileWatcher.Event.Updated, {
       file: filepath,
       event: exists ? "change" : "add",

+ 2 - 2
packages/opencode/src/worktree/index.ts

@@ -15,7 +15,7 @@ import { Git } from "@/git"
 import { Effect, FileSystem, Layer, Path, Scope, ServiceMap, Stream } from "effect"
 import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
 import { NodeFileSystem, NodePath } from "@effect/platform-node"
-import { makeRunPromise } from "@/effect/run-service"
+import { makeRuntime } from "@/effect/run-service"
 import * as CrossSpawnSpawner from "@/effect/cross-spawn-spawner"
 
 export namespace Worktree {
@@ -576,7 +576,7 @@ export namespace Worktree {
     Layer.provide(NodeFileSystem.layer),
     Layer.provide(NodePath.layer),
   )
-  const runPromise = makeRunPromise(Service, defaultLayer)
+  const { runPromise } = makeRuntime(Service, defaultLayer)
 
   export async function makeWorktreeInfo(name?: string) {
     return runPromise((svc) => svc.makeWorktreeInfo(name))

+ 164 - 0
packages/opencode/test/bus/bus-effect.test.ts

@@ -0,0 +1,164 @@
+import { NodeChildProcessSpawner, NodeFileSystem, NodePath } from "@effect/platform-node"
+import { describe, expect } from "bun:test"
+import { Deferred, Effect, Layer, Stream } from "effect"
+import z from "zod"
+import { Bus } from "../../src/bus"
+import { BusEvent } from "../../src/bus/bus-event"
+import { Instance } from "../../src/project/instance"
+import { provideInstance, provideTmpdirInstance, tmpdirScoped } from "../fixture/fixture"
+import { testEffect } from "../lib/effect"
+
+const TestEvent = {
+  Ping: BusEvent.define("test.effect.ping", z.object({ value: z.number() })),
+  Pong: BusEvent.define("test.effect.pong", z.object({ message: z.string() })),
+}
+
+const node = NodeChildProcessSpawner.layer.pipe(
+  Layer.provideMerge(Layer.mergeAll(NodeFileSystem.layer, NodePath.layer)),
+)
+
+const live = Layer.mergeAll(Bus.layer, node)
+
+const it = testEffect(live)
+
+describe("Bus (Effect-native)", () => {
+  it.effect("publish + subscribe stream delivers events", () =>
+    provideTmpdirInstance(() =>
+      Effect.gen(function* () {
+        const bus = yield* Bus.Service
+        const received: number[] = []
+        const done = yield* Deferred.make<void>()
+
+        yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) =>
+          Effect.sync(() => {
+            received.push(evt.properties.value)
+            if (received.length === 2) Deferred.doneUnsafe(done, Effect.void)
+          }),
+        ).pipe(Effect.forkScoped)
+
+        yield* Effect.sleep("10 millis")
+        yield* bus.publish(TestEvent.Ping, { value: 1 })
+        yield* bus.publish(TestEvent.Ping, { value: 2 })
+        yield* Deferred.await(done)
+
+        expect(received).toEqual([1, 2])
+      }),
+    ),
+  )
+
+  it.effect("subscribe filters by event type", () =>
+    provideTmpdirInstance(() =>
+      Effect.gen(function* () {
+        const bus = yield* Bus.Service
+        const pings: number[] = []
+        const done = yield* Deferred.make<void>()
+
+        yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) =>
+          Effect.sync(() => {
+            pings.push(evt.properties.value)
+            Deferred.doneUnsafe(done, Effect.void)
+          }),
+        ).pipe(Effect.forkScoped)
+
+        yield* Effect.sleep("10 millis")
+        yield* bus.publish(TestEvent.Pong, { message: "ignored" })
+        yield* bus.publish(TestEvent.Ping, { value: 42 })
+        yield* Deferred.await(done)
+
+        expect(pings).toEqual([42])
+      }),
+    ),
+  )
+
+  it.effect("subscribeAll receives all types", () =>
+    provideTmpdirInstance(() =>
+      Effect.gen(function* () {
+        const bus = yield* Bus.Service
+        const types: string[] = []
+        const done = yield* Deferred.make<void>()
+
+        yield* Stream.runForEach(bus.subscribeAll(), (evt) =>
+          Effect.sync(() => {
+            types.push(evt.type)
+            if (types.length === 2) Deferred.doneUnsafe(done, Effect.void)
+          }),
+        ).pipe(Effect.forkScoped)
+
+        yield* Effect.sleep("10 millis")
+        yield* bus.publish(TestEvent.Ping, { value: 1 })
+        yield* bus.publish(TestEvent.Pong, { message: "hi" })
+        yield* Deferred.await(done)
+
+        expect(types).toContain("test.effect.ping")
+        expect(types).toContain("test.effect.pong")
+      }),
+    ),
+  )
+
+  it.effect("multiple subscribers each receive the event", () =>
+    provideTmpdirInstance(() =>
+      Effect.gen(function* () {
+        const bus = yield* Bus.Service
+        const a: number[] = []
+        const b: number[] = []
+        const doneA = yield* Deferred.make<void>()
+        const doneB = yield* Deferred.make<void>()
+
+        yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) =>
+          Effect.sync(() => {
+            a.push(evt.properties.value)
+            Deferred.doneUnsafe(doneA, Effect.void)
+          }),
+        ).pipe(Effect.forkScoped)
+
+        yield* Stream.runForEach(bus.subscribe(TestEvent.Ping), (evt) =>
+          Effect.sync(() => {
+            b.push(evt.properties.value)
+            Deferred.doneUnsafe(doneB, Effect.void)
+          }),
+        ).pipe(Effect.forkScoped)
+
+        yield* Effect.sleep("10 millis")
+        yield* bus.publish(TestEvent.Ping, { value: 99 })
+        yield* Deferred.await(doneA)
+        yield* Deferred.await(doneB)
+
+        expect(a).toEqual([99])
+        expect(b).toEqual([99])
+      }),
+    ),
+  )
+
+  it.effect("subscribeAll stream sees InstanceDisposed on disposal", () =>
+    Effect.gen(function* () {
+      const dir = yield* tmpdirScoped()
+      const types: string[] = []
+      const seen = yield* Deferred.make<void>()
+      const disposed = yield* Deferred.make<void>()
+
+      // Set up subscriber inside the instance
+      yield* Effect.gen(function* () {
+        const bus = yield* Bus.Service
+
+        yield* Stream.runForEach(bus.subscribeAll(), (evt) =>
+          Effect.sync(() => {
+            types.push(evt.type)
+            if (evt.type === TestEvent.Ping.type) Deferred.doneUnsafe(seen, Effect.void)
+            if (evt.type === Bus.InstanceDisposed.type) Deferred.doneUnsafe(disposed, Effect.void)
+          }),
+        ).pipe(Effect.forkScoped)
+
+        yield* Effect.sleep("10 millis")
+        yield* bus.publish(TestEvent.Ping, { value: 1 })
+        yield* Deferred.await(seen)
+      }).pipe(provideInstance(dir))
+
+      // Dispose from OUTSIDE the instance scope
+      yield* Effect.promise(() => Instance.disposeAll())
+      yield* Deferred.await(disposed).pipe(Effect.timeout("2 seconds"))
+
+      expect(types).toContain("test.effect.ping")
+      expect(types).toContain(Bus.InstanceDisposed.type)
+    }),
+  )
+})

+ 87 - 0
packages/opencode/test/bus/bus-integration.test.ts

@@ -0,0 +1,87 @@
+import { afterEach, describe, expect, test } from "bun:test"
+import z from "zod"
+import { Bus } from "../../src/bus"
+import { BusEvent } from "../../src/bus/bus-event"
+import { Instance } from "../../src/project/instance"
+import { tmpdir } from "../fixture/fixture"
+
+const TestEvent = BusEvent.define("test.integration", z.object({ value: z.number() }))
+
+function withInstance(directory: string, fn: () => Promise<void>) {
+  return Instance.provide({ directory, fn })
+}
+
+describe("Bus integration: acquireRelease subscriber pattern", () => {
+  afterEach(() => Instance.disposeAll())
+
+  test("subscriber via callback facade receives events and cleans up on unsub", async () => {
+    await using tmp = await tmpdir()
+    const received: number[] = []
+
+    await withInstance(tmp.path, async () => {
+      const unsub = Bus.subscribe(TestEvent, (evt) => {
+        received.push(evt.properties.value)
+      })
+      await Bun.sleep(10)
+      await Bus.publish(TestEvent, { value: 1 })
+      await Bus.publish(TestEvent, { value: 2 })
+      await Bun.sleep(10)
+
+      expect(received).toEqual([1, 2])
+
+      unsub()
+      await Bun.sleep(10)
+      await Bus.publish(TestEvent, { value: 3 })
+      await Bun.sleep(10)
+
+      expect(received).toEqual([1, 2])
+    })
+  })
+
+  test("subscribeAll receives events from multiple types", async () => {
+    await using tmp = await tmpdir()
+    const received: Array<{ type: string; value?: number }> = []
+
+    const OtherEvent = BusEvent.define("test.other", z.object({ value: z.number() }))
+
+    await withInstance(tmp.path, async () => {
+      Bus.subscribeAll((evt) => {
+        received.push({ type: evt.type, value: evt.properties.value })
+      })
+      await Bun.sleep(10)
+      await Bus.publish(TestEvent, { value: 10 })
+      await Bus.publish(OtherEvent, { value: 20 })
+      await Bun.sleep(10)
+    })
+
+    expect(received).toEqual([
+      { type: "test.integration", value: 10 },
+      { type: "test.other", value: 20 },
+    ])
+  })
+
+  test("subscriber cleanup on instance disposal interrupts the stream", async () => {
+    await using tmp = await tmpdir()
+    const received: number[] = []
+    let disposed = false
+
+    await withInstance(tmp.path, async () => {
+      Bus.subscribeAll((evt) => {
+        if (evt.type === Bus.InstanceDisposed.type) {
+          disposed = true
+          return
+        }
+        received.push(evt.properties.value)
+      })
+      await Bun.sleep(10)
+      await Bus.publish(TestEvent, { value: 1 })
+      await Bun.sleep(10)
+    })
+
+    await Instance.disposeAll()
+    await Bun.sleep(50)
+
+    expect(received).toEqual([1])
+    expect(disposed).toBe(true)
+  })
+})

+ 219 - 0
packages/opencode/test/bus/bus.test.ts

@@ -0,0 +1,219 @@
+import { afterEach, describe, expect, test } from "bun:test"
+import z from "zod"
+import { Bus } from "../../src/bus"
+import { BusEvent } from "../../src/bus/bus-event"
+import { Instance } from "../../src/project/instance"
+import { tmpdir } from "../fixture/fixture"
+
+const TestEvent = {
+  Ping: BusEvent.define("test.ping", z.object({ value: z.number() })),
+  Pong: BusEvent.define("test.pong", z.object({ message: z.string() })),
+}
+
+function withInstance(directory: string, fn: () => Promise<void>) {
+  return Instance.provide({ directory, fn })
+}
+
+describe("Bus", () => {
+  afterEach(() => Instance.disposeAll())
+
+  describe("publish + subscribe", () => {
+    test("subscriber is live immediately after subscribe returns", async () => {
+      await using tmp = await tmpdir()
+      const received: number[] = []
+
+      await withInstance(tmp.path, async () => {
+        Bus.subscribe(TestEvent.Ping, (evt) => {
+          received.push(evt.properties.value)
+        })
+        await Bus.publish(TestEvent.Ping, { value: 42 })
+        await Bun.sleep(10)
+      })
+
+      expect(received).toEqual([42])
+    })
+
+    test("subscriber receives matching events", async () => {
+      await using tmp = await tmpdir()
+      const received: number[] = []
+
+      await withInstance(tmp.path, async () => {
+        Bus.subscribe(TestEvent.Ping, (evt) => {
+          received.push(evt.properties.value)
+        })
+        // Give the subscriber fiber time to start consuming
+        await Bun.sleep(10)
+        await Bus.publish(TestEvent.Ping, { value: 42 })
+        await Bus.publish(TestEvent.Ping, { value: 99 })
+        // Give subscriber time to process
+        await Bun.sleep(10)
+      })
+
+      expect(received).toEqual([42, 99])
+    })
+
+    test("subscriber does not receive events of other types", async () => {
+      await using tmp = await tmpdir()
+      const pings: number[] = []
+
+      await withInstance(tmp.path, async () => {
+        Bus.subscribe(TestEvent.Ping, (evt) => {
+          pings.push(evt.properties.value)
+        })
+        await Bun.sleep(10)
+        await Bus.publish(TestEvent.Pong, { message: "hello" })
+        await Bus.publish(TestEvent.Ping, { value: 1 })
+        await Bun.sleep(10)
+      })
+
+      expect(pings).toEqual([1])
+    })
+
+    test("publish with no subscribers does not throw", async () => {
+      await using tmp = await tmpdir()
+
+      await withInstance(tmp.path, async () => {
+        await Bus.publish(TestEvent.Ping, { value: 1 })
+      })
+    })
+  })
+
+  describe("unsubscribe", () => {
+    test("unsubscribe stops delivery", async () => {
+      await using tmp = await tmpdir()
+      const received: number[] = []
+
+      await withInstance(tmp.path, async () => {
+        const unsub = Bus.subscribe(TestEvent.Ping, (evt) => {
+          received.push(evt.properties.value)
+        })
+        await Bun.sleep(10)
+        await Bus.publish(TestEvent.Ping, { value: 1 })
+        await Bun.sleep(10)
+        unsub()
+        await Bun.sleep(10)
+        await Bus.publish(TestEvent.Ping, { value: 2 })
+        await Bun.sleep(10)
+      })
+
+      expect(received).toEqual([1])
+    })
+  })
+
+  describe("subscribeAll", () => {
+    test("subscribeAll is live immediately after subscribe returns", async () => {
+      await using tmp = await tmpdir()
+      const received: string[] = []
+
+      await withInstance(tmp.path, async () => {
+        Bus.subscribeAll((evt) => {
+          received.push(evt.type)
+        })
+        await Bus.publish(TestEvent.Ping, { value: 1 })
+        await Bun.sleep(10)
+      })
+
+      expect(received).toEqual(["test.ping"])
+    })
+
+    test("receives all event types", async () => {
+      await using tmp = await tmpdir()
+      const received: string[] = []
+
+      await withInstance(tmp.path, async () => {
+        Bus.subscribeAll((evt) => {
+          received.push(evt.type)
+        })
+        await Bun.sleep(10)
+        await Bus.publish(TestEvent.Ping, { value: 1 })
+        await Bus.publish(TestEvent.Pong, { message: "hi" })
+        await Bun.sleep(10)
+      })
+
+      expect(received).toContain("test.ping")
+      expect(received).toContain("test.pong")
+    })
+  })
+
+  describe("multiple subscribers", () => {
+    test("all subscribers for same event type are called", async () => {
+      await using tmp = await tmpdir()
+      const a: number[] = []
+      const b: number[] = []
+
+      await withInstance(tmp.path, async () => {
+        Bus.subscribe(TestEvent.Ping, (evt) => {
+          a.push(evt.properties.value)
+        })
+        Bus.subscribe(TestEvent.Ping, (evt) => {
+          b.push(evt.properties.value)
+        })
+        await Bun.sleep(10)
+        await Bus.publish(TestEvent.Ping, { value: 7 })
+        await Bun.sleep(10)
+      })
+
+      expect(a).toEqual([7])
+      expect(b).toEqual([7])
+    })
+  })
+
+  describe("instance isolation", () => {
+    test("events in one directory do not reach subscribers in another", async () => {
+      await using tmpA = await tmpdir()
+      await using tmpB = await tmpdir()
+      const receivedA: number[] = []
+      const receivedB: number[] = []
+
+      await withInstance(tmpA.path, async () => {
+        Bus.subscribe(TestEvent.Ping, (evt) => {
+          receivedA.push(evt.properties.value)
+        })
+        await Bun.sleep(10)
+      })
+
+      await withInstance(tmpB.path, async () => {
+        Bus.subscribe(TestEvent.Ping, (evt) => {
+          receivedB.push(evt.properties.value)
+        })
+        await Bun.sleep(10)
+      })
+
+      await withInstance(tmpA.path, async () => {
+        await Bus.publish(TestEvent.Ping, { value: 1 })
+        await Bun.sleep(10)
+      })
+
+      await withInstance(tmpB.path, async () => {
+        await Bus.publish(TestEvent.Ping, { value: 2 })
+        await Bun.sleep(10)
+      })
+
+      expect(receivedA).toEqual([1])
+      expect(receivedB).toEqual([2])
+    })
+  })
+
+  describe("instance disposal", () => {
+    test("InstanceDisposed is delivered to wildcard subscribers before stream ends", async () => {
+      await using tmp = await tmpdir()
+      const received: string[] = []
+
+      await withInstance(tmp.path, async () => {
+        Bus.subscribeAll((evt) => {
+          received.push(evt.type)
+        })
+        await Bun.sleep(10)
+        await Bus.publish(TestEvent.Ping, { value: 1 })
+        await Bun.sleep(10)
+      })
+
+      // Instance.disposeAll triggers the finalizer which publishes InstanceDisposed
+      await Instance.disposeAll()
+      await Bun.sleep(50)
+
+      expect(received).toContain("test.ping")
+      expect(received).toContain(Bus.InstanceDisposed.type)
+    })
+  })
+})

+ 4 - 4
packages/opencode/test/effect/run-service.test.ts

@@ -1,10 +1,10 @@
 import { expect, test } from "bun:test"
 import { Effect, Layer, ServiceMap } from "effect"
-import { makeRunPromise } from "../../src/effect/run-service"
+import { makeRuntime } from "../../src/effect/run-service"
 
 class Shared extends ServiceMap.Service<Shared, { readonly id: number }>()("@test/Shared") {}
 
-test("makeRunPromise shares dependent layers through the shared memo map", async () => {
+test("makeRuntime shares dependent layers through the shared memo map", async () => {
   let n = 0
 
   const shared = Layer.effect(
@@ -37,8 +37,8 @@ test("makeRunPromise shares dependent layers through the shared memo map", async
     }),
   ).pipe(Layer.provide(shared))
 
-  const runOne = makeRunPromise(One, one)
-  const runTwo = makeRunPromise(Two, two)
+  const { runPromise: runOne } = makeRuntime(One, one)
+  const { runPromise: runTwo } = makeRuntime(Two, two)
 
   expect(await runOne((svc) => svc.get())).toBe(1)
   expect(await runTwo((svc) => svc.get())).toBe(1)

+ 22 - 10
packages/opencode/test/file/watcher.test.ts

@@ -2,9 +2,8 @@ import { $ } from "bun"
 import { afterEach, describe, expect, test } from "bun:test"
 import fs from "fs/promises"
 import path from "path"
-import { Deferred, Effect, Option } from "effect"
+import { ConfigProvider, Deferred, Effect, Layer, ManagedRuntime, Option } from "effect"
 import { tmpdir } from "../fixture/fixture"
-import { watcherConfigLayer, withServices } from "../fixture/instance"
 import { Bus } from "../../src/bus"
 import { FileWatcher } from "../../src/file/watcher"
 import { Instance } from "../../src/project/instance"
@@ -16,20 +15,33 @@ const describeWatcher = FileWatcher.hasNativeBinding() && !process.env.CI ? desc
 // Helpers
 // ---------------------------------------------------------------------------
 
+const watcherConfigLayer = ConfigProvider.layer(
+  ConfigProvider.fromUnknown({
+    OPENCODE_EXPERIMENTAL_FILEWATCHER: "true",
+    OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER: "false",
+  }),
+)
+
 type WatcherEvent = { file: string; event: "add" | "change" | "unlink" }
 
 /** Run `body` with a live FileWatcher service. */
 function withWatcher<E>(directory: string, body: Effect.Effect<void, E>) {
-  return withServices(
+  return Instance.provide({
     directory,
-    FileWatcher.layer,
-    async (rt) => {
-      await rt.runPromise(FileWatcher.Service.use((s) => s.init()))
-      await Effect.runPromise(ready(directory))
-      await Effect.runPromise(body)
+    fn: async () => {
+      const layer: Layer.Layer<FileWatcher.Service, never, never> = FileWatcher.layer.pipe(
+        Layer.provide(watcherConfigLayer),
+      )
+      const rt = ManagedRuntime.make(layer)
+      try {
+        await rt.runPromise(FileWatcher.Service.use((s) => s.init()))
+        await Effect.runPromise(ready(directory))
+        await Effect.runPromise(body)
+      } finally {
+        await rt.dispose()
+      }
     },
-    { provide: [watcherConfigLayer] },
-  )
+  })
 }
 
 function listen(directory: string, check: (evt: WatcherEvent) => boolean, hit: (evt: WatcherEvent) => void) {

+ 68 - 0
packages/opencode/test/fixture/fixture.ts

@@ -2,7 +2,10 @@ import { $ } from "bun"
 import * as fs from "fs/promises"
 import os from "os"
 import path from "path"
+import { Effect, FileSystem, ServiceMap } from "effect"
+import { ChildProcess, ChildProcessSpawner } from "effect/unstable/process"
 import type { Config } from "../../src/config/config"
+import { Instance } from "../../src/project/instance"
 
 // Strip null bytes from paths (defensive fix for CI environment issues)
 function sanitizePath(p: string): string {
@@ -71,3 +74,68 @@ export async function tmpdir<T>(options?: TmpDirOptions<T>) {
   }
   return result
 }
+
+/** Effectful scoped tmpdir. Cleaned up when the scope closes. Make sure these stay in sync */
+export function tmpdirScoped(options?: { git?: boolean; config?: Partial<Config.Info> }) {
+  return Effect.gen(function* () {
+    const fs = yield* FileSystem.FileSystem
+    const spawner = yield* ChildProcessSpawner.ChildProcessSpawner
+    const dir = yield* fs.makeTempDirectoryScoped({ prefix: "opencode-test-" })
+
+    const git = (...args: string[]) =>
+      spawner.spawn(ChildProcess.make("git", args, { cwd: dir })).pipe(Effect.flatMap((handle) => handle.exitCode))
+
+    if (options?.git) {
+      yield* git("init")
+      yield* git("config", "core.fsmonitor", "false")
+      yield* git("config", "user.email", "[email protected]")
+      yield* git("config", "user.name", "Test")
+      yield* git("commit", "--allow-empty", "-m", "root commit")
+    }
+
+    if (options?.config) {
+      yield* fs.writeFileString(
+        path.join(dir, "opencode.json"),
+        JSON.stringify({ $schema: "https://opencode.ai/config.json", ...options.config }),
+      )
+    }
+
+    return dir
+  })
+}
+
+export const provideInstance =
+  (directory: string) =>
+  <A, E, R>(self: Effect.Effect<A, E, R>): Effect.Effect<A, E, R> =>
+    Effect.servicesWith((services: ServiceMap.ServiceMap<R>) =>
+      Effect.promise<A>(async () =>
+        Instance.provide({
+          directory,
+          fn: () => Effect.runPromiseWith(services)(self),
+        }),
+      ),
+    )
+
+export function provideTmpdirInstance<A, E, R>(
+  self: (path: string) => Effect.Effect<A, E, R>,
+  options?: { git?: boolean; config?: Partial<Config.Info> },
+) {
+  return Effect.gen(function* () {
+    const path = yield* tmpdirScoped(options)
+    let provided = false
+
+    yield* Effect.addFinalizer(() =>
+      provided
+        ? Effect.promise(() =>
+            Instance.provide({
+              directory: path,
+              fn: () => Instance.dispose(),
+            }),
+          ).pipe(Effect.ignore)
+        : Effect.void,
+    )
+
+    provided = true
+    return yield* self(path).pipe(provideInstance(path))
+  })
+}

+ 0 - 51
packages/opencode/test/fixture/instance.ts

@@ -1,51 +0,0 @@
-import { ConfigProvider, Layer, ManagedRuntime } from "effect"
-import { InstanceContext } from "../../src/effect/instance-context"
-import { Instance } from "../../src/project/instance"
-
-/** ConfigProvider that enables the experimental file watcher. */
-export const watcherConfigLayer = ConfigProvider.layer(
-  ConfigProvider.fromUnknown({
-    OPENCODE_EXPERIMENTAL_FILEWATCHER: "true",
-    OPENCODE_EXPERIMENTAL_DISABLE_FILEWATCHER: "false",
-  }),
-)
-
-/**
- * Boot an Instance with the given service layers and run `body` with
- * the ManagedRuntime. Cleanup is automatic — the runtime is disposed
- * and Instance context is torn down when `body` completes.
- *
- * Layers may depend on InstanceContext (provided automatically).
- * Pass extra layers via `options.provide` (e.g. ConfigProvider.layer).
- */
-export function withServices<S>(
-  directory: string,
-  layer: Layer.Layer<S, any, InstanceContext>,
-  body: (rt: ManagedRuntime.ManagedRuntime<S, never>) => Promise<void>,
-  options?: { provide?: Layer.Layer<never>[] },
-) {
-  return Instance.provide({
-    directory,
-    fn: async () => {
-      const ctx = Layer.sync(InstanceContext, () =>
-        InstanceContext.of({
-          directory: Instance.directory,
-          worktree: Instance.worktree,
-          project: Instance.project,
-        }),
-      )
-      let resolved: Layer.Layer<S> = layer.pipe(Layer.provide(ctx)) as any
-      if (options?.provide) {
-        for (const l of options.provide) {
-          resolved = resolved.pipe(Layer.provide(l)) as any
-        }
-      }
-      const rt = ManagedRuntime.make(resolved)
-      try {
-        await body(rt)
-      } finally {
-        await rt.dispose()
-      }
-    },
-  })
-}

+ 171 - 161
packages/opencode/test/format/format.test.ts

@@ -1,172 +1,182 @@
-import { Effect } from "effect"
-import { afterEach, describe, expect, test } from "bun:test"
-import { tmpdir } from "../fixture/fixture"
-import { withServices } from "../fixture/instance"
-import { Bus } from "../../src/bus"
-import { File } from "../../src/file"
+import { NodeChildProcessSpawner, NodeFileSystem, NodePath } from "@effect/platform-node"
+import { describe, expect } from "bun:test"
+import { Effect, Layer } from "effect"
+import { provideTmpdirInstance } from "../fixture/fixture"
+import { testEffect } from "../lib/effect"
 import { Format } from "../../src/format"
 import * as Formatter from "../../src/format/formatter"
-import { Instance } from "../../src/project/instance"
+
+const node = NodeChildProcessSpawner.layer.pipe(
+  Layer.provideMerge(Layer.mergeAll(NodeFileSystem.layer, NodePath.layer)),
+)
+
+const it = testEffect(Layer.mergeAll(Format.layer, node))
 
 describe("Format", () => {
-  afterEach(async () => {
-    await Instance.disposeAll()
-  })
-
-  test("status() returns built-in formatters when no config overrides", async () => {
-    await using tmp = await tmpdir()
-
-    await withServices(tmp.path, Format.layer, async (rt) => {
-      const statuses = await rt.runPromise(Format.Service.use((s) => s.status()))
-      expect(Array.isArray(statuses)).toBe(true)
-      expect(statuses.length).toBeGreaterThan(0)
-
-      for (const s of statuses) {
-        expect(typeof s.name).toBe("string")
-        expect(Array.isArray(s.extensions)).toBe(true)
-        expect(typeof s.enabled).toBe("boolean")
-      }
-
-      const gofmt = statuses.find((s) => s.name === "gofmt")
-      expect(gofmt).toBeDefined()
-      expect(gofmt!.extensions).toContain(".go")
-    })
-  })
-
-  test("status() returns empty list when formatter is disabled", async () => {
-    await using tmp = await tmpdir({
-      config: { formatter: false },
-    })
-
-    await withServices(tmp.path, Format.layer, async (rt) => {
-      const statuses = await rt.runPromise(Format.Service.use((s) => s.status()))
-      expect(statuses).toEqual([])
-    })
-  })
-
-  test("status() excludes formatters marked as disabled in config", async () => {
-    await using tmp = await tmpdir({
-      config: {
-        formatter: {
-          gofmt: { disabled: true },
+  it.effect("status() returns built-in formatters when no config overrides", () =>
+    provideTmpdirInstance(() =>
+      Format.Service.use((fmt) =>
+        Effect.gen(function* () {
+          const statuses = yield* fmt.status()
+          expect(Array.isArray(statuses)).toBe(true)
+          expect(statuses.length).toBeGreaterThan(0)
+
+          for (const item of statuses) {
+            expect(typeof item.name).toBe("string")
+            expect(Array.isArray(item.extensions)).toBe(true)
+            expect(typeof item.enabled).toBe("boolean")
+          }
+
+          const gofmt = statuses.find((item) => item.name === "gofmt")
+          expect(gofmt).toBeDefined()
+          expect(gofmt!.extensions).toContain(".go")
+        }),
+      ),
+    ),
+  )
+
+  it.effect("status() returns empty list when formatter is disabled", () =>
+    provideTmpdirInstance(
+      () =>
+        Format.Service.use((fmt) =>
+          Effect.gen(function* () {
+            expect(yield* fmt.status()).toEqual([])
+          }),
+        ),
+      { config: { formatter: false } },
+    ),
+  )
+
+  it.effect("status() excludes formatters marked as disabled in config", () =>
+    provideTmpdirInstance(
+      () =>
+        Format.Service.use((fmt) =>
+          Effect.gen(function* () {
+            const statuses = yield* fmt.status()
+            const gofmt = statuses.find((item) => item.name === "gofmt")
+            expect(gofmt).toBeUndefined()
+          }),
+        ),
+      {
+        config: {
+          formatter: {
+            gofmt: { disabled: true },
+          },
         },
       },
-    })
-
-    await withServices(tmp.path, Format.layer, async (rt) => {
-      const statuses = await rt.runPromise(Format.Service.use((s) => s.status()))
-      const gofmt = statuses.find((s) => s.name === "gofmt")
-      expect(gofmt).toBeUndefined()
-    })
-  })
-
-  test("service initializes without error", async () => {
-    await using tmp = await tmpdir()
-
-    await withServices(tmp.path, Format.layer, async (rt) => {
-      await rt.runPromise(Format.Service.use(() => Effect.void))
-    })
-  })
-
-  test("status() initializes formatter state per directory", async () => {
-    await using off = await tmpdir({
-      config: { formatter: false },
-    })
-    await using on = await tmpdir()
-
-    const a = await Instance.provide({
-      directory: off.path,
-      fn: () => Format.status(),
-    })
-    const b = await Instance.provide({
-      directory: on.path,
-      fn: () => Format.status(),
-    })
-
-    expect(a).toEqual([])
-    expect(b.length).toBeGreaterThan(0)
-  })
-
-  test("runs enabled checks for matching formatters in parallel", async () => {
-    await using tmp = await tmpdir()
-
-    const file = `${tmp.path}/test.parallel`
-    await Bun.write(file, "x")
-
-    const one = {
-      extensions: Formatter.gofmt.extensions,
-      enabled: Formatter.gofmt.enabled,
-      command: Formatter.gofmt.command,
-    }
-    const two = {
-      extensions: Formatter.mix.extensions,
-      enabled: Formatter.mix.enabled,
-      command: Formatter.mix.command,
-    }
-
-    let active = 0
-    let max = 0
-
-    Formatter.gofmt.extensions = [".parallel"]
-    Formatter.mix.extensions = [".parallel"]
-    Formatter.gofmt.command = ["sh", "-c", "true"]
-    Formatter.mix.command = ["sh", "-c", "true"]
-    Formatter.gofmt.enabled = async () => {
-      active++
-      max = Math.max(max, active)
-      await Bun.sleep(20)
-      active--
-      return true
-    }
-    Formatter.mix.enabled = async () => {
-      active++
-      max = Math.max(max, active)
-      await Bun.sleep(20)
-      active--
-      return true
-    }
-
-    try {
-      await withServices(tmp.path, Format.layer, async (rt) => {
-        await rt.runPromise(Format.Service.use((s) => s.init()))
-        await Bus.publish(File.Event.Edited, { file })
+    ),
+  )
+
+  it.effect("service initializes without error", () =>
+    provideTmpdirInstance(() => Format.Service.use(() => Effect.void)),
+  )
+
+  it.effect("status() initializes formatter state per directory", () =>
+    Effect.gen(function* () {
+      const a = yield* provideTmpdirInstance(() => Format.Service.use((fmt) => fmt.status()), {
+        config: { formatter: false },
       })
-    } finally {
-      Formatter.gofmt.extensions = one.extensions
-      Formatter.gofmt.enabled = one.enabled
-      Formatter.gofmt.command = one.command
-      Formatter.mix.extensions = two.extensions
-      Formatter.mix.enabled = two.enabled
-      Formatter.mix.command = two.command
-    }
-
-    expect(max).toBe(2)
-  })
-
-  test("runs matching formatters sequentially for the same file", async () => {
-    await using tmp = await tmpdir({
-      config: {
-        formatter: {
-          first: {
-            command: ["sh", "-c", 'sleep 0.05; v=$(cat "$1"); printf \'%sA\' "$v" > "$1"', "sh", "$FILE"],
-            extensions: [".seq"],
-          },
-          second: {
-            command: ["sh", "-c", 'v=$(cat "$1"); printf \'%sB\' "$v" > "$1"', "sh", "$FILE"],
-            extensions: [".seq"],
+      const b = yield* provideTmpdirInstance(() => Format.Service.use((fmt) => fmt.status()))
+
+      expect(a).toEqual([])
+      expect(b.length).toBeGreaterThan(0)
+    }),
+  )
+
+  it.effect("runs enabled checks for matching formatters in parallel", () =>
+    provideTmpdirInstance((path) =>
+      Effect.gen(function* () {
+        const file = `${path}/test.parallel`
+        yield* Effect.promise(() => Bun.write(file, "x"))
+
+        const one = {
+          extensions: Formatter.gofmt.extensions,
+          enabled: Formatter.gofmt.enabled,
+          command: Formatter.gofmt.command,
+        }
+        const two = {
+          extensions: Formatter.mix.extensions,
+          enabled: Formatter.mix.enabled,
+          command: Formatter.mix.command,
+        }
+
+        let active = 0
+        let max = 0
+
+        yield* Effect.acquireUseRelease(
+          Effect.sync(() => {
+            Formatter.gofmt.extensions = [".parallel"]
+            Formatter.mix.extensions = [".parallel"]
+            Formatter.gofmt.command = ["sh", "-c", "true"]
+            Formatter.mix.command = ["sh", "-c", "true"]
+            Formatter.gofmt.enabled = async () => {
+              active++
+              max = Math.max(max, active)
+              await Bun.sleep(20)
+              active--
+              return true
+            }
+            Formatter.mix.enabled = async () => {
+              active++
+              max = Math.max(max, active)
+              await Bun.sleep(20)
+              active--
+              return true
+            }
+          }),
+          () =>
+            Format.Service.use((fmt) =>
+              Effect.gen(function* () {
+                yield* fmt.init()
+                yield* fmt.file(file)
+              }),
+            ),
+          () =>
+            Effect.sync(() => {
+              Formatter.gofmt.extensions = one.extensions
+              Formatter.gofmt.enabled = one.enabled
+              Formatter.gofmt.command = one.command
+              Formatter.mix.extensions = two.extensions
+              Formatter.mix.enabled = two.enabled
+              Formatter.mix.command = two.command
+            }),
+        )
+
+        expect(max).toBe(2)
+      }),
+    ),
+  )
+
+  it.effect("runs matching formatters sequentially for the same file", () =>
+    provideTmpdirInstance(
+      (path) =>
+        Effect.gen(function* () {
+          const file = `${path}/test.seq`
+          yield* Effect.promise(() => Bun.write(file, "x"))
+
+          yield* Format.Service.use((fmt) =>
+            Effect.gen(function* () {
+              yield* fmt.init()
+              yield* fmt.file(file)
+            }),
+          )
+
+          expect(yield* Effect.promise(() => Bun.file(file).text())).toBe("xAB")
+        }),
+      {
+        config: {
+          formatter: {
+            first: {
+              command: ["sh", "-c", 'sleep 0.05; v=$(cat "$1"); printf \'%sA\' "$v" > "$1"', "sh", "$FILE"],
+              extensions: [".seq"],
+            },
+            second: {
+              command: ["sh", "-c", 'v=$(cat "$1"); printf \'%sB\' "$v" > "$1"', "sh", "$FILE"],
+              extensions: [".seq"],
+            },
           },
         },
       },
-    })
-
-    const file = `${tmp.path}/test.seq`
-    await Bun.write(file, "x")
-
-    await withServices(tmp.path, Format.layer, async (rt) => {
-      await rt.runPromise(Format.Service.use((s) => s.init()))
-      await Bus.publish(File.Event.Edited, { file })
-    })
-
-    expect(await Bun.file(file).text()).toBe("xAB")
-  })
+    ),
+  )
 })

+ 34 - 38
packages/opencode/test/project/vcs.test.ts

@@ -2,9 +2,7 @@ import { $ } from "bun"
 import { afterEach, describe, expect, test } from "bun:test"
 import fs from "fs/promises"
 import path from "path"
-import { Effect, Layer, ManagedRuntime } from "effect"
 import { tmpdir } from "../fixture/fixture"
-import { watcherConfigLayer, withServices } from "../fixture/instance"
 import { FileWatcher } from "../../src/file/watcher"
 import { Instance } from "../../src/project/instance"
 import { GlobalBus } from "../../src/bus/global"
@@ -17,28 +15,26 @@ const describeVcs = FileWatcher.hasNativeBinding() && !process.env.CI ? describe
 // Helpers
 // ---------------------------------------------------------------------------
 
-function withVcs(
-  directory: string,
-  body: (rt: ManagedRuntime.ManagedRuntime<FileWatcher.Service | Vcs.Service, never>) => Promise<void>,
-) {
-  return withServices(
+async function withVcs(directory: string, body: () => Promise<void>) {
+  return Instance.provide({
     directory,
-    Layer.merge(FileWatcher.layer, Vcs.defaultLayer),
-    async (rt) => {
-      await rt.runPromise(FileWatcher.Service.use((s) => s.init()))
-      await rt.runPromise(Vcs.Service.use((s) => s.init()))
+    fn: async () => {
+      FileWatcher.init()
+      Vcs.init()
       await Bun.sleep(500)
-      await body(rt)
+      await body()
     },
-    { provide: [watcherConfigLayer] },
-  )
+  })
 }
 
-function withVcsOnly(
-  directory: string,
-  body: (rt: ManagedRuntime.ManagedRuntime<Vcs.Service, never>) => Promise<void>,
-) {
-  return withServices(directory, Vcs.defaultLayer, body)
+function withVcsOnly(directory: string, body: () => Promise<void>) {
+  return Instance.provide({
+    directory,
+    fn: async () => {
+      Vcs.init()
+      await body()
+    },
+  })
 }
 
 type BranchEvent = { directory?: string; payload: { type: string; properties: { branch?: string } } }
@@ -82,8 +78,8 @@ describeVcs("Vcs", () => {
   test("branch() returns current branch name", async () => {
     await using tmp = await tmpdir({ git: true })
 
-    await withVcs(tmp.path, async (rt) => {
-      const branch = await rt.runPromise(Vcs.Service.use((s) => s.branch()))
+    await withVcs(tmp.path, async () => {
+      const branch = await Vcs.branch()
       expect(branch).toBeDefined()
       expect(typeof branch).toBe("string")
     })
@@ -92,8 +88,8 @@ describeVcs("Vcs", () => {
   test("branch() returns undefined for non-git directories", async () => {
     await using tmp = await tmpdir()
 
-    await withVcs(tmp.path, async (rt) => {
-      const branch = await rt.runPromise(Vcs.Service.use((s) => s.branch()))
+    await withVcs(tmp.path, async () => {
+      const branch = await Vcs.branch()
       expect(branch).toBeUndefined()
     })
   })
@@ -119,14 +115,14 @@ describeVcs("Vcs", () => {
     const branch = `test-${Math.random().toString(36).slice(2)}`
     await $`git branch ${branch}`.cwd(tmp.path).quiet()
 
-    await withVcs(tmp.path, async (rt) => {
+    await withVcs(tmp.path, async () => {
       const pending = nextBranchUpdate(tmp.path)
 
       const head = path.join(tmp.path, ".git", "HEAD")
       await fs.writeFile(head, `ref: refs/heads/${branch}\n`)
 
       await pending
-      const current = await rt.runPromise(Vcs.Service.use((s) => s.branch()))
+      const current = await Vcs.branch()
       expect(current).toBe(branch)
     })
   })
@@ -141,8 +137,8 @@ describe("Vcs diff", () => {
     await using tmp = await tmpdir({ git: true })
     await $`git branch -M main`.cwd(tmp.path).quiet()
 
-    await withVcsOnly(tmp.path, async (rt) => {
-      const branch = await rt.runPromise(Vcs.Service.use((s) => s.defaultBranch()))
+    await withVcsOnly(tmp.path, async () => {
+      const branch = await Vcs.defaultBranch()
       expect(branch).toBe("main")
     })
   })
@@ -152,8 +148,8 @@ describe("Vcs diff", () => {
     await $`git branch -M trunk`.cwd(tmp.path).quiet()
     await $`git config init.defaultBranch trunk`.cwd(tmp.path).quiet()
 
-    await withVcsOnly(tmp.path, async (rt) => {
-      const branch = await rt.runPromise(Vcs.Service.use((s) => s.defaultBranch()))
+    await withVcsOnly(tmp.path, async () => {
+      const branch = await Vcs.defaultBranch()
       expect(branch).toBe("trunk")
     })
   })
@@ -165,10 +161,10 @@ describe("Vcs diff", () => {
     const dir = path.join(wt.path, "feature")
     await $`git worktree add -b feature/test ${dir} HEAD`.cwd(tmp.path).quiet()
 
-    await withVcsOnly(dir, async (rt) => {
+    await withVcsOnly(dir, async () => {
       const [branch, base] = await Promise.all([
-        rt.runPromise(Vcs.Service.use((s) => s.branch())),
-        rt.runPromise(Vcs.Service.use((s) => s.defaultBranch())),
+        Vcs.branch(),
+        Vcs.defaultBranch(),
       ])
       expect(branch).toBe("feature/test")
       expect(base).toBe("main")
@@ -182,8 +178,8 @@ describe("Vcs diff", () => {
     await $`git commit --no-gpg-sign -m "add file"`.cwd(tmp.path).quiet()
     await fs.writeFile(path.join(tmp.path, "file.txt"), "changed\n", "utf-8")
 
-    await withVcsOnly(tmp.path, async (rt) => {
-      const diff = await rt.runPromise(Vcs.Service.use((s) => s.diff("git")))
+    await withVcsOnly(tmp.path, async () => {
+      const diff = await Vcs.diff("git")
       expect(diff).toEqual(
         expect.arrayContaining([
           expect.objectContaining({
@@ -199,8 +195,8 @@ describe("Vcs diff", () => {
     await using tmp = await tmpdir({ git: true })
     await fs.writeFile(path.join(tmp.path, weird), "hello\n", "utf-8")
 
-    await withVcsOnly(tmp.path, async (rt) => {
-      const diff = await rt.runPromise(Vcs.Service.use((s) => s.diff("git")))
+    await withVcsOnly(tmp.path, async () => {
+      const diff = await Vcs.diff("git")
       expect(diff).toEqual(
         expect.arrayContaining([
           expect.objectContaining({
@@ -220,8 +216,8 @@ describe("Vcs diff", () => {
     await $`git add .`.cwd(tmp.path).quiet()
     await $`git commit --no-gpg-sign -m "branch file"`.cwd(tmp.path).quiet()
 
-    await withVcsOnly(tmp.path, async (rt) => {
-      const diff = await rt.runPromise(Vcs.Service.use((s) => s.diff("branch")))
+    await withVcsOnly(tmp.path, async () => {
+      const diff = await Vcs.diff("branch")
       expect(diff).toEqual(
         expect.arrayContaining([
           expect.objectContaining({

+ 7 - 3
packages/opencode/test/sync/index.test.ts

@@ -110,10 +110,16 @@ describe("SyncEvent", () => {
           type: string
           properties: { id: string; name: string }
         }> = []
-        const unsub = Bus.subscribeAll((event) => events.push(event))
+        const received = new Promise<void>((resolve) => {
+          Bus.subscribeAll((event) => {
+            events.push(event)
+            resolve()
+          })
+        })
 
         SyncEvent.run(Created, { id: "evt_1", name: "test" })
 
+        await received
         expect(events).toHaveLength(1)
         expect(events[0]).toEqual({
           type: "item.created",
@@ -122,8 +128,6 @@ describe("SyncEvent", () => {
             name: "test",
           },
         })
-
-        unsub()
       }),
     )
   })

+ 0 - 7
packages/opencode/test/tool/edit.test.ts

@@ -89,7 +89,6 @@ describe("tool.edit", () => {
           const { FileWatcher } = await import("../../src/file/watcher")
 
           const events: string[] = []
-          const unsubEdited = Bus.subscribe(File.Event.Edited, () => events.push("edited"))
           const unsubUpdated = Bus.subscribe(FileWatcher.Event.Updated, () => events.push("updated"))
 
           const edit = await EditTool.init()
@@ -102,9 +101,7 @@ describe("tool.edit", () => {
             ctx,
           )
 
-          expect(events).toContain("edited")
           expect(events).toContain("updated")
-          unsubEdited()
           unsubUpdated()
         },
       })
@@ -305,11 +302,9 @@ describe("tool.edit", () => {
           await FileTime.read(ctx.sessionID, filepath)
 
           const { Bus } = await import("../../src/bus")
-          const { File } = await import("../../src/file")
           const { FileWatcher } = await import("../../src/file/watcher")
 
           const events: string[] = []
-          const unsubEdited = Bus.subscribe(File.Event.Edited, () => events.push("edited"))
           const unsubUpdated = Bus.subscribe(FileWatcher.Event.Updated, () => events.push("updated"))
 
           const edit = await EditTool.init()
@@ -322,9 +317,7 @@ describe("tool.edit", () => {
             ctx,
           )
 
-          expect(events).toContain("edited")
           expect(events).toContain("updated")
-          unsubEdited()
           unsubUpdated()
         },
       })