index.ts 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. import z from "zod"
  2. import { Log } from "../util/log"
  3. import { Instance } from "../project/instance"
  4. import { BusEvent } from "./bus-event"
  5. import { GlobalBus } from "./global"
  /**
   * In-process event bus scoped to the current `Instance`.
   *
   * Subscribers are stored per event type in a map held by `Instance.state`;
   * the special key `"*"` holds subscribers that receive every event.
   * Each published event is additionally forwarded to `GlobalBus`.
   */
  export namespace Bus {
    const log = Log.create({ service: "bus" })

    /** A subscriber callback; receives the `{ type, properties }` payload. */
    type Subscription = (event: any) => void

    /** Event delivered to wildcard subscribers by the state teardown callback below. */
    export const InstanceDisposed = BusEvent.define(
      "server.instance.disposed",
      z.object({
        directory: z.string(),
      }),
    )

    const state = Instance.state(
      () => {
        const subscriptions = new Map<any, Subscription[]>()
        return {
          subscriptions,
        }
      },
      // Second callback handed to Instance.state — presumably run when the
      // instance is disposed (TODO confirm against Instance.state).
      // Only wildcard ("*") subscribers are notified.
      async (entry) => {
        const wildcard = entry.subscriptions.get("*")
        if (!wildcard) return
        const event = {
          type: InstanceDisposed.type,
          properties: {
            directory: Instance.directory,
          },
        }
        // Iterate a copy so a subscriber may unsubscribe while being notified.
        for (const sub of [...wildcard]) {
          sub(event)
        }
      },
    )

    /**
     * Publishes an event to every subscriber of `def.type` and to every
     * wildcard subscriber, then forwards it to `GlobalBus`.
     *
     * Subscribers are invoked synchronously, type-specific ones first.
     * The subscriber list for each key is snapshotted before dispatch, so a
     * subscriber that unsubscribes during delivery (as `once` does) cannot
     * cause the following subscriber to be skipped. Subscribers added during
     * dispatch do not receive the event currently being published.
     *
     * @param def - Definition of the event being published.
     * @param properties - Event payload matching the definition's schema output.
     * @returns A promise that settles once every value returned by the
     *   subscribers has resolved.
     */
    export async function publish<Definition extends BusEvent.Definition>(
      def: Definition,
      properties: z.output<Definition["properties"]>,
    ) {
      const payload = {
        type: def.type,
        properties,
      }
      log.info("publishing", {
        type: def.type,
      })
      const pending = []
      for (const key of [def.type, "*"]) {
        const match = state().subscriptions.get(key)
        // Copy before iterating: unsubscribe() splices this array in place.
        for (const sub of [...(match ?? [])]) {
          pending.push(sub(payload))
        }
      }
      GlobalBus.emit("event", {
        directory: Instance.directory,
        payload,
      })
      return Promise.all(pending)
    }

    /**
     * Registers `callback` for events of the given definition's type.
     *
     * @param def - Definition whose `type` selects the events to receive.
     * @param callback - Invoked with each matching event.
     * @returns A function that removes the subscription.
     */
    export function subscribe<Definition extends BusEvent.Definition>(
      def: Definition,
      callback: (event: { type: Definition["type"]; properties: z.infer<Definition["properties"]> }) => void,
    ) {
      return raw(def.type, callback)
    }

    /**
     * Registers `callback` and removes it automatically the first time it
     * returns a truthy value (i.e. `"done"`). While it returns `undefined`
     * the subscription stays active.
     *
     * @param def - Definition whose `type` selects the events to receive.
     * @param callback - Invoked with each matching event; return `"done"` to stop listening.
     * @returns A function that removes the subscription early.
     */
    export function once<Definition extends BusEvent.Definition>(
      def: Definition,
      callback: (event: {
        type: Definition["type"]
        properties: z.infer<Definition["properties"]>
      }) => "done" | undefined,
    ) {
      const unsub = subscribe(def, (event) => {
        if (callback(event)) unsub()
      })
      return unsub
    }

    /**
     * Registers `callback` under the wildcard key so it receives every
     * published event.
     *
     * @param callback - Invoked with each event payload.
     * @returns A function that removes the subscription.
     */
    export function subscribeAll(callback: (event: any) => void) {
      return raw("*", callback)
    }

    /**
     * Appends `callback` to the subscriber list stored under `type` and
     * returns a function that removes that same callback again.
     */
    function raw(type: string, callback: (event: any) => void) {
      log.info("subscribing", { type })
      const subscriptions = state().subscriptions
      const match = subscriptions.get(type) ?? []
      match.push(callback)
      subscriptions.set(type, match)
      return () => {
        log.info("unsubscribing", { type })
        // Look the list up again rather than closing over the array above.
        const match = subscriptions.get(type)
        if (!match) return
        const index = match.indexOf(callback)
        if (index === -1) return
        match.splice(index, 1)
      }
    }
  }