index.ts 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101
  1. import { z, type ZodType } from "zod";
  2. import { App } from "../app/app";
  3. import { Log } from "../util/log";
  4. export namespace Bus {
  5. const log = Log.create({ service: "bus" });
  6. type Subscription = (event: any) => void;
  7. const state = App.state("bus", () => {
  8. const subscriptions = new Map<any, Subscription[]>();
  9. return {
  10. subscriptions,
  11. };
  12. });
  13. export type EventDefinition = ReturnType<typeof event>;
  14. const registry = new Map<string, EventDefinition>();
  15. export function event<Type extends string, Properties extends ZodType>(
  16. type: Type,
  17. properties: Properties,
  18. ) {
  19. const result = {
  20. type,
  21. properties,
  22. };
  23. registry.set(type, result);
  24. return result;
  25. }
  26. export function payloads() {
  27. return z.discriminatedUnion(
  28. "type",
  29. registry
  30. .entries()
  31. .map(([type, def]) =>
  32. z
  33. .object({
  34. type: z.literal(type),
  35. properties: def.properties,
  36. })
  37. .openapi({
  38. ref: "Event" + "." + def.type,
  39. }),
  40. )
  41. .toArray() as any,
  42. );
  43. }
  44. export function publish<Definition extends EventDefinition>(
  45. def: Definition,
  46. properties: z.output<Definition["properties"]>,
  47. ) {
  48. const payload = {
  49. type: def.type,
  50. properties,
  51. };
  52. log.info("publishing", {
  53. type: def.type,
  54. });
  55. for (const key of [def.type, "*"]) {
  56. const match = state().subscriptions.get(key);
  57. for (const sub of match ?? []) {
  58. sub(payload);
  59. }
  60. }
  61. }
  62. export function subscribe<Definition extends EventDefinition>(
  63. def: Definition,
  64. callback: (event: {
  65. type: Definition["type"];
  66. properties: z.infer<Definition["properties"]>;
  67. }) => void,
  68. ) {
  69. return raw(def.type, callback);
  70. }
  71. export function subscribeAll(callback: (event: any) => void) {
  72. return raw("*", callback);
  73. }
  74. function raw(type: string, callback: (event: any) => void) {
  75. log.info("subscribing", { type });
  76. const subscriptions = state().subscriptions;
  77. let match = subscriptions.get(type) ?? [];
  78. match.push(callback);
  79. subscriptions.set(type, match);
  80. return () => {
  81. log.info("unsubscribing", { type });
  82. const match = subscriptions.get(type);
  83. if (!match) return;
  84. const index = match.indexOf(callback);
  85. if (index === -1) return;
  86. match.splice(index, 1);
  87. };
  88. }
  89. }