| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105 |
- import z from "zod"
- import { Log } from "../util/log"
- import { Instance } from "../project/instance"
- import { BusEvent } from "./bus-event"
- import { GlobalBus } from "./global"
- export namespace Bus {
- const log = Log.create({ service: "bus" })
- type Subscription = (event: any) => void
- export const InstanceDisposed = BusEvent.define(
- "server.instance.disposed",
- z.object({
- directory: z.string(),
- }),
- )
- const state = Instance.state(
- () => {
- const subscriptions = new Map<any, Subscription[]>()
- return {
- subscriptions,
- }
- },
- async (entry) => {
- const wildcard = entry.subscriptions.get("*")
- if (!wildcard) return
- const event = {
- type: InstanceDisposed.type,
- properties: {
- directory: Instance.directory,
- },
- }
- for (const sub of [...wildcard]) {
- sub(event)
- }
- },
- )
- export async function publish<Definition extends BusEvent.Definition>(
- def: Definition,
- properties: z.output<Definition["properties"]>,
- ) {
- const payload = {
- type: def.type,
- properties,
- }
- log.info("publishing", {
- type: def.type,
- })
- const pending = []
- for (const key of [def.type, "*"]) {
- const match = state().subscriptions.get(key)
- for (const sub of match ?? []) {
- pending.push(sub(payload))
- }
- }
- GlobalBus.emit("event", {
- directory: Instance.directory,
- payload,
- })
- return Promise.all(pending)
- }
- export function subscribe<Definition extends BusEvent.Definition>(
- def: Definition,
- callback: (event: { type: Definition["type"]; properties: z.infer<Definition["properties"]> }) => void,
- ) {
- return raw(def.type, callback)
- }
- export function once<Definition extends BusEvent.Definition>(
- def: Definition,
- callback: (event: {
- type: Definition["type"]
- properties: z.infer<Definition["properties"]>
- }) => "done" | undefined,
- ) {
- const unsub = subscribe(def, (event) => {
- if (callback(event)) unsub()
- })
- }
- export function subscribeAll(callback: (event: any) => void) {
- return raw("*", callback)
- }
- function raw(type: string, callback: (event: any) => void) {
- log.info("subscribing", { type })
- const subscriptions = state().subscriptions
- let match = subscriptions.get(type) ?? []
- match.push(callback)
- subscriptions.set(type, match)
- return () => {
- log.info("unsubscribing", { type })
- const match = subscriptions.get(type)
- if (!match) return
- const index = match.indexOf(callback)
- if (index === -1) return
- match.splice(index, 1)
- }
- }
- }
|