| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189 |
- import z from "zod"
- import { Hono } from "hono"
- import { describeRoute, validator, resolver } from "hono-openapi"
- import { SyncEvent } from "@/sync"
- import { Database, asc, and, not, or, lte, eq } from "@/storage/db"
- import { EventTable } from "@/sync/event.sql"
- import { Log } from "@/util/log"
- import { lazy } from "@/util/lazy"
- import { Instance } from "@/project/instance"
- import { InstanceBootstrap } from "../../project/bootstrap"
- import { errors } from "../error"
- import { streamQueue } from "../stream-queue"
- const log = Log.create({ service: "server" })
- const ReplayEvent = z.object({
- id: z.string(),
- aggregateID: z.string(),
- seq: z.number().int().min(0),
- type: z.string(),
- data: z.record(z.string(), z.unknown()),
- })
- export const SyncRoutes = lazy(() =>
- new Hono()
- .get(
- "/event",
- describeRoute({
- summary: "Subscribe to sync events",
- description: "Get sync events",
- operationId: "sync.event",
- responses: {
- 200: {
- description: "Event stream",
- content: {
- "text/event-stream": {
- schema: resolver(
- z
- .object({
- payload: SyncEvent.payloads(),
- })
- .meta({
- ref: "SyncEvent",
- }),
- ),
- },
- },
- },
- },
- }),
- async (c) => {
- log.info("sync event connected")
- c.header("X-Accel-Buffering", "no")
- c.header("X-Content-Type-Options", "nosniff")
- return streamQueue(c, {
- connect: (q) => {
- log.info("sync event connected")
- q.push(
- JSON.stringify({
- type: "server.connected",
- properties: {},
- }),
- )
- },
- heartbeat: (q) => {
- q.push(
- JSON.stringify({
- type: "server.heartbeat",
- properties: {},
- }),
- )
- },
- subscribe: (q) => {
- const unsub = SyncEvent.subscribeAll(({ def, event }) => {
- q.push(JSON.stringify({ ...event, type: SyncEvent.versionedType(def.type, def.version) }))
- })
- return () => {
- unsub()
- log.info("sync event disconnected")
- }
- },
- })
- },
- )
- .post(
- "/replay",
- describeRoute({
- summary: "Replay sync events",
- description: "Validate and replay a complete sync event history.",
- operationId: "global.sync-replay",
- responses: {
- 200: {
- description: "Replayed sync events",
- content: {
- "application/json": {
- schema: resolver(
- z.object({
- sessionID: z.string(),
- }),
- ),
- },
- },
- },
- ...errors(400),
- },
- }),
- validator(
- "json",
- z.object({
- directory: z.string(),
- events: z.array(ReplayEvent).min(1),
- }),
- ),
- async (c) => {
- const body = c.req.valid("json")
- const events = body.events
- const source = events[0].aggregateID
- if (events.some((item) => item.aggregateID !== source)) {
- throw new Error("Replay events must belong to the same session")
- }
- for (const [i, item] of events.entries()) {
- if (item.seq !== i) throw new Error(`Replay sequence mismatch at index ${i}: expected ${i}, got ${item.seq}`)
- }
- return Instance.provide({
- directory: body.directory,
- init: InstanceBootstrap,
- async fn() {
- for (const item of events) {
- SyncEvent.replay(item)
- }
- return c.json({ sessionID: source })
- },
- })
- },
- )
- .get(
- "/history",
- describeRoute({
- summary: "List sync events",
- description: "List sync events for all aggregates. Keys are aggregate IDs the client already knows about, values are the last known sequence ID. Events with seq > value are returned for those aggregates. Aggregates not listed in the input get their full history.",
- operationId: "global.sync-history.list",
- responses: {
- 200: {
- description: "Sync events",
- content: {
- "application/json": {
- schema: resolver(
- z.array(
- z.object({
- id: z.string(),
- aggregate_id: z.string(),
- seq: z.number(),
- type: z.string(),
- data: z.record(z.string(), z.unknown()),
- }),
- ),
- ),
- },
- },
- },
- ...errors(400),
- },
- }),
- validator(
- "json",
- z.record(z.string(), z.number().int().min(0)),
- ),
- async (c) => {
- const body = c.req.valid("json")
- const exclude = Object.entries(body)
- const where = exclude.length > 0
- ? not(or(...exclude.map(([id, seq]) => and(eq(EventTable.aggregate_id, id), lte(EventTable.seq, seq))))!)
- : undefined
- const rows = Database.use((db) =>
- db
- .select()
- .from(EventTable)
- .where(where)
- .orderBy(asc(EventTable.seq))
- .all(),
- )
- return c.json(rows)
- },
- ),
- )
|