|
|
@@ -1 +1,193 @@
|
|
|
-export * as Bus from "./bus"
|
|
|
+import z from "zod"
|
|
|
+import { Effect, Exit, Layer, PubSub, Scope, Context, Stream } from "effect"
|
|
|
+import { EffectBridge } from "@/effect"
|
|
|
+import { Log } from "../util"
|
|
|
+import { BusEvent } from "./bus-event"
|
|
|
+import { GlobalBus } from "./global"
|
|
|
+import { InstanceState } from "@/effect"
|
|
|
+import { makeRuntime } from "@/effect/run-service"
|
|
|
+
|
|
|
+const log = Log.create({ service: "bus" })
|
|
|
+
|
|
|
+export const InstanceDisposed = BusEvent.define(
|
|
|
+ "server.instance.disposed",
|
|
|
+ z.object({
|
|
|
+ directory: z.string(),
|
|
|
+ }),
|
|
|
+)
|
|
|
+
|
|
|
+type Payload<D extends BusEvent.Definition = BusEvent.Definition> = {
|
|
|
+ type: D["type"]
|
|
|
+ properties: z.infer<D["properties"]>
|
|
|
+}
|
|
|
+
|
|
|
+type State = {
|
|
|
+ wildcard: PubSub.PubSub<Payload>
|
|
|
+ typed: Map<string, PubSub.PubSub<Payload>>
|
|
|
+}
|
|
|
+
|
|
|
+export interface Interface {
|
|
|
+ readonly publish: <D extends BusEvent.Definition>(
|
|
|
+ def: D,
|
|
|
+ properties: z.output<D["properties"]>,
|
|
|
+ ) => Effect.Effect<void>
|
|
|
+ readonly subscribe: <D extends BusEvent.Definition>(def: D) => Stream.Stream<Payload<D>>
|
|
|
+ readonly subscribeAll: () => Stream.Stream<Payload>
|
|
|
+ readonly subscribeCallback: <D extends BusEvent.Definition>(
|
|
|
+ def: D,
|
|
|
+ callback: (event: Payload<D>) => unknown,
|
|
|
+ ) => Effect.Effect<() => void>
|
|
|
+ readonly subscribeAllCallback: (callback: (event: any) => unknown) => Effect.Effect<() => void>
|
|
|
+}
|
|
|
+
|
|
|
+export class Service extends Context.Service<Service, Interface>()("@opencode/Bus") {}
|
|
|
+
|
|
|
+export const layer = Layer.effect(
|
|
|
+ Service,
|
|
|
+ Effect.gen(function* () {
|
|
|
+ const state = yield* InstanceState.make<State>(
|
|
|
+ Effect.fn("Bus.state")(function* (ctx) {
|
|
|
+ const wildcard = yield* PubSub.unbounded<Payload>()
|
|
|
+ const typed = new Map<string, PubSub.PubSub<Payload>>()
|
|
|
+
|
|
|
+ yield* Effect.addFinalizer(() =>
|
|
|
+ Effect.gen(function* () {
|
|
|
+ // Publish InstanceDisposed before shutting down so subscribers see it
|
|
|
+ yield* PubSub.publish(wildcard, {
|
|
|
+ type: InstanceDisposed.type,
|
|
|
+ properties: { directory: ctx.directory },
|
|
|
+ })
|
|
|
+ yield* PubSub.shutdown(wildcard)
|
|
|
+ for (const ps of typed.values()) {
|
|
|
+ yield* PubSub.shutdown(ps)
|
|
|
+ }
|
|
|
+ }),
|
|
|
+ )
|
|
|
+
|
|
|
+ return { wildcard, typed }
|
|
|
+ }),
|
|
|
+ )
|
|
|
+
|
|
|
+ function getOrCreate<D extends BusEvent.Definition>(state: State, def: D) {
|
|
|
+ return Effect.gen(function* () {
|
|
|
+ let ps = state.typed.get(def.type)
|
|
|
+ if (!ps) {
|
|
|
+ ps = yield* PubSub.unbounded<Payload>()
|
|
|
+ state.typed.set(def.type, ps)
|
|
|
+ }
|
|
|
+ return ps as unknown as PubSub.PubSub<Payload<D>>
|
|
|
+ })
|
|
|
+ }
|
|
|
+
|
|
|
+ function publish<D extends BusEvent.Definition>(def: D, properties: z.output<D["properties"]>) {
|
|
|
+ return Effect.gen(function* () {
|
|
|
+ const s = yield* InstanceState.get(state)
|
|
|
+ const payload: Payload = { type: def.type, properties }
|
|
|
+ log.info("publishing", { type: def.type })
|
|
|
+
|
|
|
+ const ps = s.typed.get(def.type)
|
|
|
+ if (ps) yield* PubSub.publish(ps, payload)
|
|
|
+ yield* PubSub.publish(s.wildcard, payload)
|
|
|
+
|
|
|
+ const dir = yield* InstanceState.directory
|
|
|
+ const context = yield* InstanceState.context
|
|
|
+ const workspace = yield* InstanceState.workspaceID
|
|
|
+
|
|
|
+ GlobalBus.emit("event", {
|
|
|
+ directory: dir,
|
|
|
+ project: context.project.id,
|
|
|
+ workspace,
|
|
|
+ payload,
|
|
|
+ })
|
|
|
+ })
|
|
|
+ }
|
|
|
+
|
|
|
+ function subscribe<D extends BusEvent.Definition>(def: D): Stream.Stream<Payload<D>> {
|
|
|
+ log.info("subscribing", { type: def.type })
|
|
|
+ return Stream.unwrap(
|
|
|
+ Effect.gen(function* () {
|
|
|
+ const s = yield* InstanceState.get(state)
|
|
|
+ const ps = yield* getOrCreate(s, def)
|
|
|
+ return Stream.fromPubSub(ps)
|
|
|
+ }),
|
|
|
+ ).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: def.type }))))
|
|
|
+ }
|
|
|
+
|
|
|
+ function subscribeAll(): Stream.Stream<Payload> {
|
|
|
+ log.info("subscribing", { type: "*" })
|
|
|
+ return Stream.unwrap(
|
|
|
+ Effect.gen(function* () {
|
|
|
+ const s = yield* InstanceState.get(state)
|
|
|
+ return Stream.fromPubSub(s.wildcard)
|
|
|
+ }),
|
|
|
+ ).pipe(Stream.ensuring(Effect.sync(() => log.info("unsubscribing", { type: "*" }))))
|
|
|
+ }
|
|
|
+
|
|
|
+ function on<T>(pubsub: PubSub.PubSub<T>, type: string, callback: (event: T) => unknown) {
|
|
|
+ return Effect.gen(function* () {
|
|
|
+ log.info("subscribing", { type })
|
|
|
+ const bridge = yield* EffectBridge.make()
|
|
|
+ const scope = yield* Scope.make()
|
|
|
+ const subscription = yield* Scope.provide(scope)(PubSub.subscribe(pubsub))
|
|
|
+
|
|
|
+ yield* Scope.provide(scope)(
|
|
|
+ Stream.fromSubscription(subscription).pipe(
|
|
|
+ Stream.runForEach((msg) =>
|
|
|
+ Effect.tryPromise({
|
|
|
+ try: () => Promise.resolve().then(() => callback(msg)),
|
|
|
+ catch: (cause) => {
|
|
|
+ log.error("subscriber failed", { type, cause })
|
|
|
+ },
|
|
|
+ }).pipe(Effect.ignore),
|
|
|
+ ),
|
|
|
+ Effect.forkScoped,
|
|
|
+ ),
|
|
|
+ )
|
|
|
+
|
|
|
+ return () => {
|
|
|
+ log.info("unsubscribing", { type })
|
|
|
+ bridge.fork(Scope.close(scope, Exit.void))
|
|
|
+ }
|
|
|
+ })
|
|
|
+ }
|
|
|
+
|
|
|
+ const subscribeCallback = Effect.fn("Bus.subscribeCallback")(function* <D extends BusEvent.Definition>(
|
|
|
+ def: D,
|
|
|
+ callback: (event: Payload<D>) => unknown,
|
|
|
+ ) {
|
|
|
+ const s = yield* InstanceState.get(state)
|
|
|
+ const ps = yield* getOrCreate(s, def)
|
|
|
+ return yield* on(ps, def.type, callback)
|
|
|
+ })
|
|
|
+
|
|
|
+ const subscribeAllCallback = Effect.fn("Bus.subscribeAllCallback")(function* (callback: (event: any) => unknown) {
|
|
|
+ const s = yield* InstanceState.get(state)
|
|
|
+ return yield* on(s.wildcard, "*", callback)
|
|
|
+ })
|
|
|
+
|
|
|
+ return Service.of({ publish, subscribe, subscribeAll, subscribeCallback, subscribeAllCallback })
|
|
|
+ }),
|
|
|
+)
|
|
|
+
|
|
|
+export const defaultLayer = layer
|
|
|
+
|
|
|
+const { runPromise, runSync } = makeRuntime(Service, layer)
|
|
|
+
|
|
|
+// runSync is safe here because the subscribe chain (InstanceState.get, PubSub.subscribe,
|
|
|
+// Scope.make, Effect.forkScoped) is entirely synchronous. If any step becomes async, this will throw.
|
|
|
+export async function publish<D extends BusEvent.Definition>(def: D, properties: z.output<D["properties"]>) {
|
|
|
+ return runPromise((svc) => svc.publish(def, properties))
|
|
|
+}
|
|
|
+
|
|
|
+export function subscribe<D extends BusEvent.Definition>(
|
|
|
+ def: D,
|
|
|
+ callback: (event: { type: D["type"]; properties: z.infer<D["properties"]> }) => unknown,
|
|
|
+) {
|
|
|
+ return runSync((svc) => svc.subscribeCallback(def, callback))
|
|
|
+}
|
|
|
+
|
|
|
+export function subscribeAll(callback: (event: any) => unknown) {
|
|
|
+ return runSync((svc) => svc.subscribeAllCallback(callback))
|
|
|
+}
|
|
|
+
|
|
|
+export * as Bus from "."
|