index.ts 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. import { z, type ZodType } from "zod"
  2. import { App } from "../app/app"
  3. import { Log } from "../util/log"
  4. export namespace Bus {
  5. const log = Log.create({ service: "bus" })
  6. type Subscription = (event: any) => void
  7. const state = App.state("bus", () => {
  8. const subscriptions = new Map<any, Subscription[]>()
  9. return {
  10. subscriptions,
  11. }
  12. })
  13. export type EventDefinition = ReturnType<typeof event>
  14. const registry = new Map<string, EventDefinition>()
  15. export function event<Type extends string, Properties extends ZodType>(type: Type, properties: Properties) {
  16. const result = {
  17. type,
  18. properties,
  19. }
  20. registry.set(type, result)
  21. return result
  22. }
  23. export function payloads() {
  24. return z.discriminatedUnion(
  25. "type",
  26. registry
  27. .entries()
  28. .map(([type, def]) =>
  29. z
  30. .object({
  31. type: z.literal(type),
  32. properties: def.properties,
  33. })
  34. .openapi({
  35. ref: "Event" + "." + def.type,
  36. }),
  37. )
  38. .toArray() as any,
  39. )
  40. }
  41. export async function publish<Definition extends EventDefinition>(
  42. def: Definition,
  43. properties: z.output<Definition["properties"]>,
  44. ) {
  45. const payload = {
  46. type: def.type,
  47. properties,
  48. }
  49. log.info("publishing", {
  50. type: def.type,
  51. })
  52. const pending = []
  53. for (const key of [def.type, "*"]) {
  54. const match = state().subscriptions.get(key)
  55. for (const sub of match ?? []) {
  56. pending.push(sub(payload))
  57. }
  58. }
  59. return Promise.all(pending)
  60. }
  61. export function subscribe<Definition extends EventDefinition>(
  62. def: Definition,
  63. callback: (event: { type: Definition["type"]; properties: z.infer<Definition["properties"]> }) => void,
  64. ) {
  65. return raw(def.type, callback)
  66. }
  67. export function once<Definition extends EventDefinition>(
  68. def: Definition,
  69. callback: (event: {
  70. type: Definition["type"]
  71. properties: z.infer<Definition["properties"]>
  72. }) => "done" | undefined,
  73. ) {
  74. const unsub = subscribe(def, (event) => {
  75. if (callback(event)) unsub()
  76. })
  77. }
  78. export function subscribeAll(callback: (event: any) => void) {
  79. return raw("*", callback)
  80. }
  81. function raw(type: string, callback: (event: any) => void) {
  82. log.info("subscribing", { type })
  83. const subscriptions = state().subscriptions
  84. let match = subscriptions.get(type) ?? []
  85. match.push(callback)
  86. subscriptions.set(type, match)
  87. return () => {
  88. log.info("unsubscribing", { type })
  89. const match = subscriptions.get(type)
  90. if (!match) return
  91. const index = match.indexOf(callback)
  92. if (index === -1) return
  93. match.splice(index, 1)
  94. }
  95. }
  96. }