index.ts 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. import z from "zod"
  2. import type { ZodType } from "zod"
  3. import { Log } from "../util/log"
  4. import { Instance } from "../project/instance"
  5. import { GlobalBus } from "./global"
  6. export namespace Bus {
  // Logger for this module; every entry is tagged with service "bus".
  const log = Log.create({ service: "bus" })

  /** A registered listener. It is called with the full `{ type, properties }` payload. */
  type Subscription = (event: any) => void

  /**
   * Subscription table, created through `Instance.state`.
   * Keys are event type strings or the wildcard "*"; each value holds the
   * listeners for that key in registration order.
   * NOTE(review): presumably one table per project instance; confirm against `Instance.state`.
   */
  const state = Instance.state(() => ({
    subscriptions: new Map<any, Subscription[]>(),
  }))
  /** Shape of a registered event: its type string plus the zod schema for its properties. */
  export type EventDefinition = ReturnType<typeof event>

  // Every definition created through `event()`, keyed by its type string.
  const registry = new Map<string, EventDefinition>()

  /**
   * Defines an event and records it in the registry.
   * Registering the same `type` again replaces the earlier registry entry.
   *
   * @param type - Event name; also the registry key.
   * @param properties - Zod schema describing the event's properties.
   * @returns The `{ type, properties }` definition that was registered.
   */
  export function event<Type extends string, Properties extends ZodType>(type: Type, properties: Properties) {
    const definition = { type, properties }
    registry.set(type, definition)
    return definition
  }
  /**
   * Builds a zod discriminated union, keyed on "type", with one member per
   * registered event. Each member carries the ref "Event.<type>" and the union
   * itself carries the ref "Event".
   *
   * @returns The union schema, reflecting the registry contents at call time.
   */
  export function payloads() {
    const members = Array.from(registry.entries(), ([type, def]) => {
      const shape = z.object({
        type: z.literal(type),
        properties: def.properties,
      })
      return shape.meta({
        ref: "Event" + "." + def.type,
      })
    })
    // The `any` cast is kept from the original; presumably the dynamically built
    // array does not satisfy the option type zod expects. TODO confirm.
    const union = z.discriminatedUnion("type", members as any)
    return union.meta({
      ref: "Event",
    })
  }
  /**
   * Delivers an event to the listeners registered for its type, then to the
   * wildcard ("*") listeners, and finally emits it on `GlobalBus`.
   *
   * @param def - Event definition created with `event()`.
   * @param properties - Event properties, typed from the definition's schema.
   * @returns A promise over the values returned by the listeners. It resolves
   *   once all of them have resolved and rejects if any of them rejects.
   */
  export async function publish<Definition extends EventDefinition>(
    def: Definition,
    properties: z.output<Definition["properties"]>,
  ) {
    const payload = {
      type: def.type,
      properties,
    }
    log.info("publishing", {
      type: def.type,
    })
    const pending = []
    for (const key of [def.type, "*"]) {
      // Iterate over a snapshot. A listener may unsubscribe while it is being
      // called (this is what `once` does), which splices the live array and
      // would make the loop skip the listener that follows it. The snapshot
      // also keeps listeners added during delivery from receiving this event.
      const listeners = [...(state().subscriptions.get(key) ?? [])]
      for (const sub of listeners) {
        pending.push(sub(payload))
      }
    }
    GlobalBus.emit("event", {
      directory: Instance.directory,
      payload,
    })
    return Promise.all(pending)
  }
  /**
   * Registers a listener for a single event type.
   *
   * @param def - Definition of the event to listen for.
   * @param callback - Called with `{ type, properties }` for each matching event.
   * @returns Function that removes this listener.
   */
  export function subscribe<Definition extends EventDefinition>(
    def: Definition,
    callback: (event: { type: Definition["type"]; properties: z.infer<Definition["properties"]> }) => void,
  ) {
    const unsubscribe = raw(def.type, callback)
    return unsubscribe
  }
  /**
   * Registers a listener that removes itself once the callback signals completion.
   * The callback is invoked for every matching event until it returns a truthy
   * value ("done"); returning `undefined` keeps it subscribed.
   *
   * @param def - Definition of the event to listen for.
   * @param callback - Called with `{ type, properties }`; return "done" to stop listening.
   * @returns Function that removes the listener early, so a pending `once` can
   *   be cancelled the same way as a `subscribe` registration.
   */
  export function once<Definition extends EventDefinition>(
    def: Definition,
    callback: (event: {
      type: Definition["type"]
      properties: z.infer<Definition["properties"]>
    }) => "done" | undefined,
  ) {
    const unsub = subscribe(def, (event) => {
      if (callback(event)) unsub()
    })
    return unsub
  }
  /**
   * Registers a listener under the wildcard key "*", which `publish` delivers
   * to for every event type.
   *
   * @param callback - Called with the `{ type, properties }` payload of each event.
   * @returns Function that removes this listener.
   */
  export function subscribeAll(callback: (event: any) => void) {
    const unsubscribe = raw("*", callback)
    return unsubscribe
  }
  /**
   * Appends `callback` to the listener list stored under `type` and returns a
   * function that removes it again.
   *
   * The returned function removes one occurrence of `callback` per call, so a
   * callback registered twice has to be unsubscribed twice. Calling it after
   * the callback is already gone does nothing beyond logging.
   *
   * @param type - Event type string, or "*" for the wildcard list.
   * @param callback - Listener to register.
   * @returns The unsubscribe function.
   */
  function raw(type: string, callback: (event: any) => void) {
    log.info("subscribing", { type })
    const table = state().subscriptions
    const existing = table.get(type)
    if (existing) {
      existing.push(callback)
    } else {
      table.set(type, [callback])
    }
    return () => {
      log.info("unsubscribing", { type })
      const listeners = table.get(type)
      if (!listeners) return
      const position = listeners.indexOf(callback)
      if (position >= 0) listeners.splice(position, 1)
    }
  }
  106. }