Browse Source

refactor(skill): effectify discovery service

Kit Langton 1 month ago
parent
commit
e21d892cff

+ 130 - 81
packages/opencode/src/skill/discovery.ts

@@ -1,98 +1,147 @@
 import path from "path"
-import { mkdir } from "fs/promises"
-import { Log } from "../util/log"
+import { Effect, Layer, Schema, ServiceMap } from "effect"
+import { FetchHttpClient, HttpClient, HttpClientRequest, HttpClientResponse } from "effect/unstable/http"
 import { Global } from "../global"
+import { Log } from "../util/log"
 import { Filesystem } from "../util/filesystem"
+import { withTransientReadRetry } from "@/util/effect-http-client"
 
-export namespace Discovery {
-  const log = Log.create({ service: "skill-discovery" })
-
-  type Index = {
-    skills: Array<{
-      name: string
-      description: string
-      files: string[]
-    }>
-  }
+class IndexSkill extends Schema.Class<IndexSkill>("IndexSkill")({
+  name: Schema.String,
+  description: Schema.String,
+  files: Schema.Array(Schema.String),
+}) {}
+
+class Index extends Schema.Class<Index>("Index")({
+  skills: Schema.Array(IndexSkill),
+}) {}
 
+export namespace Discovery {
   export function dir() {
     return path.join(Global.Path.cache, "skills")
   }
+}
 
-  async function get(url: string, dest: string): Promise<boolean> {
-    if (await Filesystem.exists(dest)) return true
-    return fetch(url)
-      .then(async (response) => {
-        if (!response.ok) {
-          log.error("failed to download", { url, status: response.status })
-          return false
-        }
-        if (response.body) await Filesystem.writeStream(dest, response.body)
-        return true
-      })
-      .catch((err) => {
-        log.error("failed to download", { url, err })
-        return false
-      })
+export namespace DiscoveryService {
+  export interface Service {
+    readonly pull: (url: string) => Effect.Effect<string[]>
   }
+}
 
-  export async function pull(url: string): Promise<string[]> {
-    const result: string[] = []
-    const base = url.endsWith("/") ? url : `${url}/`
-    const index = new URL("index.json", base).href
-    const cache = dir()
-    const host = base.slice(0, -1)
-
-    log.info("fetching index", { url: index })
-    const data = await fetch(index)
-      .then(async (response) => {
-        if (!response.ok) {
-          log.error("failed to fetch index", { url: index, status: response.status })
-          return undefined
-        }
-        return response
-          .json()
-          .then((json) => json as Index)
-          .catch((err) => {
-            log.error("failed to parse index", { url: index, err })
-            return undefined
-          })
-      })
-      .catch((err) => {
-        log.error("failed to fetch index", { url: index, err })
-        return undefined
-      })
+export class DiscoveryService extends ServiceMap.Service<DiscoveryService, DiscoveryService.Service>()(
+  "@opencode/SkillDiscovery",
+) {
+  static readonly layer = Layer.effect(
+    DiscoveryService,
+    Effect.gen(function* () {
+      const log = Log.create({ service: "skill-discovery" })
+      const http = withTransientReadRetry(yield* HttpClient.HttpClient)
+
+      const get = Effect.fn("DiscoveryService.get")((url: string, dest: string) =>
+        Effect.gen(function* () {
+          if (yield* Effect.promise(() => Filesystem.exists(dest))) return true
+
+          const req = HttpClientRequest.get(url)
+          const response = yield* http.execute(req).pipe(
+            Effect.catch((err) => {
+              log.error("failed to download", { url, err })
+              return Effect.succeed(null)
+            }),
+          )
+          if (!response) return false
+
+          const ok = yield* HttpClientResponse.filterStatusOk(response).pipe(
+            Effect.catch(() => {
+              log.error("failed to download", { url, status: response.status })
+              return Effect.succeed(null)
+            }),
+          )
+          if (!ok) return false
+
+          const body = yield* ok.arrayBuffer.pipe(
+            Effect.catch((err) => {
+              log.error("failed to read download body", { url, err })
+              return Effect.succeed(null)
+            }),
+          )
+          if (!body) return false
 
-    if (!data?.skills || !Array.isArray(data.skills)) {
-      log.warn("invalid index format", { url: index })
-      return result
-    }
-
-    const list = data.skills.filter((skill) => {
-      if (!skill?.name || !Array.isArray(skill.files)) {
-        log.warn("invalid skill entry", { url: index, skill })
-        return false
-      }
-      return true
-    })
-
-    await Promise.all(
-      list.map(async (skill) => {
-        const root = path.join(cache, skill.name)
-        await Promise.all(
-          skill.files.map(async (file) => {
-            const link = new URL(file, `${host}/${skill.name}/`).href
-            const dest = path.join(root, file)
-            await mkdir(path.dirname(dest), { recursive: true })
-            await get(link, dest)
+          yield* Effect.promise(() => Filesystem.write(dest, Buffer.from(body)))
+          return true
+        }),
+      )
+
+      const pull: DiscoveryService.Service["pull"] = Effect.fn("DiscoveryService.pull")(function* (url: string) {
+        const base = url.endsWith("/") ? url : `${url}/`
+        const index = new URL("index.json", base).href
+        const cache = Discovery.dir()
+        const host = base.slice(0, -1)
+
+        log.info("fetching index", { url: index })
+
+        const req = HttpClientRequest.get(index).pipe(HttpClientRequest.acceptJson)
+        const response = yield* http.execute(req).pipe(
+          Effect.catch((err) => {
+            log.error("failed to fetch index", { url: index, err })
+            return Effect.succeed(null)
           }),
         )
+        if (!response) return Array<string>()
 
-        const md = path.join(root, "SKILL.md")
-        if (await Filesystem.exists(md)) result.push(root)
-      }),
-    )
+        const ok = yield* HttpClientResponse.filterStatusOk(response).pipe(
+          Effect.catch(() => {
+            log.error("failed to fetch index", { url: index, status: response.status })
+            return Effect.succeed(null)
+          }),
+        )
+        if (!ok) return Array<string>()
 
-    return result
-  }
+        const data = yield* HttpClientResponse.schemaBodyJson(Index)(ok).pipe(
+          Effect.catch((err) => {
+            log.error("failed to parse index", { url: index, err })
+            return Effect.succeed(null)
+          }),
+        )
+        if (!data) {
+          log.warn("invalid index format", { url: index })
+          return Array<string>()
+        }
+
+        const list = data.skills.filter((skill) => {
+          if (!skill.name || !Array.isArray(skill.files)) {
+            log.warn("invalid skill entry", { url: index, skill })
+            return false
+          }
+          return true
+        })
+
+        const dirs = yield* Effect.all(
+          list.map((skill) =>
+            Effect.gen(function* () {
+              const root = path.join(cache, skill.name)
+
+              yield* Effect.all(
+                skill.files.map((file) => {
+                  const link = new URL(file, `${host}/${skill.name}/`).href
+                  const dest = path.join(root, file)
+                  return get(link, dest)
+                }),
+                { concurrency: "unbounded" },
+              )
+
+              const md = path.join(root, "SKILL.md")
+              return (yield* Effect.promise(() => Filesystem.exists(md))) ? root : null
+            }),
+          ),
+          { concurrency: "unbounded" },
+        )
+
+        return dirs.filter((dir): dir is string => Boolean(dir))
+      })
+
+      return DiscoveryService.of({ pull })
+    }),
+  )
+
+  static readonly defaultLayer = DiscoveryService.layer.pipe(Layer.provide(FetchHttpClient.layer))
 }

+ 4 - 3
packages/opencode/src/skill/skill.ts

@@ -10,7 +10,7 @@ import { Global } from "@/global"
 import { Filesystem } from "@/util/filesystem"
 import { Flag } from "@/flag/flag"
 import { Bus } from "@/bus"
-import { Discovery } from "./discovery"
+import { DiscoveryService } from "./discovery"
 import { Glob } from "../util/glob"
 import { pathToFileURL } from "url"
 import type { Agent } from "@/agent/agent"
@@ -106,6 +106,7 @@ export class SkillService extends ServiceMap.Service<SkillService, SkillService.
     SkillService,
     Effect.gen(function* () {
       const instance = yield* InstanceContext
+      const discovery = yield* DiscoveryService
 
       const skills: Record<string, Skill.Info> = {}
       const skillDirs = new Set<string>()
@@ -216,7 +217,7 @@ export class SkillService extends ServiceMap.Service<SkillService, SkillService.
 
           // Download and load skills from URLs
           for (const url of config.skills?.urls ?? []) {
-            const list = await Discovery.pull(url)
+            const list = await Effect.runPromise(discovery.pull(url))
             for (const dir of list) {
               skillDirs.add(dir)
               const matches = await Glob.scan(SKILL_PATTERN, {
@@ -262,5 +263,5 @@ export class SkillService extends ServiceMap.Service<SkillService, SkillService.
         }),
       })
     }),
-  )
+  ).pipe(Layer.provide(DiscoveryService.defaultLayer))
 }

+ 12 - 8
packages/opencode/test/skill/discovery.test.ts

@@ -1,5 +1,6 @@
 import { describe, test, expect, beforeAll, afterAll } from "bun:test"
-import { Discovery } from "../../src/skill/discovery"
+import { Effect } from "effect"
+import { Discovery, DiscoveryService } from "../../src/skill/discovery"
 import { Filesystem } from "../../src/util/filesystem"
 import { rm } from "fs/promises"
 import path from "path"
@@ -44,8 +45,11 @@ afterAll(async () => {
 })
 
 describe("Discovery.pull", () => {
+  const pull = (url: string) =>
+    Effect.runPromise(DiscoveryService.use((s) => s.pull(url)).pipe(Effect.provide(DiscoveryService.defaultLayer)))
+
   test("downloads skills from cloudflare url", async () => {
-    const dirs = await Discovery.pull(CLOUDFLARE_SKILLS_URL)
+    const dirs = await pull(CLOUDFLARE_SKILLS_URL)
     expect(dirs.length).toBeGreaterThan(0)
     for (const dir of dirs) {
       expect(dir).toStartWith(Discovery.dir())
@@ -55,7 +59,7 @@ describe("Discovery.pull", () => {
   })
 
   test("url without trailing slash works", async () => {
-    const dirs = await Discovery.pull(CLOUDFLARE_SKILLS_URL.replace(/\/$/, ""))
+    const dirs = await pull(CLOUDFLARE_SKILLS_URL.replace(/\/$/, ""))
     expect(dirs.length).toBeGreaterThan(0)
     for (const dir of dirs) {
       const md = path.join(dir, "SKILL.md")
@@ -64,18 +68,18 @@ describe("Discovery.pull", () => {
   })
 
   test("returns empty array for invalid url", async () => {
-    const dirs = await Discovery.pull(`http://localhost:${server.port}/invalid-url/`)
+    const dirs = await pull(`http://localhost:${server.port}/invalid-url/`)
     expect(dirs).toEqual([])
   })
 
   test("returns empty array for non-json response", async () => {
     // any url not explicitly handled in server returns 404 text "Not Found"
-    const dirs = await Discovery.pull(`http://localhost:${server.port}/some-other-path/`)
+    const dirs = await pull(`http://localhost:${server.port}/some-other-path/`)
     expect(dirs).toEqual([])
   })
 
   test("downloads reference files alongside SKILL.md", async () => {
-    const dirs = await Discovery.pull(CLOUDFLARE_SKILLS_URL)
+    const dirs = await pull(CLOUDFLARE_SKILLS_URL)
     // find a skill dir that should have reference files (e.g. agents-sdk)
     const agentsSdk = dirs.find((d) => d.endsWith(path.sep + "agents-sdk"))
     expect(agentsSdk).toBeDefined()
@@ -94,13 +98,13 @@ describe("Discovery.pull", () => {
     downloadCount = 0
 
     // first pull to populate cache
-    const first = await Discovery.pull(CLOUDFLARE_SKILLS_URL)
+    const first = await pull(CLOUDFLARE_SKILLS_URL)
     expect(first.length).toBeGreaterThan(0)
     const firstCount = downloadCount
     expect(firstCount).toBeGreaterThan(0)
 
     // second pull should return same results from cache
-    const second = await Discovery.pull(CLOUDFLARE_SKILLS_URL)
+    const second = await pull(CLOUDFLARE_SKILLS_URL)
     expect(second.length).toBe(first.length)
     expect(second.sort()).toEqual(first.sort())