sync-routes.ts 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189
  1. import z from "zod"
  2. import { Hono } from "hono"
  3. import { describeRoute, validator, resolver } from "hono-openapi"
  4. import { SyncEvent } from "@/sync"
  5. import { Database, asc, and, not, or, lte, eq } from "@/storage/db"
  6. import { EventTable } from "@/sync/event.sql"
  7. import { Log } from "@/util/log"
  8. import { lazy } from "@/util/lazy"
  9. import { Instance } from "@/project/instance"
  10. import { InstanceBootstrap } from "../../project/bootstrap"
  11. import { errors } from "../error"
  12. import { streamQueue } from "../stream-queue"
  13. const log = Log.create({ service: "server" })
  14. const ReplayEvent = z.object({
  15. id: z.string(),
  16. aggregateID: z.string(),
  17. seq: z.number().int().min(0),
  18. type: z.string(),
  19. data: z.record(z.string(), z.unknown()),
  20. })
  21. export const SyncRoutes = lazy(() =>
  22. new Hono()
  23. .get(
  24. "/event",
  25. describeRoute({
  26. summary: "Subscribe to sync events",
  27. description: "Get sync events",
  28. operationId: "sync.event",
  29. responses: {
  30. 200: {
  31. description: "Event stream",
  32. content: {
  33. "text/event-stream": {
  34. schema: resolver(
  35. z
  36. .object({
  37. payload: SyncEvent.payloads(),
  38. })
  39. .meta({
  40. ref: "SyncEvent",
  41. }),
  42. ),
  43. },
  44. },
  45. },
  46. },
  47. }),
  48. async (c) => {
  49. log.info("sync event connected")
  50. c.header("X-Accel-Buffering", "no")
  51. c.header("X-Content-Type-Options", "nosniff")
  52. return streamQueue(c, {
  53. connect: (q) => {
  54. log.info("sync event connected")
  55. q.push(
  56. JSON.stringify({
  57. type: "server.connected",
  58. properties: {},
  59. }),
  60. )
  61. },
  62. heartbeat: (q) => {
  63. q.push(
  64. JSON.stringify({
  65. type: "server.heartbeat",
  66. properties: {},
  67. }),
  68. )
  69. },
  70. subscribe: (q) => {
  71. const unsub = SyncEvent.subscribeAll(({ def, event }) => {
  72. q.push(JSON.stringify({ ...event, type: SyncEvent.versionedType(def.type, def.version) }))
  73. })
  74. return () => {
  75. unsub()
  76. log.info("sync event disconnected")
  77. }
  78. },
  79. })
  80. },
  81. )
  82. .post(
  83. "/replay",
  84. describeRoute({
  85. summary: "Replay sync events",
  86. description: "Validate and replay a complete sync event history.",
  87. operationId: "global.sync-replay",
  88. responses: {
  89. 200: {
  90. description: "Replayed sync events",
  91. content: {
  92. "application/json": {
  93. schema: resolver(
  94. z.object({
  95. sessionID: z.string(),
  96. }),
  97. ),
  98. },
  99. },
  100. },
  101. ...errors(400),
  102. },
  103. }),
  104. validator(
  105. "json",
  106. z.object({
  107. directory: z.string(),
  108. events: z.array(ReplayEvent).min(1),
  109. }),
  110. ),
  111. async (c) => {
  112. const body = c.req.valid("json")
  113. const events = body.events
  114. const source = events[0].aggregateID
  115. if (events.some((item) => item.aggregateID !== source)) {
  116. throw new Error("Replay events must belong to the same session")
  117. }
  118. for (const [i, item] of events.entries()) {
  119. if (item.seq !== i) throw new Error(`Replay sequence mismatch at index ${i}: expected ${i}, got ${item.seq}`)
  120. }
  121. return Instance.provide({
  122. directory: body.directory,
  123. init: InstanceBootstrap,
  124. async fn() {
  125. for (const item of events) {
  126. SyncEvent.replay(item)
  127. }
  128. return c.json({ sessionID: source })
  129. },
  130. })
  131. },
  132. )
  133. .get(
  134. "/history",
  135. describeRoute({
  136. summary: "List sync events",
  137. description: "List sync events for all aggregates. Keys are aggregate IDs the client already knows about, values are the last known sequence ID. Events with seq > value are returned for those aggregates. Aggregates not listed in the input get their full history.",
  138. operationId: "global.sync-history.list",
  139. responses: {
  140. 200: {
  141. description: "Sync events",
  142. content: {
  143. "application/json": {
  144. schema: resolver(
  145. z.array(
  146. z.object({
  147. id: z.string(),
  148. aggregate_id: z.string(),
  149. seq: z.number(),
  150. type: z.string(),
  151. data: z.record(z.string(), z.unknown()),
  152. }),
  153. ),
  154. ),
  155. },
  156. },
  157. },
  158. ...errors(400),
  159. },
  160. }),
  161. validator(
  162. "json",
  163. z.record(z.string(), z.number().int().min(0)),
  164. ),
  165. async (c) => {
  166. const body = c.req.valid("json")
  167. const exclude = Object.entries(body)
  168. const where = exclude.length > 0
  169. ? not(or(...exclude.map(([id, seq]) => and(eq(EventTable.aggregate_id, id), lte(EventTable.seq, seq))))!)
  170. : undefined
  171. const rows = Database.use((db) =>
  172. db
  173. .select()
  174. .from(EventTable)
  175. .where(where)
  176. .orderBy(asc(EventTable.seq))
  177. .all(),
  178. )
  179. return c.json(rows)
  180. },
  181. ),
  182. )