index.ts 2.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. import z from "zod"
  2. import { Log } from "../util/log"
  3. import { Instance } from "../project/instance"
  4. import { BusEvent } from "./bus-event"
  5. import { GlobalBus } from "./global"
  6. export namespace Bus {
  7. const log = Log.create({ service: "bus" })
  8. type Subscription = (event: any) => void
  9. const disposedEventType = "server.instance.disposed"
  10. const state = Instance.state(
  11. () => {
  12. const subscriptions = new Map<any, Subscription[]>()
  13. return {
  14. subscriptions,
  15. }
  16. },
  17. async (entry) => {
  18. const wildcard = entry.subscriptions.get("*")
  19. if (!wildcard) return
  20. const event = {
  21. type: disposedEventType,
  22. properties: {
  23. directory: Instance.directory,
  24. },
  25. }
  26. for (const sub of [...wildcard]) {
  27. sub(event)
  28. }
  29. },
  30. )
  31. export const InstanceDisposed = BusEvent.define(
  32. disposedEventType,
  33. z.object({
  34. directory: z.string(),
  35. }),
  36. )
  37. export async function publish<Definition extends BusEvent.Definition>(
  38. def: Definition,
  39. properties: z.output<Definition["properties"]>,
  40. ) {
  41. const payload = {
  42. type: def.type,
  43. properties,
  44. }
  45. log.info("publishing", {
  46. type: def.type,
  47. })
  48. const pending = []
  49. for (const key of [def.type, "*"]) {
  50. const match = state().subscriptions.get(key)
  51. for (const sub of match ?? []) {
  52. pending.push(sub(payload))
  53. }
  54. }
  55. GlobalBus.emit("event", {
  56. directory: Instance.directory,
  57. payload,
  58. })
  59. return Promise.all(pending)
  60. }
  61. export function subscribe<Definition extends BusEvent.Definition>(
  62. def: Definition,
  63. callback: (event: { type: Definition["type"]; properties: z.infer<Definition["properties"]> }) => void,
  64. ) {
  65. return raw(def.type, callback)
  66. }
  67. export function once<Definition extends BusEvent.Definition>(
  68. def: Definition,
  69. callback: (event: {
  70. type: Definition["type"]
  71. properties: z.infer<Definition["properties"]>
  72. }) => "done" | undefined,
  73. ) {
  74. const unsub = subscribe(def, (event) => {
  75. if (callback(event)) unsub()
  76. })
  77. }
  78. export function subscribeAll(callback: (event: any) => void) {
  79. return raw("*", callback)
  80. }
  81. function raw(type: string, callback: (event: any) => void) {
  82. log.info("subscribing", { type })
  83. const subscriptions = state().subscriptions
  84. let match = subscriptions.get(type) ?? []
  85. match.push(callback)
  86. subscriptions.set(type, match)
  87. return () => {
  88. log.info("unsubscribing", { type })
  89. const match = subscriptions.get(type)
  90. if (!match) return
  91. const index = match.indexOf(callback)
  92. if (index === -1) return
  93. match.splice(index, 1)
  94. }
  95. }
  96. }