Просмотр исходного кода

refactor(share): effectify share next (#20596)

Kit Langton 2 недель назад
Родитель
Сommit
8e9e79d276
2 измененных файлов с 626 добавлено и 287 удалено
  1. 299 217
      packages/opencode/src/share/share-next.ts
  2. 327 70
      packages/opencode/test/share/share-next.test.ts

+ 299 - 217
packages/opencode/src/share/share-next.ts

@@ -1,152 +1,47 @@
-import { Bus } from "@/bus"
+import type * as SDK from "@opencode-ai/sdk/v2"
+import { Effect, Exit, Layer, Option, Schema, Scope, ServiceMap, Stream } from "effect"
+import { FetchHttpClient, HttpClient, HttpClientRequest, HttpClientResponse } from "effect/unstable/http"
 import { Account } from "@/account"
-import { Config } from "@/config/config"
+import { Bus } from "@/bus"
+import { InstanceState } from "@/effect/instance-state"
+import { makeRuntime } from "@/effect/run-service"
 import { Provider } from "@/provider/provider"
-import { ProviderID, ModelID } from "@/provider/schema"
+import { ModelID, ProviderID } from "@/provider/schema"
 import { Session } from "@/session"
-import type { SessionID } from "@/session/schema"
 import { MessageV2 } from "@/session/message-v2"
+import type { SessionID } from "@/session/schema"
 import { Database, eq } from "@/storage/db"
-import { SessionShareTable } from "./share.sql"
+import { Config } from "@/config/config"
 import { Log } from "@/util/log"
-import type * as SDK from "@opencode-ai/sdk/v2"
+import { SessionShareTable } from "./share.sql"
 
 export namespace ShareNext {
   const log = Log.create({ service: "share-next" })
+  const disabled = process.env["OPENCODE_DISABLE_SHARE"] === "true" || process.env["OPENCODE_DISABLE_SHARE"] === "1"
 
-  type ApiEndpoints = {
+  export type Api = {
     create: string
-    sync: (shareId: string) => string
-    remove: (shareId: string) => string
-    data: (shareId: string) => string
-  }
-
-  function apiEndpoints(resource: string): ApiEndpoints {
-    return {
-      create: `/api/${resource}`,
-      sync: (shareId) => `/api/${resource}/${shareId}/sync`,
-      remove: (shareId) => `/api/${resource}/${shareId}`,
-      data: (shareId) => `/api/${resource}/${shareId}/data`,
-    }
-  }
-
-  const legacyApi = apiEndpoints("share")
-  const consoleApi = apiEndpoints("shares")
-
-  export async function url() {
-    const req = await request()
-    return req.baseUrl
+    sync: (shareID: string) => string
+    remove: (shareID: string) => string
+    data: (shareID: string) => string
   }
 
-  export async function request(): Promise<{
+  export type Req = {
     headers: Record<string, string>
-    api: ApiEndpoints
+    api: Api
     baseUrl: string
-  }> {
-    const headers: Record<string, string> = {}
-
-    const active = await Account.active()
-    if (!active?.active_org_id) {
-      const baseUrl = await Config.get().then((x) => x.enterprise?.url ?? "https://opncd.ai")
-      return { headers, api: legacyApi, baseUrl }
-    }
-
-    const token = await Account.token(active.id)
-    if (!token) {
-      throw new Error("No active account token available for sharing")
-    }
-
-    headers["authorization"] = `Bearer ${token}`
-    headers["x-org-id"] = active.active_org_id
-    return { headers, api: consoleApi, baseUrl: active.url }
-  }
-
-  const disabled = process.env["OPENCODE_DISABLE_SHARE"] === "true" || process.env["OPENCODE_DISABLE_SHARE"] === "1"
-
-  export async function init() {
-    if (disabled) return
-    Bus.subscribe(Session.Event.Updated, async (evt) => {
-      const session = await Session.get(evt.properties.sessionID)
-
-      await sync(session.id, [
-        {
-          type: "session",
-          data: session,
-        },
-      ])
-    })
-    Bus.subscribe(MessageV2.Event.Updated, async (evt) => {
-      const info = evt.properties.info
-      await sync(info.sessionID, [
-        {
-          type: "message",
-          data: evt.properties.info,
-        },
-      ])
-      if (info.role === "user") {
-        await sync(info.sessionID, [
-          {
-            type: "model",
-            data: [await Provider.getModel(info.model.providerID, info.model.modelID).then((m) => m)],
-          },
-        ])
-      }
-    })
-    Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => {
-      await sync(evt.properties.part.sessionID, [
-        {
-          type: "part",
-          data: evt.properties.part,
-        },
-      ])
-    })
-    Bus.subscribe(Session.Event.Diff, async (evt) => {
-      await sync(evt.properties.sessionID, [
-        {
-          type: "session_diff",
-          data: evt.properties.diff,
-        },
-      ])
-    })
   }
 
-  export async function create(sessionID: SessionID) {
-    if (disabled) return { id: "", url: "", secret: "" }
-    log.info("creating share", { sessionID })
-    const req = await request()
-    const response = await fetch(`${req.baseUrl}${req.api.create}`, {
-      method: "POST",
-      headers: { ...req.headers, "Content-Type": "application/json" },
-      body: JSON.stringify({ sessionID: sessionID }),
-    })
-
-    if (!response.ok) {
-      const message = await response.text().catch(() => response.statusText)
-      throw new Error(`Failed to create share (${response.status}): ${message || response.statusText}`)
-    }
-
-    const result = (await response.json()) as { id: string; url: string; secret: string }
-
-    Database.use((db) =>
-      db
-        .insert(SessionShareTable)
-        .values({ session_id: sessionID, id: result.id, secret: result.secret, url: result.url })
-        .onConflictDoUpdate({
-          target: SessionShareTable.session_id,
-          set: { id: result.id, secret: result.secret, url: result.url },
-        })
-        .run(),
-    )
-    fullSync(sessionID)
-    return result
-  }
+  const ShareSchema = Schema.Struct({
+    id: Schema.String,
+    url: Schema.String,
+    secret: Schema.String,
+  })
+  export type Share = typeof ShareSchema.Type
 
-  function get(sessionID: SessionID) {
-    const row = Database.use((db) =>
-      db.select().from(SessionShareTable).where(eq(SessionShareTable.session_id, sessionID)).get(),
-    )
-    if (!row) return
-    return { id: row.id, secret: row.secret, url: row.url }
+  type State = {
+    queue: Map<string, { data: Map<string, Data> }>
+    scope: Scope.Closeable
   }
 
   type Data =
@@ -171,6 +66,31 @@ export namespace ShareNext {
         data: SDK.Model[]
       }
 
+  export interface Interface {
+    readonly init: () => Effect.Effect<void, unknown>
+    readonly url: () => Effect.Effect<string, unknown>
+    readonly request: () => Effect.Effect<Req, unknown>
+    readonly create: (sessionID: SessionID) => Effect.Effect<Share, unknown>
+    readonly remove: (sessionID: SessionID) => Effect.Effect<void, unknown>
+  }
+
+  export class Service extends ServiceMap.Service<Service, Interface>()("@opencode/ShareNext") {}
+
+  const db = <T>(fn: (d: Parameters<typeof Database.use>[0] extends (trx: infer D) => any ? D : never) => T) =>
+    Effect.sync(() => Database.use(fn))
+
+  function api(resource: string): Api {
+    return {
+      create: `/api/${resource}`,
+      sync: (shareID) => `/api/${resource}/${shareID}/sync`,
+      remove: (shareID) => `/api/${resource}/${shareID}`,
+      data: (shareID) => `/api/${resource}/${shareID}/data`,
+    }
+  }
+
+  const legacyApi = api("share")
+  const consoleApi = api("shares")
+
   function key(item: Data) {
     switch (item.type) {
       case "session":
@@ -186,102 +106,264 @@ export namespace ShareNext {
     }
   }
 
-  const queue = new Map<string, { timeout: NodeJS.Timeout; data: Map<string, Data> }>()
-  async function sync(sessionID: SessionID, data: Data[]) {
-    if (disabled) return
-    const existing = queue.get(sessionID)
-    if (existing) {
-      for (const item of data) {
-        existing.data.set(key(item), item)
+  export const layer = Layer.effect(
+    Service,
+    Effect.gen(function* () {
+      const account = yield* Account.Service
+      const bus = yield* Bus.Service
+      const cfg = yield* Config.Service
+      const http = yield* HttpClient.HttpClient
+      const httpOk = HttpClient.filterStatusOk(http)
+      const provider = yield* Provider.Service
+      const session = yield* Session.Service
+
+      function sync(sessionID: SessionID, data: Data[]): Effect.Effect<void> {
+        return Effect.gen(function* () {
+          if (disabled) return
+          const s = yield* InstanceState.get(state)
+          const existing = s.queue.get(sessionID)
+          if (existing) {
+            for (const item of data) {
+              existing.data.set(key(item), item)
+            }
+            return
+          }
+
+          const next = new Map(data.map((item) => [key(item), item]))
+          s.queue.set(sessionID, { data: next })
+          yield* flush(sessionID).pipe(
+            Effect.delay(1000),
+            Effect.catchCause((cause) =>
+              Effect.sync(() => {
+                log.error("share flush failed", { sessionID, cause })
+              }),
+            ),
+            Effect.forkIn(s.scope),
+          )
+        })
       }
-      return
-    }
 
-    const dataMap = new Map<string, Data>()
-    for (const item of data) {
-      dataMap.set(key(item), item)
-    }
+      const state: InstanceState<State> = yield* InstanceState.make<State>(
+        Effect.fn("ShareNext.state")(function* (_ctx) {
+          const cache: State = { queue: new Map(), scope: yield* Scope.make() }
+
+          yield* Effect.addFinalizer(() =>
+            Scope.close(cache.scope, Exit.void).pipe(
+              Effect.andThen(
+                Effect.sync(() => {
+                  cache.queue.clear()
+                }),
+              ),
+            ),
+          )
 
-    const timeout = setTimeout(async () => {
-      const queued = queue.get(sessionID)
-      if (!queued) return
-      queue.delete(sessionID)
-      const share = get(sessionID)
-      if (!share) return
-
-      const req = await request()
-      const response = await fetch(`${req.baseUrl}${req.api.sync(share.id)}`, {
-        method: "POST",
-        headers: { ...req.headers, "Content-Type": "application/json" },
-        body: JSON.stringify({
-          secret: share.secret,
-          data: Array.from(queued.data.values()),
+          if (disabled) return cache
+
+          const watch = <D extends { type: string }>(def: D, fn: (evt: { properties: any }) => Effect.Effect<void>) =>
+            bus.subscribe(def as never).pipe(
+              Stream.runForEach((evt) =>
+                fn(evt).pipe(
+                  Effect.catchCause((cause) =>
+                    Effect.sync(() => {
+                      log.error("share subscriber failed", { type: def.type, cause })
+                    }),
+                  ),
+                ),
+              ),
+              Effect.forkScoped,
+            )
+
+          yield* watch(Session.Event.Updated, (evt) =>
+            Effect.gen(function* () {
+              const info = yield* session.get(evt.properties.sessionID)
+              yield* sync(info.id, [{ type: "session", data: info }])
+            }),
+          )
+          yield* watch(MessageV2.Event.Updated, (evt) =>
+            Effect.gen(function* () {
+              const info = evt.properties.info
+              yield* sync(info.sessionID, [{ type: "message", data: info }])
+              if (info.role !== "user") return
+              const model = yield* provider.getModel(info.model.providerID, info.model.modelID)
+              yield* sync(info.sessionID, [{ type: "model", data: [model] }])
+            }),
+          )
+          yield* watch(MessageV2.Event.PartUpdated, (evt) =>
+            sync(evt.properties.part.sessionID, [{ type: "part", data: evt.properties.part }]),
+          )
+          yield* watch(Session.Event.Diff, (evt) =>
+            sync(evt.properties.sessionID, [{ type: "session_diff", data: evt.properties.diff }]),
+          )
+
+          return cache
         }),
+      )
+
+      const request = Effect.fn("ShareNext.request")(function* () {
+        const headers: Record<string, string> = {}
+        const active = yield* account.active()
+        if (Option.isNone(active) || !active.value.active_org_id) {
+          const baseUrl = (yield* cfg.get()).enterprise?.url ?? "https://opncd.ai"
+          return { headers, api: legacyApi, baseUrl } satisfies Req
+        }
+
+        const token = yield* account.token(active.value.id)
+        if (Option.isNone(token)) {
+          throw new Error("No active account token available for sharing")
+        }
+
+        headers.authorization = `Bearer ${token.value}`
+        headers["x-org-id"] = active.value.active_org_id
+        return { headers, api: consoleApi, baseUrl: active.value.url } satisfies Req
       })
 
-      if (!response.ok) {
-        log.warn("failed to sync share", { sessionID, shareID: share.id, status: response.status })
-      }
-    }, 1000)
-    queue.set(sessionID, { timeout, data: dataMap })
+      const get = Effect.fnUntraced(function* (sessionID: SessionID) {
+        const row = yield* db((db) =>
+          db.select().from(SessionShareTable).where(eq(SessionShareTable.session_id, sessionID)).get(),
+        )
+        if (!row) return
+        return { id: row.id, secret: row.secret, url: row.url } satisfies Share
+      })
+
+      const flush = Effect.fn("ShareNext.flush")(function* (sessionID: SessionID) {
+        if (disabled) return
+        const s = yield* InstanceState.get(state)
+        const queued = s.queue.get(sessionID)
+        if (!queued) return
+
+        s.queue.delete(sessionID)
+
+        const share = yield* get(sessionID)
+        if (!share) return
+
+        const req = yield* request()
+        const res = yield* HttpClientRequest.post(`${req.baseUrl}${req.api.sync(share.id)}`).pipe(
+          HttpClientRequest.setHeaders(req.headers),
+          HttpClientRequest.bodyJson({ secret: share.secret, data: Array.from(queued.data.values()) }),
+          Effect.flatMap((r) => http.execute(r)),
+        )
+
+        if (res.status >= 400) {
+          log.warn("failed to sync share", { sessionID, shareID: share.id, status: res.status })
+        }
+      })
+
+      const full = Effect.fn("ShareNext.full")(function* (sessionID: SessionID) {
+        log.info("full sync", { sessionID })
+        const info = yield* session.get(sessionID)
+        const diffs = yield* session.diff(sessionID)
+        const messages = yield* Effect.sync(() => Array.from(MessageV2.stream(sessionID)))
+        const models = yield* Effect.forEach(
+          Array.from(
+            new Map(
+              messages
+                .filter((msg) => msg.info.role === "user")
+                .map((msg) => (msg.info as SDK.UserMessage).model)
+                .map((item) => [`${item.providerID}/${item.modelID}`, item] as const),
+            ).values(),
+          ),
+          (item) => provider.getModel(ProviderID.make(item.providerID), ModelID.make(item.modelID)),
+          { concurrency: 8 },
+        )
+
+        yield* sync(sessionID, [
+          { type: "session", data: info },
+          ...messages.map((item) => ({ type: "message" as const, data: item.info })),
+          ...messages.flatMap((item) => item.parts.map((part) => ({ type: "part" as const, data: part }))),
+          { type: "session_diff", data: diffs },
+          { type: "model", data: models },
+        ])
+      })
+
+      const init = Effect.fn("ShareNext.init")(function* () {
+        if (disabled) return
+        yield* InstanceState.get(state)
+      })
+
+      const url = Effect.fn("ShareNext.url")(function* () {
+        return (yield* request()).baseUrl
+      })
+
+      const create = Effect.fn("ShareNext.create")(function* (sessionID: SessionID) {
+        if (disabled) return { id: "", url: "", secret: "" }
+        log.info("creating share", { sessionID })
+        const req = yield* request()
+        const result = yield* HttpClientRequest.post(`${req.baseUrl}${req.api.create}`).pipe(
+          HttpClientRequest.setHeaders(req.headers),
+          HttpClientRequest.bodyJson({ sessionID }),
+          Effect.flatMap((r) => httpOk.execute(r)),
+          Effect.flatMap(HttpClientResponse.schemaBodyJson(ShareSchema)),
+        )
+        yield* db((db) =>
+          db
+            .insert(SessionShareTable)
+            .values({ session_id: sessionID, id: result.id, secret: result.secret, url: result.url })
+            .onConflictDoUpdate({
+              target: SessionShareTable.session_id,
+              set: { id: result.id, secret: result.secret, url: result.url },
+            })
+            .run(),
+        )
+        const s = yield* InstanceState.get(state)
+        yield* full(sessionID).pipe(
+          Effect.catchCause((cause) =>
+            Effect.sync(() => {
+              log.error("share full sync failed", { sessionID, cause })
+            }),
+          ),
+          Effect.forkIn(s.scope),
+        )
+        return result
+      })
+
+      const remove = Effect.fn("ShareNext.remove")(function* (sessionID: SessionID) {
+        if (disabled) return
+        log.info("removing share", { sessionID })
+        const share = yield* get(sessionID)
+        if (!share) return
+
+        const req = yield* request()
+        yield* HttpClientRequest.delete(`${req.baseUrl}${req.api.remove(share.id)}`).pipe(
+          HttpClientRequest.setHeaders(req.headers),
+          HttpClientRequest.bodyJson({ secret: share.secret }),
+          Effect.flatMap((r) => httpOk.execute(r)),
+        )
+
+        yield* db((db) => db.delete(SessionShareTable).where(eq(SessionShareTable.session_id, sessionID)).run())
+      })
+
+      return Service.of({ init, url, request, create, remove })
+    }),
+  )
+
+  export const defaultLayer = layer.pipe(
+    Layer.provide(Bus.layer),
+    Layer.provide(Account.defaultLayer),
+    Layer.provide(Config.defaultLayer),
+    Layer.provide(FetchHttpClient.layer),
+    Layer.provide(Provider.defaultLayer),
+    Layer.provide(Session.defaultLayer),
+  )
+
+  const { runPromise } = makeRuntime(Service, defaultLayer)
+
+  export async function init() {
+    return runPromise((svc) => svc.init())
   }
 
-  export async function remove(sessionID: SessionID) {
-    if (disabled) return
-    log.info("removing share", { sessionID })
-    const share = get(sessionID)
-    if (!share) return
-
-    const req = await request()
-    const response = await fetch(`${req.baseUrl}${req.api.remove(share.id)}`, {
-      method: "DELETE",
-      headers: { ...req.headers, "Content-Type": "application/json" },
-      body: JSON.stringify({
-        secret: share.secret,
-      }),
-    })
-
-    if (!response.ok) {
-      const message = await response.text().catch(() => response.statusText)
-      throw new Error(`Failed to remove share (${response.status}): ${message || response.statusText}`)
-    }
+  export async function url() {
+    return runPromise((svc) => svc.url())
+  }
 
-    Database.use((db) => db.delete(SessionShareTable).where(eq(SessionShareTable.session_id, sessionID)).run())
+  export async function request(): Promise<Req> {
+    return runPromise((svc) => svc.request())
   }
 
-  async function fullSync(sessionID: SessionID) {
-    log.info("full sync", { sessionID })
-    const session = await Session.get(sessionID)
-    const diffs = await Session.diff(sessionID)
-    const messages = await Array.fromAsync(MessageV2.stream(sessionID))
-    const models = await Promise.all(
-      Array.from(
-        new Map(
-          messages
-            .filter((m) => m.info.role === "user")
-            .map((m) => (m.info as SDK.UserMessage).model)
-            .map((m) => [`${m.providerID}/${m.modelID}`, m] as const),
-        ).values(),
-      ).map((m) => Provider.getModel(ProviderID.make(m.providerID), ModelID.make(m.modelID)).then((item) => item)),
-    )
-    await sync(sessionID, [
-      {
-        type: "session",
-        data: session,
-      },
-      ...messages.map((x) => ({
-        type: "message" as const,
-        data: x.info,
-      })),
-      ...messages.flatMap((x) => x.parts.map((y) => ({ type: "part" as const, data: y }))),
-      {
-        type: "session_diff",
-        data: diffs,
-      },
-      {
-        type: "model",
-        data: models,
-      },
-    ])
+  export async function create(sessionID: SessionID) {
+    return runPromise((svc) => svc.create(sessionID))
+  }
+
+  export async function remove(sessionID: SessionID) {
+    return runPromise((svc) => svc.remove(sessionID))
   }
 }

+ 327 - 70
packages/opencode/test/share/share-next.test.ts

@@ -1,76 +1,333 @@
-import { test, expect, mock } from "bun:test"
-import { ShareNext } from "../../src/share/share-next"
-import { AccessToken, Account, AccountID, OrgID } from "../../src/account"
+import { NodeFileSystem } from "@effect/platform-node"
+import { beforeEach, describe, expect } from "bun:test"
+import { Effect, Exit, Layer, Option } from "effect"
+import { HttpClient, HttpClientRequest, HttpClientResponse } from "effect/unstable/http"
+
+import { AccessToken, AccountID, OrgID, RefreshToken } from "../../src/account"
+import { Account } from "../../src/account"
+import { AccountRepo } from "../../src/account/repo"
+import * as CrossSpawnSpawner from "../../src/effect/cross-spawn-spawner"
+import { Bus } from "../../src/bus"
 import { Config } from "../../src/config/config"
+import { Provider } from "../../src/provider/provider"
+import { Session } from "../../src/session"
+import type { SessionID } from "../../src/session/schema"
+import { ShareNext } from "../../src/share/share-next"
+import { SessionShareTable } from "../../src/share/share.sql"
+import { Database, eq } from "../../src/storage/db"
+import { provideTmpdirInstance } from "../fixture/fixture"
+import { resetDatabase } from "../fixture/db"
+import { testEffect } from "../lib/effect"
 
-test("ShareNext.request uses legacy share API without active org account", async () => {
-  const originalActive = Account.active
-  const originalConfigGet = Config.get
-
-  Account.active = mock(async () => undefined)
-  Config.get = mock(async () => ({ enterprise: { url: "https://legacy-share.example.com" } }))
-
-  try {
-    const req = await ShareNext.request()
-
-    expect(req.api.create).toBe("/api/share")
-    expect(req.api.sync("shr_123")).toBe("/api/share/shr_123/sync")
-    expect(req.api.remove("shr_123")).toBe("/api/share/shr_123")
-    expect(req.api.data("shr_123")).toBe("/api/share/shr_123/data")
-    expect(req.baseUrl).toBe("https://legacy-share.example.com")
-    expect(req.headers).toEqual({})
-  } finally {
-    Account.active = originalActive
-    Config.get = originalConfigGet
-  }
-})
+const env = Layer.mergeAll(
+  Session.defaultLayer,
+  AccountRepo.layer,
+  NodeFileSystem.layer,
+  CrossSpawnSpawner.defaultLayer,
+)
+const it = testEffect(env)
+
+const json = (req: Parameters<typeof HttpClientResponse.fromWeb>[0], body: unknown, status = 200) =>
+  HttpClientResponse.fromWeb(
+    req,
+    new Response(JSON.stringify(body), {
+      status,
+      headers: { "content-type": "application/json" },
+    }),
+  )
+
+const none = HttpClient.make(() => Effect.die("unexpected http call"))
+
+function live(client: HttpClient.HttpClient) {
+  const http = Layer.succeed(HttpClient.HttpClient, client)
+  return ShareNext.layer.pipe(
+    Layer.provide(Bus.layer),
+    Layer.provide(Account.layer.pipe(Layer.provide(AccountRepo.layer), Layer.provide(http))),
+    Layer.provide(Config.defaultLayer),
+    Layer.provide(http),
+    Layer.provide(Provider.defaultLayer),
+    Layer.provide(Session.defaultLayer),
+  )
+}
+
+function wired(client: HttpClient.HttpClient) {
+  const http = Layer.succeed(HttpClient.HttpClient, client)
+  return Layer.mergeAll(
+    Bus.layer,
+    ShareNext.layer,
+    Session.layer,
+    AccountRepo.layer,
+    NodeFileSystem.layer,
+    CrossSpawnSpawner.defaultLayer,
+  ).pipe(
+    Layer.provide(Bus.layer),
+    Layer.provide(Account.layer.pipe(Layer.provide(AccountRepo.layer), Layer.provide(http))),
+    Layer.provide(Config.defaultLayer),
+    Layer.provide(http),
+    Layer.provide(Provider.defaultLayer),
+  )
+}
+
+const share = (id: SessionID) =>
+  Database.use((db) => db.select().from(SessionShareTable).where(eq(SessionShareTable.session_id, id)).get())
 
-test("ShareNext.request uses org share API with auth headers when account is active", async () => {
-  const originalActive = Account.active
-  const originalToken = Account.token
-
-  Account.active = mock(async () => ({
-    id: AccountID.make("account-1"),
-    email: "[email protected]",
-    url: "https://control.example.com",
-    active_org_id: OrgID.make("org-1"),
-  }))
-  Account.token = mock(async () => AccessToken.make("st_test_token"))
-
-  try {
-    const req = await ShareNext.request()
-
-    expect(req.api.create).toBe("/api/shares")
-    expect(req.api.sync("shr_123")).toBe("/api/shares/shr_123/sync")
-    expect(req.api.remove("shr_123")).toBe("/api/shares/shr_123")
-    expect(req.api.data("shr_123")).toBe("/api/shares/shr_123/data")
-    expect(req.baseUrl).toBe("https://control.example.com")
-    expect(req.headers).toEqual({
-      authorization: "Bearer st_test_token",
-      "x-org-id": "org-1",
-    })
-  } finally {
-    Account.active = originalActive
-    Account.token = originalToken
-  }
+const seed = (url: string, org?: string) =>
+  AccountRepo.use((repo) =>
+    repo.persistAccount({
+      id: AccountID.make("account-1"),
+      email: "[email protected]",
+      url,
+      accessToken: AccessToken.make("st_test_token"),
+      refreshToken: RefreshToken.make("rt_test_token"),
+      expiry: Date.now() + 10 * 60_000,
+      orgID: org ? Option.some(OrgID.make(org)) : Option.none(),
+    }),
+  )
+
+beforeEach(async () => {
+  await resetDatabase()
 })
 
-test("ShareNext.request fails when org account has no token", async () => {
-  const originalActive = Account.active
-  const originalToken = Account.token
-
-  Account.active = mock(async () => ({
-    id: AccountID.make("account-1"),
-    email: "[email protected]",
-    url: "https://control.example.com",
-    active_org_id: OrgID.make("org-1"),
-  }))
-  Account.token = mock(async () => undefined)
-
-  try {
-    await expect(ShareNext.request()).rejects.toThrow("No active account token available for sharing")
-  } finally {
-    Account.active = originalActive
-    Account.token = originalToken
-  }
+describe("ShareNext", () => {
+  it.live("request uses legacy share API without active org account", () =>
+    provideTmpdirInstance(
+      () =>
+        ShareNext.Service.use((svc) =>
+          Effect.gen(function* () {
+            const req = yield* svc.request()
+
+            expect(req.api.create).toBe("/api/share")
+            expect(req.api.sync("shr_123")).toBe("/api/share/shr_123/sync")
+            expect(req.api.remove("shr_123")).toBe("/api/share/shr_123")
+            expect(req.api.data("shr_123")).toBe("/api/share/shr_123/data")
+            expect(req.baseUrl).toBe("https://legacy-share.example.com")
+            expect(req.headers).toEqual({})
+          }),
+        ).pipe(Effect.provide(live(none))),
+      { config: { enterprise: { url: "https://legacy-share.example.com" } } },
+    ),
+  )
+
+  it.live("request uses default URL when no enterprise config", () =>
+    provideTmpdirInstance(() =>
+      ShareNext.Service.use((svc) =>
+        Effect.gen(function* () {
+          const req = yield* svc.request()
+
+          expect(req.baseUrl).toBe("https://opncd.ai")
+          expect(req.api.create).toBe("/api/share")
+          expect(req.headers).toEqual({})
+        }),
+      ).pipe(Effect.provide(live(none))),
+    ),
+  )
+
+  it.live("request uses org share API with auth headers when account is active", () =>
+    provideTmpdirInstance(() =>
+      Effect.gen(function* () {
+        yield* seed("https://control.example.com", "org-1")
+
+        const req = yield* ShareNext.Service.use((svc) => svc.request()).pipe(Effect.provide(live(none)))
+
+        expect(req.api.create).toBe("/api/shares")
+        expect(req.api.sync("shr_123")).toBe("/api/shares/shr_123/sync")
+        expect(req.api.remove("shr_123")).toBe("/api/shares/shr_123")
+        expect(req.api.data("shr_123")).toBe("/api/shares/shr_123/data")
+        expect(req.baseUrl).toBe("https://control.example.com")
+        expect(req.headers).toEqual({
+          authorization: "Bearer st_test_token",
+          "x-org-id": "org-1",
+        })
+      }),
+    ),
+  )
+
+  it.live("create posts share, persists it, and returns the result", () =>
+    provideTmpdirInstance(
+      () =>
+        Effect.gen(function* () {
+          const session = yield* Session.Service.use((svc) => svc.create({ title: "test" }))
+          const seen: HttpClientRequest.HttpClientRequest[] = []
+          const client = HttpClient.make((req) => {
+            seen.push(req)
+            if (req.url.endsWith("/api/share")) {
+              return Effect.succeed(
+                json(req, {
+                  id: "shr_abc",
+                  url: "https://legacy-share.example.com/share/abc",
+                  secret: "sec_123",
+                }),
+              )
+            }
+            return Effect.succeed(json(req, { ok: true }))
+          })
+
+          const result = yield* ShareNext.Service.use((svc) => svc.create(session.id)).pipe(
+            Effect.provide(live(client)),
+          )
+
+          expect(result.id).toBe("shr_abc")
+          expect(result.url).toBe("https://legacy-share.example.com/share/abc")
+          expect(result.secret).toBe("sec_123")
+
+          const row = share(session.id)
+          expect(row?.id).toBe("shr_abc")
+          expect(row?.url).toBe("https://legacy-share.example.com/share/abc")
+          expect(row?.secret).toBe("sec_123")
+
+          expect(seen).toHaveLength(1)
+          expect(seen[0].method).toBe("POST")
+          expect(seen[0].url).toBe("https://legacy-share.example.com/api/share")
+        }),
+      { config: { enterprise: { url: "https://legacy-share.example.com" } } },
+    ),
+  )
+
+  it.live("remove deletes the persisted share and calls the delete endpoint", () =>
+    provideTmpdirInstance(
+      () =>
+        Effect.gen(function* () {
+          const session = yield* Session.Service.use((svc) => svc.create({ title: "test" }))
+          const seen: HttpClientRequest.HttpClientRequest[] = []
+          const client = HttpClient.make((req) => {
+            seen.push(req)
+            if (req.method === "POST") {
+              return Effect.succeed(
+                json(req, {
+                  id: "shr_abc",
+                  url: "https://legacy-share.example.com/share/abc",
+                  secret: "sec_123",
+                }),
+              )
+            }
+            return Effect.succeed(HttpClientResponse.fromWeb(req, new Response(null, { status: 200 })))
+          })
+
+          yield* Effect.gen(function* () {
+            yield* ShareNext.Service.use((svc) => svc.create(session.id))
+            yield* ShareNext.Service.use((svc) => svc.remove(session.id))
+          }).pipe(Effect.provide(live(client)))
+
+          expect(share(session.id)).toBeUndefined()
+          expect(seen.map((req) => [req.method, req.url])).toEqual([
+            ["POST", "https://legacy-share.example.com/api/share"],
+            ["DELETE", "https://legacy-share.example.com/api/share/shr_abc"],
+          ])
+        }),
+      { config: { enterprise: { url: "https://legacy-share.example.com" } } },
+    ),
+  )
+
+  it.live("create fails on a non-ok response and does not persist a share", () =>
+    provideTmpdirInstance(() =>
+      Effect.gen(function* () {
+        const session = yield* Session.Service.use((svc) => svc.create({ title: "test" }))
+        const client = HttpClient.make((req) => Effect.succeed(json(req, { error: "bad" }, 500)))
+
+        const exit = yield* ShareNext.Service.use((svc) => Effect.exit(svc.create(session.id))).pipe(
+          Effect.provide(live(client)),
+        )
+
+        expect(Exit.isFailure(exit)).toBe(true)
+        expect(share(session.id)).toBeUndefined()
+      }),
+    ),
+  )
+
+  it.live("ShareNext coalesces rapid diff events into one delayed sync with latest data", () =>
+    provideTmpdirInstance(
+      () => {
+        const seen: Array<{ url: string; body: string }> = []
+        const client = HttpClient.make((req) => {
+          if (req.url.endsWith("/sync") && req.body._tag === "Uint8Array") {
+            seen.push({ url: req.url, body: new TextDecoder().decode(req.body.body) })
+          }
+          return Effect.succeed(json(req, { ok: true }))
+        })
+
+        return Effect.gen(function* () {
+          const bus = yield* Bus.Service
+          const share = yield* ShareNext.Service
+          const session = yield* Session.Service
+
+          const info = yield* session.create({ title: "first" })
+          yield* share.init()
+          yield* Effect.sleep(50)
+          yield* Effect.sync(() =>
+            Database.use((db) =>
+              db
+                .insert(SessionShareTable)
+                .values({
+                  session_id: info.id,
+                  id: "shr_abc",
+                  url: "https://legacy-share.example.com/share/abc",
+                  secret: "sec_123",
+                })
+                .run(),
+            ),
+          )
+
+          yield* bus.publish(Session.Event.Diff, {
+            sessionID: info.id,
+            diff: [
+              {
+                file: "a.ts",
+                before: "one",
+                after: "two",
+                additions: 1,
+                deletions: 1,
+                status: "modified",
+              },
+            ],
+          })
+          yield* bus.publish(Session.Event.Diff, {
+            sessionID: info.id,
+            diff: [
+              {
+                file: "b.ts",
+                before: "old",
+                after: "new",
+                additions: 2,
+                deletions: 0,
+                status: "modified",
+              },
+            ],
+          })
+          yield* Effect.sleep(1_250)
+
+          expect(seen).toHaveLength(1)
+          expect(seen[0].url).toBe("https://legacy-share.example.com/api/share/shr_abc/sync")
+
+          const body = JSON.parse(seen[0].body) as {
+            secret: string
+            data: Array<{
+              type: string
+              data: Array<{
+                file: string
+                before: string
+                after: string
+                additions: number
+                deletions: number
+                status?: string
+              }>
+            }>
+          }
+          expect(body.secret).toBe("sec_123")
+          expect(body.data).toHaveLength(1)
+          expect(body.data[0].type).toBe("session_diff")
+          expect(body.data[0].data).toEqual([
+            {
+              file: "b.ts",
+              before: "old",
+              after: "new",
+              additions: 2,
+              deletions: 0,
+              status: "modified",
+            },
+          ])
+        }).pipe(Effect.provide(wired(client)))
+      },
+      { config: { enterprise: { url: "https://legacy-share.example.com" } } },
+    ),
+  )
 })