import z from "zod"
import { Log } from "../util/log"
import { Instance } from "../project/instance"
import { BusEvent } from "./bus-event"
import { GlobalBus } from "./global"

/**
 * Publish/subscribe event bus.
 *
 * Subscribers are stored per event type in the state object created through
 * `Instance.state`; the key `"*"` holds subscribers that receive every event.
 * Each published event is also forwarded to `GlobalBus`.
 */
export namespace Bus {
  const log = Log.create({ service: "bus" })

  /** A registered listener. Whatever it returns is collected (and awaited) by `publish`. */
  type Subscription = (event: any) => void

  /** Event delivered to wildcard subscribers by the state's second callback (see `state`). */
  export const InstanceDisposed = BusEvent.define(
    "server.instance.disposed",
    z.object({
      directory: z.string(),
    }),
  )

  // NOTE(review): the second callback handed to Instance.state is presumably
  // invoked when the instance is disposed (it emits InstanceDisposed) — confirm
  // against Instance.state.
  const state = Instance.state(
    () => {
      // Event type (or "*") -> listeners in registration order.
      const subscriptions = new Map<string, Subscription[]>()
      return {
        subscriptions,
      }
    },
    async (entry) => {
      // Only wildcard subscribers are told about the disposal.
      const wildcard = entry.subscriptions.get("*")
      if (!wildcard) return
      const event = {
        type: InstanceDisposed.type,
        properties: {
          directory: Instance.directory,
        },
      }
      // Iterate a copy so a listener that unsubscribes while being notified
      // cannot disturb the iteration.
      for (const sub of [...wildcard]) {
        sub(event)
      }
    },
  )

  /**
   * Publishes an event to every subscriber of `def.type`, then to every
   * wildcard (`"*"`) subscriber, and finally emits it on `GlobalBus` together
   * with `Instance.directory`.
   *
   * Subscribers are invoked synchronously, in registration order. The
   * subscriber list is snapshotted before dispatch: a listener that
   * unsubscribes during dispatch (as `once` does) no longer causes the next
   * listener to be skipped, and a listener added during dispatch first
   * receives the next event.
   *
   * @param def Event definition; its `type` selects the subscribers.
   * @param properties Payload for the event.
   * @returns A promise that resolves once every value returned by a subscriber
   *   has resolved; it rejects if any of them rejects or if a subscriber
   *   throws synchronously.
   */
  export async function publish<Definition extends { type: string; properties: z.ZodType }>(
    def: Definition,
    properties: z.output<Definition["properties"]>,
  ) {
    const payload = {
      type: def.type,
      properties,
    }
    log.info("publishing", {
      type: def.type,
    })
    const pending = []
    for (const key of [def.type, "*"]) {
      // Snapshot: unsubscribe() splices the stored array in place, which would
      // make a live for...of skip the listener following the removed one.
      const listeners = [...(state().subscriptions.get(key) ?? [])]
      for (const sub of listeners) {
        pending.push(sub(payload))
      }
    }
    GlobalBus.emit("event", {
      directory: Instance.directory,
      payload,
    })
    return Promise.all(pending)
  }

  /**
   * Registers `callback` for events of `def.type`.
   *
   * @param def Event definition to listen for.
   * @param callback Invoked with each matching event.
   * @returns A function that removes this subscription.
   */
  export function subscribe<Definition extends { type: string; properties: z.ZodType }>(
    def: Definition,
    callback: (event: { type: Definition["type"]; properties: z.infer<Definition["properties"]> }) => void,
  ) {
    return raw(def.type, callback)
  }

  /**
   * Registers `callback` for events of `def.type` and removes it again as soon
   * as the callback returns a truthy value (`"done"`). Returning `undefined`
   * keeps the subscription active.
   *
   * @param def Event definition to listen for.
   * @param callback Invoked with each matching event until it returns `"done"`.
   * @returns A function that removes the subscription early.
   */
  export function once<Definition extends { type: string; properties: z.ZodType }>(
    def: Definition,
    callback: (event: {
      type: Definition["type"]
      properties: z.infer<Definition["properties"]>
    }) => "done" | undefined,
  ) {
    const unsub = subscribe(def, (event) => {
      if (callback(event)) unsub()
    })
    return unsub
  }

  /**
   * Registers `callback` for every published event (wildcard subscription).
   *
   * @param callback Invoked with each event.
   * @returns A function that removes this subscription.
   */
  export function subscribeAll(callback: (event: any) => void) {
    return raw("*", callback)
  }

  /**
   * Appends `callback` to the listener list stored under `type` and returns
   * the matching unsubscribe function.
   */
  function raw(type: string, callback: (event: any) => void) {
    log.info("subscribing", { type })
    const subscriptions = state().subscriptions
    const match = subscriptions.get(type) ?? []
    match.push(callback)
    subscriptions.set(type, match)

    return () => {
      log.info("unsubscribing", { type })
      const match = subscriptions.get(type)
      if (!match) return
      const index = match.indexOf(callback)
      // Already removed (e.g. unsubscribe called twice): nothing to do.
      if (index === -1) return
      match.splice(index, 1)
    }
  }
}