|
|
@@ -38,8 +38,9 @@ const ShareSchema = Schema.Struct({
|
|
|
export type Share = typeof ShareSchema.Type
|
|
|
|
|
|
type State = {
|
|
|
- queue: Map<string, { data: Map<string, Data> }>
|
|
|
+ queue: Map<SessionID, Map<string, Data>>
|
|
|
scope: Scope.Closeable
|
|
|
+ shared: Map<SessionID, Share | null>
|
|
|
}
|
|
|
|
|
|
type Data =
|
|
|
@@ -118,17 +119,20 @@ export const layer = Layer.effect(
|
|
|
function sync(sessionID: SessionID, data: Data[]): Effect.Effect<void> {
|
|
|
return Effect.gen(function* () {
|
|
|
if (disabled) return
|
|
|
+ const share = yield* getCached(sessionID)
|
|
|
+ if (!share) return
|
|
|
+
|
|
|
const s = yield* InstanceState.get(state)
|
|
|
const existing = s.queue.get(sessionID)
|
|
|
if (existing) {
|
|
|
for (const item of data) {
|
|
|
- existing.data.set(key(item), item)
|
|
|
+ existing.set(key(item), item)
|
|
|
}
|
|
|
return
|
|
|
}
|
|
|
|
|
|
const next = new Map(data.map((item) => [key(item), item]))
|
|
|
- s.queue.set(sessionID, { data: next })
|
|
|
+ s.queue.set(sessionID, next)
|
|
|
yield* flush(sessionID).pipe(
|
|
|
Effect.delay(1000),
|
|
|
Effect.catchCause((cause) =>
|
|
|
@@ -143,13 +147,14 @@ export const layer = Layer.effect(
|
|
|
|
|
|
const state: InstanceState.InstanceState<State> = yield* InstanceState.make<State>(
|
|
|
Effect.fn("ShareNext.state")(function* (_ctx) {
|
|
|
- const cache: State = { queue: new Map(), scope: yield* Scope.make() }
|
|
|
+ const cache: State = { queue: new Map(), scope: yield* Scope.make(), shared: new Map() }
|
|
|
|
|
|
yield* Effect.addFinalizer(() =>
|
|
|
Scope.close(cache.scope, Exit.void).pipe(
|
|
|
Effect.andThen(
|
|
|
Effect.sync(() => {
|
|
|
cache.queue.clear()
|
|
|
+ cache.shared.clear()
|
|
|
}),
|
|
|
),
|
|
|
),
|
|
|
@@ -227,6 +232,18 @@ export const layer = Layer.effect(
|
|
|
return { id: row.id, secret: row.secret, url: row.url } satisfies Share
|
|
|
})
|
|
|
|
|
|
+ const getCached = Effect.fnUntraced(function* (sessionID: SessionID) {
|
|
|
+ const s = yield* InstanceState.get(state)
|
|
|
+ if (s.shared.has(sessionID)) {
|
|
|
+ const cached = s.shared.get(sessionID)
|
|
|
+ return cached === null ? undefined : cached
|
|
|
+ }
|
|
|
+
|
|
|
+ const share = yield* get(sessionID)
|
|
|
+ s.shared.set(sessionID, share ?? null)
|
|
|
+ return share
|
|
|
+ })
|
|
|
+
|
|
|
const flush = Effect.fn("ShareNext.flush")(function* (sessionID: SessionID) {
|
|
|
if (disabled) return
|
|
|
const s = yield* InstanceState.get(state)
|
|
|
@@ -235,13 +252,13 @@ export const layer = Layer.effect(
|
|
|
|
|
|
s.queue.delete(sessionID)
|
|
|
|
|
|
- const share = yield* get(sessionID)
|
|
|
+ const share = yield* getCached(sessionID)
|
|
|
if (!share) return
|
|
|
|
|
|
const req = yield* request()
|
|
|
const res = yield* HttpClientRequest.post(`${req.baseUrl}${req.api.sync(share.id)}`).pipe(
|
|
|
HttpClientRequest.setHeaders(req.headers),
|
|
|
- HttpClientRequest.bodyJson({ secret: share.secret, data: Array.from(queued.data.values()) }),
|
|
|
+ HttpClientRequest.bodyJson({ secret: share.secret, data: Array.from(queued.values()) }),
|
|
|
Effect.flatMap((r) => http.execute(r)),
|
|
|
)
|
|
|
|
|
|
@@ -307,6 +324,7 @@ export const layer = Layer.effect(
|
|
|
.run(),
|
|
|
)
|
|
|
const s = yield* InstanceState.get(state)
|
|
|
+ s.shared.set(sessionID, result)
|
|
|
yield* full(sessionID).pipe(
|
|
|
Effect.catchCause((cause) =>
|
|
|
Effect.sync(() => {
|
|
|
@@ -321,8 +339,13 @@ export const layer = Layer.effect(
|
|
|
const remove = Effect.fn("ShareNext.remove")(function* (sessionID: SessionID) {
|
|
|
if (disabled) return
|
|
|
log.info("removing share", { sessionID })
|
|
|
- const share = yield* get(sessionID)
|
|
|
- if (!share) return
|
|
|
+ const s = yield* InstanceState.get(state)
|
|
|
+ const share = yield* getCached(sessionID)
|
|
|
+ if (!share) {
|
|
|
+ s.shared.delete(sessionID)
|
|
|
+ s.queue.delete(sessionID)
|
|
|
+ return
|
|
|
+ }
|
|
|
|
|
|
const req = yield* request()
|
|
|
yield* HttpClientRequest.delete(`${req.baseUrl}${req.api.remove(share.id)}`).pipe(
|
|
|
@@ -332,6 +355,8 @@ export const layer = Layer.effect(
|
|
|
)
|
|
|
|
|
|
yield* db((db) => db.delete(SessionShareTable).where(eq(SessionShareTable.session_id, sessionID)).run())
|
|
|
+ s.shared.delete(sessionID)
|
|
|
+ s.queue.delete(sessionID)
|
|
|
})
|
|
|
|
|
|
return Service.of({ init, url, request, create, remove })
|