Ver Fonte

tweak: improve motel observability for session flows

Kit Langton há 1 semana atrás
pai
commit
53136cf57f

+ 1 - 1
packages/opencode/src/effect/logger.ts

@@ -55,7 +55,7 @@ export namespace EffectLogger {
     }
     }
   })
   })
 
 
-  export const layer = Logger.layer([logger], { mergeWithExisting: false })
+  export const layer = Logger.layer([Logger.tracerLogger, logger], { mergeWithExisting: false })
 
 
   export const create = (base: Fields = {}): Handle => ({
   export const create = (base: Fields = {}): Handle => ({
     debug: (msg, extra) => call((item) => Effect.logDebug(item), base, msg, extra),
     debug: (msg, extra) => call((item) => Effect.logDebug(item), base, msg, extra),

+ 2 - 1
packages/opencode/src/effect/oltp.ts

@@ -6,7 +6,7 @@ import { Flag } from "@/flag/flag"
 import { CHANNEL, VERSION } from "@/installation/meta"
 import { CHANNEL, VERSION } from "@/installation/meta"
 
 
 export namespace Observability {
 export namespace Observability {
-  const base = Flag.OTEL_EXPORTER_OTLP_ENDPOINT
+  const base = Flag.OTEL_EXPORTER_OTLP_ENDPOINT ?? (CHANNEL === "local" ? "http://127.0.0.1:27686" : undefined)
   export const enabled = !!base
   export const enabled = !!base
 
 
   const resource = {
   const resource = {
@@ -34,6 +34,7 @@ export namespace Observability {
     : Otlp.layerJson({
     : Otlp.layerJson({
         baseUrl: base,
         baseUrl: base,
         loggerExportInterval: Duration.seconds(1),
         loggerExportInterval: Duration.seconds(1),
+        tracerExportInterval: Duration.seconds(1),
         loggerMergeWithExisting: true,
         loggerMergeWithExisting: true,
         resource,
         resource,
         headers,
         headers,

+ 44 - 3
packages/opencode/src/session/processor.ts

@@ -123,6 +123,14 @@ export namespace SessionProcessor {
         let aborted = false
         let aborted = false
         const slog = log.with({ sessionID: input.sessionID, messageID: input.assistantMessage.id })
         const slog = log.with({ sessionID: input.sessionID, messageID: input.assistantMessage.id })
 
 
+        yield* Effect.annotateCurrentSpan({
+          sessionID: input.sessionID,
+          messageID: input.assistantMessage.id,
+          agent: input.assistantMessage.agent,
+          providerID: input.model.providerID,
+          modelID: input.model.id,
+        })
+
         const parse = (e: unknown) =>
         const parse = (e: unknown) =>
           MessageV2.fromError(e, {
           MessageV2.fromError(e, {
             providerID: input.model.providerID,
             providerID: input.model.providerID,
@@ -531,7 +539,18 @@ export namespace SessionProcessor {
         })
         })
 
 
         const process = Effect.fn("SessionProcessor.process")(function* (streamInput: LLM.StreamInput) {
         const process = Effect.fn("SessionProcessor.process")(function* (streamInput: LLM.StreamInput) {
-          yield* slog.info("process")
+          yield* Effect.annotateCurrentSpan({
+            sessionID: ctx.sessionID,
+            messageID: ctx.assistantMessage.id,
+            agent: ctx.assistantMessage.agent,
+            providerID: ctx.model.providerID,
+            modelID: ctx.model.id,
+          })
+          yield* slog.info("process", {
+            agent: ctx.assistantMessage.agent,
+            providerID: ctx.model.providerID,
+            modelID: ctx.model.id,
+          })
           ctx.needsCompaction = false
           ctx.needsCompaction = false
           ctx.shouldBreak = (yield* config.get()).experimental?.continue_loop_on_deny !== true
           ctx.shouldBreak = (yield* config.get()).experimental?.continue_loop_on_deny !== true
 
 
@@ -545,6 +564,17 @@ export namespace SessionProcessor {
                 Stream.tap((event) => handleEvent(event)),
                 Stream.tap((event) => handleEvent(event)),
                 Stream.takeUntil(() => ctx.needsCompaction),
                 Stream.takeUntil(() => ctx.needsCompaction),
                 Stream.runDrain,
                 Stream.runDrain,
+                Effect.withSpan(
+                  "SessionProcessor.stream",
+                  {
+                    attributes: {
+                      sessionID: ctx.sessionID,
+                      messageID: ctx.assistantMessage.id,
+                      agent: ctx.assistantMessage.agent,
+                    },
+                  },
+                  { captureStackTrace: false },
+                ),
               )
               )
             }).pipe(
             }).pipe(
               Effect.onInterrupt(() =>
               Effect.onInterrupt(() =>
@@ -575,8 +605,19 @@ export namespace SessionProcessor {
               Effect.ensuring(cleanup()),
               Effect.ensuring(cleanup()),
             )
             )
 
 
-            if (ctx.needsCompaction) return "compact"
-            if (ctx.blocked || ctx.assistantMessage.error) return "stop"
+            if (ctx.needsCompaction) {
+              yield* slog.warn("compact", { finish: ctx.assistantMessage.finish, blocked: ctx.blocked })
+              return "compact"
+            }
+            if (ctx.blocked || ctx.assistantMessage.error) {
+              yield* slog.warn("stop", {
+                blocked: ctx.blocked,
+                finish: ctx.assistantMessage.finish,
+                hasError: !!ctx.assistantMessage.error,
+              })
+              return "stop"
+            }
+            yield* slog.info("continue", { finish: ctx.assistantMessage.finish })
             return "continue"
             return "continue"
           })
           })
         })
         })

+ 88 - 3
packages/opencode/src/session/prompt.ts

@@ -398,6 +398,17 @@ NOTE: At any point in time through this workflow you should feel free to ask the
               return Effect.runPromise(
               return Effect.runPromise(
                 Effect.gen(function* () {
                 Effect.gen(function* () {
                   const ctx = context(args, options)
                   const ctx = context(args, options)
+                  yield* Effect.annotateCurrentSpan({
+                    tool: item.id,
+                    sessionID: ctx.sessionID,
+                    messageID: input.processor.message.id,
+                    callID: ctx.callID,
+                  })
+                  yield* elog.info("tool.start", {
+                    tool: item.id,
+                    sessionID: ctx.sessionID,
+                    callID: ctx.callID,
+                  })
                   yield* plugin.trigger(
                   yield* plugin.trigger(
                     "tool.execute.before",
                     "tool.execute.before",
                     { tool: item.id, sessionID: ctx.sessionID, callID: ctx.callID },
                     { tool: item.id, sessionID: ctx.sessionID, callID: ctx.callID },
@@ -421,8 +432,27 @@ NOTE: At any point in time through this workflow you should feel free to ask the
                   if (options.abortSignal?.aborted) {
                   if (options.abortSignal?.aborted) {
                     yield* input.processor.completeToolCall(options.toolCallId, output)
                     yield* input.processor.completeToolCall(options.toolCallId, output)
                   }
                   }
+                  yield* elog.info("tool.done", {
+                    tool: item.id,
+                    sessionID: ctx.sessionID,
+                    callID: ctx.callID,
+                    truncated: output.metadata.truncated,
+                  })
                   return output
                   return output
-                }),
+                }).pipe(
+                  Effect.withSpan(
+                    `Tool.${item.id}`,
+                    {
+                      attributes: {
+                        tool: item.id,
+                        sessionID: input.session.id,
+                        messageID: input.processor.message.id,
+                        callID: options.toolCallId,
+                      },
+                    },
+                    { captureStackTrace: false },
+                  ),
+                ),
               )
               )
             },
             },
           })
           })
@@ -439,6 +469,13 @@ NOTE: At any point in time through this workflow you should feel free to ask the
             Effect.runPromise(
             Effect.runPromise(
               Effect.gen(function* () {
               Effect.gen(function* () {
                 const ctx = context(args, opts)
                 const ctx = context(args, opts)
+                yield* Effect.annotateCurrentSpan({
+                  tool: key,
+                  sessionID: ctx.sessionID,
+                  messageID: input.processor.message.id,
+                  callID: ctx.callID,
+                })
+                yield* elog.info("tool.start", { tool: key, sessionID: ctx.sessionID, callID: ctx.callID })
                 yield* plugin.trigger(
                 yield* plugin.trigger(
                   "tool.execute.before",
                   "tool.execute.before",
                   { tool: key, sessionID: ctx.sessionID, callID: opts.toolCallId },
                   { tool: key, sessionID: ctx.sessionID, callID: opts.toolCallId },
@@ -500,8 +537,27 @@ NOTE: At any point in time through this workflow you should feel free to ask the
                 if (opts.abortSignal?.aborted) {
                 if (opts.abortSignal?.aborted) {
                   yield* input.processor.completeToolCall(opts.toolCallId, output)
                   yield* input.processor.completeToolCall(opts.toolCallId, output)
                 }
                 }
+                yield* elog.info("tool.done", {
+                  tool: key,
+                  sessionID: ctx.sessionID,
+                  callID: ctx.callID,
+                  truncated: output.metadata.truncated,
+                })
                 return output
                 return output
-              }),
+              }).pipe(
+                Effect.withSpan(
+                  `Tool.${key}`,
+                  {
+                    attributes: {
+                      tool: key,
+                      sessionID: input.session.id,
+                      messageID: input.processor.message.id,
+                      callID: opts.toolCallId,
+                    },
+                  },
+                  { captureStackTrace: false },
+                ),
+              ),
             )
             )
           tools[key] = item
           tools[key] = item
         }
         }
@@ -1327,6 +1383,14 @@ NOTE: At any point in time through this workflow you should feel free to ask the
 
 
             if (!lastUser) throw new Error("No user message found in stream. This should never happen.")
             if (!lastUser) throw new Error("No user message found in stream. This should never happen.")
 
 
+            yield* Effect.annotateCurrentSpan({
+              sessionID,
+              step,
+              agent: lastUser.agent,
+              providerID: lastUser.model.providerID,
+              modelID: lastUser.model.modelID,
+            })
+
             const lastAssistantMsg = msgs.findLast(
             const lastAssistantMsg = msgs.findLast(
               (msg) => msg.info.role === "assistant" && msg.info.id === lastAssistant?.id,
               (msg) => msg.info.role === "assistant" && msg.info.id === lastAssistant?.id,
             )
             )
@@ -1348,6 +1412,12 @@ NOTE: At any point in time through this workflow you should feel free to ask the
             }
             }
 
 
             step++
             step++
+            yield* slog.info("step", {
+              step,
+              agent: lastUser.agent,
+              providerID: lastUser.model.providerID,
+              modelID: lastUser.model.modelID,
+            })
             if (step === 1)
             if (step === 1)
               yield* title({
               yield* title({
                 session,
                 session,
@@ -1365,6 +1435,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the
             }
             }
 
 
             if (task?.type === "compaction") {
             if (task?.type === "compaction") {
+              yield* slog.warn("compaction", { step, auto: task.auto, overflow: task.overflow })
               const result = yield* compaction.process({
               const result = yield* compaction.process({
                 messages: msgs,
                 messages: msgs,
                 parentID: lastUser.id,
                 parentID: lastUser.id,
@@ -1469,7 +1540,21 @@ NOTE: At any point in time through this workflow you should feel free to ask the
                 Effect.promise(() => SystemPrompt.environment(model)),
                 Effect.promise(() => SystemPrompt.environment(model)),
                 instruction.system().pipe(Effect.orDie),
                 instruction.system().pipe(Effect.orDie),
                 MessageV2.toModelMessagesEffect(msgs, model),
                 MessageV2.toModelMessagesEffect(msgs, model),
-              ])
+              ]).pipe(
+                Effect.withSpan(
+                  "SessionPrompt.prepareInput",
+                  {
+                    attributes: {
+                      sessionID,
+                      step,
+                      agent: agent.name,
+                      providerID: model.providerID,
+                      modelID: model.id,
+                    },
+                  },
+                  { captureStackTrace: false },
+                ),
+              )
               const system = [...env, ...(skills ? [skills] : []), ...instructions]
               const system = [...env, ...(skills ? [skills] : []), ...instructions]
               const format = lastUser.format ?? { type: "text" as const }
               const format = lastUser.format ?? { type: "text" as const }
               if (format.type === "json_schema") system.push(STRUCTURED_OUTPUT_SYSTEM_PROMPT)
               if (format.type === "json_schema") system.push(STRUCTURED_OUTPUT_SYSTEM_PROMPT)

+ 2 - 1
packages/opencode/src/tool/write.ts

@@ -24,6 +24,7 @@ export const WriteTool = Tool.defineEffect(
     const lsp = yield* LSP.Service
     const lsp = yield* LSP.Service
     const fs = yield* AppFileSystem.Service
     const fs = yield* AppFileSystem.Service
     const filetime = yield* FileTime.Service
     const filetime = yield* FileTime.Service
+    const format = yield* Format.Service
 
 
     return {
     return {
       description: DESCRIPTION,
       description: DESCRIPTION,
@@ -56,7 +57,7 @@ export const WriteTool = Tool.defineEffect(
           )
           )
 
 
           yield* fs.writeWithDirs(filepath, params.content)
           yield* fs.writeWithDirs(filepath, params.content)
-          yield* Effect.promise(() => Format.file(filepath))
+          yield* format.file(filepath)
           Bus.publish(File.Event.Edited, { file: filepath })
           Bus.publish(File.Event.Edited, { file: filepath })
           yield* Effect.promise(() =>
           yield* Effect.promise(() =>
             Bus.publish(FileWatcher.Event.Updated, {
             Bus.publish(FileWatcher.Event.Updated, {

+ 15 - 1
packages/opencode/test/tool/write.test.ts

@@ -7,6 +7,7 @@ import { Instance } from "../../src/project/instance"
 import { LSP } from "../../src/lsp"
 import { LSP } from "../../src/lsp"
 import { AppFileSystem } from "../../src/filesystem"
 import { AppFileSystem } from "../../src/filesystem"
 import { FileTime } from "../../src/file/time"
 import { FileTime } from "../../src/file/time"
+import { Format } from "../../src/format"
 import { Tool } from "../../src/tool/tool"
 import { Tool } from "../../src/tool/tool"
 import { SessionID, MessageID } from "../../src/session/schema"
 import { SessionID, MessageID } from "../../src/session/schema"
 import * as CrossSpawnSpawner from "../../src/effect/cross-spawn-spawner"
 import * as CrossSpawnSpawner from "../../src/effect/cross-spawn-spawner"
@@ -29,7 +30,20 @@ afterEach(async () => {
 })
 })
 
 
 const it = testEffect(
 const it = testEffect(
-  Layer.mergeAll(LSP.defaultLayer, AppFileSystem.defaultLayer, FileTime.defaultLayer, CrossSpawnSpawner.defaultLayer),
+  Layer.mergeAll(
+    LSP.defaultLayer,
+    AppFileSystem.defaultLayer,
+    FileTime.defaultLayer,
+    CrossSpawnSpawner.defaultLayer,
+    Layer.succeed(
+      Format.Service,
+      Format.Service.of({
+        init: () => Effect.void,
+        status: () => Effect.succeed([]),
+        file: () => Effect.void,
+      }),
+    ),
+  ),
 )
 )
 
 
 const init = Effect.fn("WriteToolTest.init")(function* () {
 const init = Effect.fn("WriteToolTest.init")(function* () {