| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101 |
- import { z, type ZodType } from "zod";
- import { App } from "../app/app";
- import { Log } from "../util/log";
- export namespace Bus {
- const log = Log.create({ service: "bus" });
- type Subscription = (event: any) => void;
- const state = App.state("bus", () => {
- const subscriptions = new Map<any, Subscription[]>();
- return {
- subscriptions,
- };
- });
- export type EventDefinition = ReturnType<typeof event>;
- const registry = new Map<string, EventDefinition>();
- export function event<Type extends string, Properties extends ZodType>(
- type: Type,
- properties: Properties,
- ) {
- const result = {
- type,
- properties,
- };
- registry.set(type, result);
- return result;
- }
- export function payloads() {
- return z.discriminatedUnion(
- "type",
- registry
- .entries()
- .map(([type, def]) =>
- z
- .object({
- type: z.literal(type),
- properties: def.properties,
- })
- .openapi({
- ref: "Event" + "." + def.type,
- }),
- )
- .toArray() as any,
- );
- }
- export function publish<Definition extends EventDefinition>(
- def: Definition,
- properties: z.output<Definition["properties"]>,
- ) {
- const payload = {
- type: def.type,
- properties,
- };
- log.info("publishing", {
- type: def.type,
- });
- for (const key of [def.type, "*"]) {
- const match = state().subscriptions.get(key);
- for (const sub of match ?? []) {
- sub(payload);
- }
- }
- }
- export function subscribe<Definition extends EventDefinition>(
- def: Definition,
- callback: (event: {
- type: Definition["type"];
- properties: z.infer<Definition["properties"]>;
- }) => void,
- ) {
- return raw(def.type, callback);
- }
- export function subscribeAll(callback: (event: any) => void) {
- return raw("*", callback);
- }
- function raw(type: string, callback: (event: any) => void) {
- log.info("subscribing", { type });
- const subscriptions = state().subscriptions;
- let match = subscriptions.get(type) ?? [];
- match.push(callback);
- subscriptions.set(type, match);
- return () => {
- log.info("unsubscribing", { type });
- const match = subscriptions.get(type);
- if (!match) return;
- const index = match.indexOf(callback);
- if (index === -1) return;
- match.splice(index, 1);
- };
- }
- }
|