|
|
@@ -45,7 +45,7 @@ import { decodeDataUrl } from "@/util/data-url"
|
|
|
import { Process } from "@/util/process"
|
|
|
import { Cause, Effect, Exit, Layer, Option, Scope, ServiceMap } from "effect"
|
|
|
import { InstanceState } from "@/effect/instance-state"
|
|
|
-import { makeRuntime } from "@/effect/run-service"
|
|
|
+import { attach, makeRuntime } from "@/effect/run-service"
|
|
|
import { TaskTool } from "@/tool/task"
|
|
|
import { SessionRunState } from "./run-state"
|
|
|
|
|
|
@@ -62,6 +62,64 @@ IMPORTANT:
|
|
|
|
|
|
const STRUCTURED_OUTPUT_SYSTEM_PROMPT = `IMPORTANT: The user has requested structured output. You MUST use the StructuredOutput tool to provide your final response. Do NOT respond with plain text - you MUST call the StructuredOutput tool with your answer formatted according to the schema.`
|
|
|
|
|
|
+/**
|
|
|
+ * Bridges an AI SDK Promise-based `execute` callback to Effect with graceful
|
|
|
+ * cancel semantics.
|
|
|
+ *
|
|
|
+ * On the happy path: runs `before`, awaits `execute()`, then `finalize(result)`
|
|
|
+ * and returns the output.
|
|
|
+ *
|
|
|
+ * On cancel mid-flight: the `onInterrupt` finalizer re-awaits the same in-flight
|
|
|
+ * native Promise uninterruptibly, runs `finalize` again on the eventual result,
|
|
|
+ * and posts it via `onCancel` (the processor side channel). This is what lets
|
|
|
+ * cancelled bash surface its truncated output through the normal completion
|
|
|
+ * path instead of getting stamped as aborted by processor cleanup.
|
|
|
+ *
|
|
|
+ * The returned Promise always resolves with a finalized output when one is
|
|
|
+ * available (even on interrupt), so the SDK reports the tool as successfully
|
|
|
+ * completed rather than as a tool-error.
|
|
|
+ *
|
|
|
+ * `attach` captures the current Instance context via InstanceRef so the
|
|
|
+ * onInterrupt finalizer — which runs outside the AsyncLocalStorage chain
|
|
|
+ * `execute()` is called from — can still resolve it through the ServiceMap.
|
|
|
+ */
|
|
|
+function runToolExecute<Raw, Output>(options: {
|
|
|
+ signal: AbortSignal | undefined
|
|
|
+ before: Effect.Effect<unknown, any, any>
|
|
|
+ execute: () => Promise<Raw>
|
|
|
+ finalize: (result: Raw) => Effect.Effect<Output, any, any>
|
|
|
+ onCancel: (output: Output) => Effect.Effect<unknown, any, any>
|
|
|
+}): Promise<Output> {
|
|
|
+ let pending: Promise<Raw> | undefined
|
|
|
+ let rescued: Output | undefined
|
|
|
+ const wait = Effect.promise(() => pending!)
|
|
|
+
|
|
|
+ const program = Effect.gen(function* () {
|
|
|
+ yield* options.before
|
|
|
+ pending = options.execute()
|
|
|
+ const result = yield* wait
|
|
|
+ return yield* options.finalize(result)
|
|
|
+ }).pipe(
|
|
|
+ // On interrupt, re-await the in-flight Promise uninterruptibly (finalizers
|
|
|
+ // always are), finalize it the same way, and post through the side channel.
|
|
|
+ // Stash the output so catchCause below can surface it instead of the cause.
|
|
|
+ Effect.onInterrupt(() =>
|
|
|
+ Effect.gen(function* () {
|
|
|
+ if (pending === undefined) return
|
|
|
+ const result = yield* wait
|
|
|
+ const output = yield* options.finalize(result)
|
|
|
+ rescued = output
|
|
|
+ yield* options.onCancel(output)
|
|
|
+ }).pipe(Effect.catchCause(() => Effect.void)),
|
|
|
+ ),
|
|
|
+ Effect.catchCause((cause) =>
|
|
|
+ Effect.suspend(() => (rescued !== undefined ? Effect.succeed(rescued!) : Effect.failCause(cause))),
|
|
|
+ ),
|
|
|
+ )
|
|
|
+
|
|
|
+ return Effect.runPromise(attach(program as Effect.Effect<Output>), { signal: options.signal })
|
|
|
+}
|
|
|
+
|
|
|
export namespace SessionPrompt {
|
|
|
const log = Log.create({ service: "session.prompt" })
|
|
|
|
|
|
@@ -396,35 +454,28 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
|
|
description: item.description,
|
|
|
inputSchema: jsonSchema(schema as any),
|
|
|
execute(args, options) {
|
|
|
- return Effect.runPromise(
|
|
|
- Effect.gen(function* () {
|
|
|
- const ctx = context(args, options)
|
|
|
- yield* plugin.trigger(
|
|
|
- "tool.execute.before",
|
|
|
- { tool: item.id, sessionID: ctx.sessionID, callID: ctx.callID },
|
|
|
- { args },
|
|
|
- )
|
|
|
- const result = yield* Effect.promise(() => item.execute(args, ctx))
|
|
|
- const output = {
|
|
|
- ...result,
|
|
|
- attachments: result.attachments?.map((attachment) => ({
|
|
|
- ...attachment,
|
|
|
- id: PartID.ascending(),
|
|
|
- sessionID: ctx.sessionID,
|
|
|
- messageID: input.processor.message.id,
|
|
|
- })),
|
|
|
- }
|
|
|
- yield* plugin.trigger(
|
|
|
- "tool.execute.after",
|
|
|
- { tool: item.id, sessionID: ctx.sessionID, callID: ctx.callID, args },
|
|
|
- output,
|
|
|
- )
|
|
|
- if (options.abortSignal?.aborted) {
|
|
|
- yield* input.processor.completeToolCall(options.toolCallId, output)
|
|
|
- }
|
|
|
- return output
|
|
|
- }),
|
|
|
- )
|
|
|
+ const ctx = context(args, options)
|
|
|
+ const meta = { tool: item.id, sessionID: ctx.sessionID, callID: ctx.callID }
|
|
|
+ return runToolExecute({
|
|
|
+ signal: options.abortSignal,
|
|
|
+ before: plugin.trigger("tool.execute.before", meta, { args }),
|
|
|
+ execute: () => item.execute(args, ctx),
|
|
|
+ finalize: (result) =>
|
|
|
+ Effect.gen(function* () {
|
|
|
+ const output = {
|
|
|
+ ...result,
|
|
|
+ attachments: result.attachments?.map((attachment) => ({
|
|
|
+ ...attachment,
|
|
|
+ id: PartID.ascending(),
|
|
|
+ sessionID: ctx.sessionID,
|
|
|
+ messageID: input.processor.message.id,
|
|
|
+ })),
|
|
|
+ }
|
|
|
+ yield* plugin.trigger("tool.execute.after", { ...meta, args }, output)
|
|
|
+ return output
|
|
|
+ }),
|
|
|
+ onCancel: (output) => input.processor.completeToolCall(options.toolCallId, output),
|
|
|
+ })
|
|
|
},
|
|
|
})
|
|
|
}
|
|
|
@@ -436,74 +487,64 @@ NOTE: At any point in time through this workflow you should feel free to ask the
|
|
|
const schema = yield* Effect.promise(() => Promise.resolve(asSchema(item.inputSchema).jsonSchema))
|
|
|
const transformed = ProviderTransform.schema(input.model, schema)
|
|
|
item.inputSchema = jsonSchema(transformed)
|
|
|
- item.execute = (args, opts) =>
|
|
|
- Effect.runPromise(
|
|
|
- Effect.gen(function* () {
|
|
|
- const ctx = context(args, opts)
|
|
|
- yield* plugin.trigger(
|
|
|
- "tool.execute.before",
|
|
|
- { tool: key, sessionID: ctx.sessionID, callID: opts.toolCallId },
|
|
|
- { args },
|
|
|
- )
|
|
|
+ item.execute = (args, opts) => {
|
|
|
+ const ctx = context(args, opts)
|
|
|
+ const meta = { tool: key, sessionID: ctx.sessionID, callID: opts.toolCallId }
|
|
|
+ type Raw = Awaited<ReturnType<NonNullable<typeof execute>>>
|
|
|
+ return runToolExecute({
|
|
|
+ signal: opts.abortSignal,
|
|
|
+ before: Effect.gen(function* () {
|
|
|
+ yield* plugin.trigger("tool.execute.before", meta, { args })
|
|
|
yield* Effect.promise(() => ctx.ask({ permission: key, metadata: {}, patterns: ["*"], always: ["*"] }))
|
|
|
- const result: Awaited<ReturnType<NonNullable<typeof execute>>> = yield* Effect.promise(() =>
|
|
|
- execute(args, opts),
|
|
|
- )
|
|
|
- yield* plugin.trigger(
|
|
|
- "tool.execute.after",
|
|
|
- { tool: key, sessionID: ctx.sessionID, callID: opts.toolCallId, args },
|
|
|
- result,
|
|
|
- )
|
|
|
-
|
|
|
- const textParts: string[] = []
|
|
|
- const attachments: Omit<MessageV2.FilePart, "id" | "sessionID" | "messageID">[] = []
|
|
|
- for (const contentItem of result.content) {
|
|
|
- if (contentItem.type === "text") textParts.push(contentItem.text)
|
|
|
- else if (contentItem.type === "image") {
|
|
|
- attachments.push({
|
|
|
- type: "file",
|
|
|
- mime: contentItem.mimeType,
|
|
|
- url: `data:${contentItem.mimeType};base64,${contentItem.data}`,
|
|
|
- })
|
|
|
- } else if (contentItem.type === "resource") {
|
|
|
- const { resource } = contentItem
|
|
|
- if (resource.text) textParts.push(resource.text)
|
|
|
- if (resource.blob) {
|
|
|
+ }),
|
|
|
+ execute: (): Promise<Raw> => execute(args, opts),
|
|
|
+ finalize: (result) =>
|
|
|
+ Effect.gen(function* () {
|
|
|
+ yield* plugin.trigger("tool.execute.after", { ...meta, args }, result)
|
|
|
+ const textParts: string[] = []
|
|
|
+ const attachments: Omit<MessageV2.FilePart, "id" | "sessionID" | "messageID">[] = []
|
|
|
+ for (const contentItem of result.content) {
|
|
|
+ if (contentItem.type === "text") textParts.push(contentItem.text)
|
|
|
+ else if (contentItem.type === "image") {
|
|
|
attachments.push({
|
|
|
type: "file",
|
|
|
- mime: resource.mimeType ?? "application/octet-stream",
|
|
|
- url: `data:${resource.mimeType ?? "application/octet-stream"};base64,${resource.blob}`,
|
|
|
- filename: resource.uri,
|
|
|
+ mime: contentItem.mimeType,
|
|
|
+ url: `data:${contentItem.mimeType};base64,${contentItem.data}`,
|
|
|
})
|
|
|
+ } else if (contentItem.type === "resource") {
|
|
|
+ const { resource } = contentItem
|
|
|
+ if (resource.text) textParts.push(resource.text)
|
|
|
+ if (resource.blob) {
|
|
|
+ attachments.push({
|
|
|
+ type: "file",
|
|
|
+ mime: resource.mimeType ?? "application/octet-stream",
|
|
|
+ url: `data:${resource.mimeType ?? "application/octet-stream"};base64,${resource.blob}`,
|
|
|
+ filename: resource.uri,
|
|
|
+ })
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
- }
|
|
|
-
|
|
|
- const truncated = yield* truncate.output(textParts.join("\n\n"), {}, input.agent)
|
|
|
- const metadata = {
|
|
|
- ...(result.metadata ?? {}),
|
|
|
- truncated: truncated.truncated,
|
|
|
- ...(truncated.truncated && { outputPath: truncated.outputPath }),
|
|
|
- }
|
|
|
-
|
|
|
- const output = {
|
|
|
- title: "",
|
|
|
- metadata,
|
|
|
- output: truncated.content,
|
|
|
- attachments: attachments.map((attachment) => ({
|
|
|
- ...attachment,
|
|
|
- id: PartID.ascending(),
|
|
|
- sessionID: ctx.sessionID,
|
|
|
- messageID: input.processor.message.id,
|
|
|
- })),
|
|
|
- content: result.content,
|
|
|
- }
|
|
|
- if (opts.abortSignal?.aborted) {
|
|
|
- yield* input.processor.completeToolCall(opts.toolCallId, output)
|
|
|
- }
|
|
|
- return output
|
|
|
- }),
|
|
|
- )
|
|
|
+ const truncated = yield* truncate.output(textParts.join("\n\n"), {}, input.agent)
|
|
|
+ return {
|
|
|
+ title: "",
|
|
|
+ metadata: {
|
|
|
+ ...(result.metadata ?? {}),
|
|
|
+ truncated: truncated.truncated,
|
|
|
+ ...(truncated.truncated && { outputPath: truncated.outputPath }),
|
|
|
+ },
|
|
|
+ output: truncated.content,
|
|
|
+ attachments: attachments.map((attachment) => ({
|
|
|
+ ...attachment,
|
|
|
+ id: PartID.ascending(),
|
|
|
+ sessionID: ctx.sessionID,
|
|
|
+ messageID: input.processor.message.id,
|
|
|
+ })),
|
|
|
+ content: result.content,
|
|
|
+ }
|
|
|
+ }),
|
|
|
+ onCancel: (output) => input.processor.completeToolCall(opts.toolCallId, output),
|
|
|
+ })
|
|
|
+ }
|
|
|
tools[key] = item
|
|
|
}
|
|
|
|