| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119 |
- import z from "zod"
- import type { ZodType } from "zod"
- import { Log } from "../util/log"
- import { Instance } from "../project/instance"
- import { GlobalBus } from "./global"
- export namespace Bus {
- const log = Log.create({ service: "bus" })
- type Subscription = (event: any) => void
- const state = Instance.state(() => {
- const subscriptions = new Map<any, Subscription[]>()
- return {
- subscriptions,
- }
- })
- export type EventDefinition = ReturnType<typeof event>
- const registry = new Map<string, EventDefinition>()
- export function event<Type extends string, Properties extends ZodType>(type: Type, properties: Properties) {
- const result = {
- type,
- properties,
- }
- registry.set(type, result)
- return result
- }
- export function payloads() {
- return z
- .discriminatedUnion(
- "type",
- registry
- .entries()
- .map(([type, def]) => {
- return z
- .object({
- type: z.literal(type),
- properties: def.properties,
- })
- .meta({
- ref: "Event" + "." + def.type,
- })
- })
- .toArray() as any,
- )
- .meta({
- ref: "Event",
- })
- }
- export async function publish<Definition extends EventDefinition>(
- def: Definition,
- properties: z.output<Definition["properties"]>,
- ) {
- const payload = {
- type: def.type,
- properties,
- }
- log.info("publishing", {
- type: def.type,
- })
- const pending = []
- for (const key of [def.type, "*"]) {
- const match = state().subscriptions.get(key)
- for (const sub of match ?? []) {
- pending.push(sub(payload))
- }
- }
- GlobalBus.emit("event", {
- directory: Instance.directory,
- payload,
- })
- return Promise.all(pending)
- }
- export function subscribe<Definition extends EventDefinition>(
- def: Definition,
- callback: (event: { type: Definition["type"]; properties: z.infer<Definition["properties"]> }) => void,
- ) {
- return raw(def.type, callback)
- }
- export function once<Definition extends EventDefinition>(
- def: Definition,
- callback: (event: {
- type: Definition["type"]
- properties: z.infer<Definition["properties"]>
- }) => "done" | undefined,
- ) {
- const unsub = subscribe(def, (event) => {
- if (callback(event)) unsub()
- })
- }
- export function subscribeAll(callback: (event: any) => void) {
- return raw("*", callback)
- }
- function raw(type: string, callback: (event: any) => void) {
- log.info("subscribing", { type })
- const subscriptions = state().subscriptions
- let match = subscriptions.get(type) ?? []
- match.push(callback)
- subscriptions.set(type, match)
- return () => {
- log.info("unsubscribing", { type })
- const match = subscriptions.get(type)
- if (!match) return
- const index = match.indexOf(callback)
- if (index === -1) return
- match.splice(index, 1)
- }
- }
- }
|