Browse Source

fix(observability): move Langfuse trace from proxy-handler to response-handler for complete data

The previous Langfuse integration fired traces in proxy-handler.ts
immediately after dispatch() returned. For streaming responses, the
response body hadn't been consumed yet, so responseText, usageMetrics,
costUsd, sseEventCount, and errorMessage were all undefined.

Move trace invocation into response-handler.ts at the exact points
where all required data has been collected:
- Non-streaming standard path (after response body parsed + cost calculated)
- Non-streaming Gemini passthrough (after finalizeRequestStats)
- Streaming standard path (end of finalizeStream)
- Streaming Gemini passthrough (after finalizeRequestStats)
- Error/abort paths (inside persistRequestFailure)

Also enhance the trace quality:
- Generation input: actual request messages instead of summary object
- Generation output: actual parsed response body instead of empty/status
- Tags: add provider.name and model alongside providerType
- Metadata: add model, originalModel, endpoint, sessionId, keyName,
  requestSequence, sseEventCount, and requestSummary
- Root span output: include costUsd
- Add truncateForLangfuse() helper (configurable via LANGFUSE_MAX_IO_SIZE)
- TraceContext: replace `response: Response` with `responseHeaders: Headers`
- Import UsageMetrics type from response-handler
ding113 2 weeks ago
parent
commit
3251c2fe9a

+ 0 - 22
src/app/v1/_lib/proxy-handler.ts

@@ -82,28 +82,6 @@ export async function handleProxyRequest(c: Context): Promise<Response> {
     const handled = await ProxyResponseHandler.dispatch(session, response);
     const finalResponse = await attachSessionIdToErrorResponse(session.sessionId, handled);
 
-    // Fire Langfuse trace asynchronously (non-blocking)
-    if (process.env.LANGFUSE_PUBLIC_KEY && process.env.LANGFUSE_SECRET_KEY && session) {
-      const traceSession = session;
-      const traceStatusCode = handled.status;
-      const isSSE = (handled.headers.get("content-type") || "").includes("text/event-stream");
-      void import("@/lib/langfuse/trace-proxy-request")
-        .then(({ traceProxyRequest }) => {
-          void traceProxyRequest({
-            session: traceSession,
-            response: handled,
-            durationMs: Date.now() - traceSession.startTime,
-            statusCode: traceStatusCode,
-            isStreaming: isSSE,
-          });
-        })
-        .catch((err) => {
-          logger.warn("[ProxyHandler] Langfuse trace failed", {
-            error: err instanceof Error ? err.message : String(err),
-          });
-        });
-    }
-
     return finalResponse;
   } catch (error) {
     logger.error("Proxy handler error:", error);

+ 112 - 8
src/app/v1/_lib/proxy/response-handler.ts

@@ -39,6 +39,47 @@ export type UsageMetrics = {
   output_image_tokens?: number;
 };
 
+/**
+ * Fire Langfuse trace asynchronously. Non-blocking, error-tolerant.
+ */
+function emitLangfuseTrace(
+  session: ProxySession,
+  data: {
+    responseHeaders: Headers;
+    responseText: string;
+    usageMetrics: UsageMetrics | null;
+    costUsd: string | undefined;
+    statusCode: number;
+    durationMs: number;
+    isStreaming: boolean;
+    sseEventCount?: number;
+    errorMessage?: string;
+  }
+): void {
+  if (!process.env.LANGFUSE_PUBLIC_KEY || !process.env.LANGFUSE_SECRET_KEY) return;
+
+  void import("@/lib/langfuse/trace-proxy-request")
+    .then(({ traceProxyRequest }) => {
+      void traceProxyRequest({
+        session,
+        responseHeaders: data.responseHeaders,
+        durationMs: data.durationMs,
+        statusCode: data.statusCode,
+        isStreaming: data.isStreaming,
+        responseText: data.responseText,
+        usageMetrics: data.usageMetrics,
+        costUsd: data.costUsd,
+        sseEventCount: data.sseEventCount,
+        errorMessage: data.errorMessage,
+      });
+    })
+    .catch((err) => {
+      logger.warn("[ResponseHandler] Langfuse trace failed", {
+        error: err instanceof Error ? err.message : String(err),
+      });
+    });
+}
+
 /**
  * 清理 Response headers 中的传输相关 header
  *
@@ -520,6 +561,18 @@ export class ProxyResponseHandler {
               duration,
               errorMessageForFinalize
             );
+
+            emitLangfuseTrace(session, {
+              responseHeaders: response.headers,
+              responseText,
+              usageMetrics: parseUsageFromResponseText(responseText, provider.providerType)
+                .usageMetrics,
+              costUsd: undefined,
+              statusCode,
+              durationMs: duration,
+              isStreaming: false,
+              errorMessage: errorMessageForFinalize,
+            });
           } catch (error) {
             if (!isClientAbortError(error as Error)) {
               logger.error(
@@ -687,10 +740,9 @@ export class ProxyResponseHandler {
           await trackCostToRedis(session, usageMetrics);
         }
 
-        // 更新 session 使用量到 Redis(用于实时监控)
-        if (session.sessionId && usageMetrics) {
-          // 计算成本(复用相同逻辑)
-          let costUsdStr: string | undefined;
+        // Calculate cost (for session tracking and Langfuse tracing)
+        let costUsdStr: string | undefined;
+        if (usageMetrics) {
           try {
             if (session.request.model) {
               const priceData = await session.getCachedPriceDataByBillingSource();
@@ -711,7 +763,10 @@ export class ProxyResponseHandler {
               error: error instanceof Error ? error.message : String(error),
             });
           }
+        }
 
+        // 更新 session 使用量到 Redis(用于实时监控)
+        if (session.sessionId && usageMetrics) {
           void SessionManager.updateSessionUsage(session.sessionId, {
             inputTokens: usageMetrics.input_tokens,
             outputTokens: usageMetrics.output_tokens,
@@ -782,6 +837,16 @@ export class ProxyResponseHandler {
           providerName: provider.name,
           statusCode,
         });
+
+        emitLangfuseTrace(session, {
+          responseHeaders: response.headers,
+          responseText,
+          usageMetrics,
+          costUsd: costUsdStr,
+          statusCode,
+          durationMs: Date.now() - session.startTime,
+          isStreaming: false,
+        });
       } catch (error) {
         // 检测 AbortError 的来源:响应超时 vs 客户端中断
         const err = error as Error;
@@ -1220,6 +1285,18 @@ export class ProxyResponseHandler {
               finalized.errorMessage ?? undefined,
               finalized.providerIdForPersistence ?? undefined
             );
+
+            emitLangfuseTrace(session, {
+              responseHeaders: response.headers,
+              responseText: allContent,
+              usageMetrics: parseUsageFromResponseText(allContent, provider.providerType)
+                .usageMetrics,
+              costUsd: undefined,
+              statusCode: finalized.effectiveStatusCode,
+              durationMs: duration,
+              isStreaming: true,
+              errorMessage: finalized.errorMessage ?? undefined,
+            });
           } catch (error) {
             const err = error instanceof Error ? error : new Error(String(error));
             const clientAborted = session.clientAbortSignal?.aborted ?? false;
@@ -1588,11 +1665,11 @@ export class ProxyResponseHandler {
         // 追踪消费到 Redis(用于限流)
         await trackCostToRedis(session, usageForCost);
 
-        // 更新 session 使用量到 Redis(用于实时监控)
-        if (session.sessionId) {
-          let costUsdStr: string | undefined;
+        // Calculate cost (for session tracking and Langfuse tracing)
+        let costUsdStr: string | undefined;
+        if (usageForCost) {
           try {
-            if (usageForCost && session.request.model) {
+            if (session.request.model) {
               const priceData = await session.getCachedPriceDataByBillingSource();
               if (priceData) {
                 const cost = calculateRequestCost(
@@ -1611,7 +1688,10 @@ export class ProxyResponseHandler {
               error: error instanceof Error ? error.message : String(error),
             });
           }
+        }
 
+        // 更新 session 使用量到 Redis(用于实时监控)
+        if (session.sessionId) {
           const payload: SessionUsageUpdate = {
             status: effectiveStatusCode >= 200 && effectiveStatusCode < 300 ? "completed" : "error",
             statusCode: effectiveStatusCode,
@@ -1650,6 +1730,18 @@ export class ProxyResponseHandler {
           providerId: providerIdForPersistence ?? session.provider?.id, // 更新最终供应商ID(重试切换后)
           context1mApplied: session.getContext1mApplied(),
         });
+
+        emitLangfuseTrace(session, {
+          responseHeaders: response.headers,
+          responseText: allContent,
+          usageMetrics: usageForCost,
+          costUsd: costUsdStr,
+          statusCode: effectiveStatusCode,
+          durationMs: duration,
+          isStreaming: true,
+          sseEventCount: chunks.length,
+          errorMessage: streamErrorMessage ?? undefined,
+        });
       };
 
       try {
@@ -2919,6 +3011,18 @@ async function persistRequestFailure(options: {
       });
     }
   }
+
+  // Emit Langfuse trace for error/abort paths
+  emitLangfuseTrace(session, {
+    responseHeaders: new Headers(),
+    responseText: "",
+    usageMetrics: null,
+    costUsd: undefined,
+    statusCode,
+    durationMs: duration,
+    isStreaming: phase === "stream",
+    errorMessage,
+  });
 }
 
 /**

+ 77 - 41
src/lib/langfuse/trace-proxy-request.ts

@@ -1,3 +1,4 @@
+import type { UsageMetrics } from "@/app/v1/_lib/proxy/response-handler";
 import type { ProxySession } from "@/app/v1/_lib/proxy/session";
 import { isLangfuseEnabled } from "@/lib/langfuse/index";
 import { logger } from "@/lib/logger";
@@ -40,21 +41,39 @@ function getStatusCategory(statusCode: number): string {
   return `${Math.floor(statusCode / 100)}xx`;
 }
 
+const LANGFUSE_MAX_IO_SIZE = Number(process.env.LANGFUSE_MAX_IO_SIZE) || 100_000;
+
+/**
+ * Truncate data for Langfuse to avoid excessive payload sizes.
+ */
+function truncateForLangfuse(data: unknown, maxChars: number = LANGFUSE_MAX_IO_SIZE): unknown {
+  if (typeof data === "string") {
+    return data.length > maxChars ? `${data.substring(0, maxChars)}...[truncated]` : data;
+  }
+  if (data != null && typeof data === "object") {
+    const str = JSON.stringify(data);
+    if (str.length > maxChars) {
+      return {
+        _truncated: true,
+        _length: str.length,
+        _preview: str.substring(0, Math.min(maxChars, 2000)),
+      };
+    }
+    return data;
+  }
+  return data;
+}
+
 export interface TraceContext {
   session: ProxySession;
-  response: Response;
+  responseHeaders: Headers;
   durationMs: number;
   statusCode: number;
   responseText?: string;
   isStreaming: boolean;
   sseEventCount?: number;
   errorMessage?: string;
-  usageMetrics?: {
-    input_tokens?: number;
-    output_tokens?: number;
-    cache_creation_input_tokens?: number;
-    cache_read_input_tokens?: number;
-  } | null;
+  usageMetrics?: UsageMetrics | null;
   costUsd?: string;
 }
 
@@ -70,14 +89,16 @@ export async function traceProxyRequest(ctx: TraceContext): Promise<void> {
   try {
     const { startObservation, propagateAttributes } = await import("@langfuse/tracing");
 
-    const { session, response, durationMs, statusCode, isStreaming } = ctx;
+    const { session, durationMs, statusCode, isStreaming } = ctx;
     const provider = session.provider;
     const messageContext = session.messageContext;
 
-    // Build tags
+    // Build tags - include provider name and model
     const tags: string[] = [];
     if (provider?.providerType) tags.push(provider.providerType);
+    if (provider?.name) tags.push(provider.name);
     if (session.originalFormat) tags.push(session.originalFormat);
+    if (session.getCurrentModel()) tags.push(session.getCurrentModel()!);
     tags.push(getStatusCategory(statusCode));
 
     // Build trace-level metadata (propagateAttributes requires all values to be strings)
@@ -90,37 +111,45 @@ export async function traceProxyRequest(ctx: TraceContext): Promise<void> {
       requestSequence: String(session.getRequestSequence()),
     };
 
-    // Build generation metadata
+    // Build generation metadata - all request detail fields
     const generationMetadata: Record<string, unknown> = {
+      // Provider
       providerId: provider?.id,
       providerName: provider?.name,
       providerType: provider?.providerType,
       providerChain: session.getProviderChain(),
-      specialSettings: session.getSpecialSettings(),
+      // Model
+      model: session.getCurrentModel(),
+      originalModel: session.getOriginalModel(),
       modelRedirected: session.isModelRedirected(),
-      originalModel: session.isModelRedirected() ? session.getOriginalModel() : undefined,
-      isStreaming,
-      statusCode,
+      // Special settings
+      specialSettings: session.getSpecialSettings(),
+      // Request context
+      endpoint: session.getEndpoint(),
+      method: session.method,
+      clientFormat: session.originalFormat,
+      userAgent: session.userAgent,
+      requestSequence: session.getRequestSequence(),
+      sessionId: session.sessionId,
+      keyName: messageContext?.key?.name,
+      // Timing
       durationMs,
       ttfbMs: session.ttfbMs,
+      // Flags
+      isStreaming,
       cacheTtlApplied: session.getCacheTtlResolved(),
       context1mApplied: session.getContext1mApplied(),
+      // Error
       errorMessage: ctx.errorMessage,
+      // Request summary (quick overview)
+      requestSummary: buildRequestBodySummary(session),
+      // SSE
+      sseEventCount: ctx.sseEventCount,
+      // Headers (sanitized)
       requestHeaders: sanitizeHeaders(session.headers),
-      responseHeaders: sanitizeHeaders(response.headers),
-      requestBodySummary: buildRequestBodySummary(session),
+      responseHeaders: sanitizeHeaders(ctx.responseHeaders),
     };
 
-    // Add response body summary
-    if (isStreaming) {
-      generationMetadata.sseEventCount = ctx.sseEventCount;
-    } else if (ctx.responseText) {
-      generationMetadata.responseBodySummary =
-        ctx.responseText.length > 2000
-          ? `${ctx.responseText.substring(0, 2000)}...[truncated]`
-          : ctx.responseText;
-    }
-
     // Build usage details for Langfuse generation
     const usageDetails: Record<string, number> | undefined = ctx.usageMetrics
       ? {
@@ -147,12 +176,19 @@ export async function traceProxyRequest(ctx: TraceContext): Promise<void> {
 
     // Create the root trace span
     const rootSpan = startObservation("proxy-request", {
-      input: traceMetadata,
+      input: {
+        endpoint: session.getEndpoint(),
+        method: session.method,
+        model: session.getCurrentModel(),
+        clientFormat: session.originalFormat,
+        providerName: provider?.name,
+      },
       output: {
         statusCode,
         durationMs,
         model: session.getCurrentModel(),
         hasUsage: !!ctx.usageMetrics,
+        costUsd: ctx.costUsd,
       },
     });
 
@@ -166,17 +202,23 @@ export async function traceProxyRequest(ctx: TraceContext): Promise<void> {
         traceName: `${session.method} ${session.getEndpoint() ?? "/"}`,
       },
       async () => {
+        // Generation input = actual request payload
+        const generationInput = truncateForLangfuse(session.request.message);
+
+        // Generation output = actual response body
+        const generationOutput = ctx.responseText
+          ? truncateForLangfuse(tryParseJsonSafe(ctx.responseText))
+          : isStreaming
+            ? { streaming: true, sseEventCount: ctx.sseEventCount }
+            : { statusCode };
+
         // Create the LLM generation observation
         const generation = rootSpan.startObservation(
           "llm-call",
           {
             model: session.getCurrentModel() ?? undefined,
-            input: buildRequestBodySummary(session),
-            output: isStreaming
-              ? { streaming: true, sseEventCount: ctx.sseEventCount }
-              : ctx.responseText
-                ? tryParseJsonSafe(ctx.responseText)
-                : { statusCode },
+            input: generationInput,
+            output: generationOutput,
             ...(usageDetails && Object.keys(usageDetails).length > 0 ? { usageDetails } : {}),
             ...(costDetails ? { costDetails } : {}),
             metadata: generationMetadata,
@@ -205,14 +247,8 @@ export async function traceProxyRequest(ctx: TraceContext): Promise<void> {
 
 function tryParseJsonSafe(text: string): unknown {
   try {
-    const parsed = JSON.parse(text);
-    // Truncate large outputs to avoid excessive data
-    const str = JSON.stringify(parsed);
-    if (str.length > 4000) {
-      return { _truncated: true, _length: str.length, _preview: str.substring(0, 2000) };
-    }
-    return parsed;
+    return JSON.parse(text);
   } catch {
-    return text.length > 2000 ? `${text.substring(0, 2000)}...[truncated]` : text;
+    return text;
   }
 }

+ 161 - 11
tests/unit/langfuse/langfuse-trace.test.ts

@@ -114,7 +114,7 @@ describe("traceProxyRequest", () => {
 
     await traceProxyRequest({
       session: createMockSession(),
-      response: new Response("ok", { status: 200 }),
+      responseHeaders: new Headers(),
       durationMs: 500,
       statusCode: 200,
       isStreaming: false,
@@ -128,10 +128,11 @@ describe("traceProxyRequest", () => {
 
     await traceProxyRequest({
       session: createMockSession(),
-      response: new Response("ok", { status: 200 }),
+      responseHeaders: new Headers({ "content-type": "application/json" }),
       durationMs: 500,
       statusCode: 200,
       isStreaming: false,
+      responseText: '{"content": "Hi there"}',
     });
 
     expect(mockStartObservation).toHaveBeenCalledWith(
@@ -141,6 +142,12 @@ describe("traceProxyRequest", () => {
           endpoint: "/v1/messages",
           method: "POST",
           clientFormat: "claude",
+          providerName: "anthropic-main",
+        }),
+        output: expect.objectContaining({
+          statusCode: 200,
+          durationMs: 500,
+          costUsd: undefined,
         }),
       })
     );
@@ -157,12 +164,47 @@ describe("traceProxyRequest", () => {
     expect(mockGenerationEnd).toHaveBeenCalled();
   });
 
+  test("should use actual request messages as generation input", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+    const session = createMockSession();
+
+    await traceProxyRequest({
+      session,
+      responseHeaders: new Headers(),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: false,
+      responseText: '{"content": "response"}',
+    });
+
+    const generationCall = mockRootSpan.startObservation.mock.calls[0];
+    // Generation input should be the actual request message, not a summary
+    expect(generationCall[1].input).toEqual(session.request.message);
+  });
+
+  test("should use actual response body as generation output", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+    const responseBody = { content: [{ type: "text", text: "Hello!" }] };
+
+    await traceProxyRequest({
+      session: createMockSession(),
+      responseHeaders: new Headers(),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: false,
+      responseText: JSON.stringify(responseBody),
+    });
+
+    const generationCall = mockRootSpan.startObservation.mock.calls[0];
+    expect(generationCall[1].output).toEqual(responseBody);
+  });
+
   test("should redact sensitive headers", async () => {
     const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
 
     await traceProxyRequest({
       session: createMockSession(),
-      response: new Response("ok", { status: 200 }),
+      responseHeaders: new Headers({ "x-api-key": "secret-mock" }),
       durationMs: 500,
       statusCode: 200,
       isStreaming: false,
@@ -172,14 +214,15 @@ describe("traceProxyRequest", () => {
     const metadata = generationCall[1].metadata;
     expect(metadata.requestHeaders["x-api-key"]).toBe("[REDACTED]");
     expect(metadata.requestHeaders["content-type"]).toBe("application/json");
+    expect(metadata.responseHeaders["x-api-key"]).toBe("[REDACTED]");
   });
 
-  test("should propagate userId and sessionId", async () => {
+  test("should include provider name and model in tags", async () => {
     const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
 
     await traceProxyRequest({
       session: createMockSession(),
-      response: new Response("ok", { status: 200 }),
+      responseHeaders: new Headers(),
       durationMs: 500,
       statusCode: 200,
       isStreaming: false,
@@ -189,7 +232,12 @@ describe("traceProxyRequest", () => {
       expect.objectContaining({
         userId: "7",
         sessionId: "sess_abc12345_def67890",
-        tags: expect.arrayContaining(["claude", "2xx"]),
+        tags: expect.arrayContaining([
+          "claude",
+          "anthropic-main",
+          "claude-sonnet-4-20250514",
+          "2xx",
+        ]),
       })
     );
   });
@@ -199,7 +247,7 @@ describe("traceProxyRequest", () => {
 
     await traceProxyRequest({
       session: createMockSession(),
-      response: new Response("ok", { status: 200 }),
+      responseHeaders: new Headers(),
       durationMs: 500,
       statusCode: 200,
       isStreaming: false,
@@ -222,6 +270,45 @@ describe("traceProxyRequest", () => {
     });
   });
 
+  test("should include providerChain, specialSettings, and model in metadata", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    const providerChain = [
+      {
+        id: 1,
+        name: "anthropic-main",
+        providerType: "claude",
+        reason: "initial_selection",
+        timestamp: Date.now(),
+      },
+    ];
+
+    await traceProxyRequest({
+      session: createMockSession({
+        getSpecialSettings: () => ({ maxThinking: 8192 }),
+        getProviderChain: () => providerChain,
+      }),
+      responseHeaders: new Headers(),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: false,
+    });
+
+    const generationCall = mockRootSpan.startObservation.mock.calls[0];
+    const metadata = generationCall[1].metadata;
+    expect(metadata.providerChain).toEqual(providerChain);
+    expect(metadata.specialSettings).toEqual({ maxThinking: 8192 });
+    expect(metadata.model).toBe("claude-sonnet-4-20250514");
+    expect(metadata.originalModel).toBe("claude-sonnet-4-20250514");
+    expect(metadata.providerName).toBe("anthropic-main");
+    expect(metadata.requestSummary).toEqual(
+      expect.objectContaining({
+        model: "claude-sonnet-4-20250514",
+        messageCount: 1,
+      })
+    );
+  });
+
   test("should handle model redirect metadata", async () => {
     const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
 
@@ -235,7 +322,7 @@ describe("traceProxyRequest", () => {
           model: "glm-4",
         },
       }),
-      response: new Response("ok", { status: 200 }),
+      responseHeaders: new Headers(),
       durationMs: 500,
       statusCode: 200,
       isStreaming: false,
@@ -252,7 +339,7 @@ describe("traceProxyRequest", () => {
     const startTime = Date.now() - 500;
     await traceProxyRequest({
       session: createMockSession({ startTime, ttfbMs: 200 }),
-      response: new Response("ok", { status: 200 }),
+      responseHeaders: new Headers(),
       durationMs: 500,
       statusCode: 200,
       isStreaming: false,
@@ -274,7 +361,7 @@ describe("traceProxyRequest", () => {
     await expect(
       traceProxyRequest({
         session: createMockSession(),
-        response: new Response("ok", { status: 200 }),
+        responseHeaders: new Headers(),
         durationMs: 500,
         statusCode: 200,
         isStreaming: false,
@@ -287,7 +374,7 @@ describe("traceProxyRequest", () => {
 
     await traceProxyRequest({
       session: createMockSession(),
-      response: new Response("error", { status: 502 }),
+      responseHeaders: new Headers(),
       durationMs: 500,
       statusCode: 502,
       isStreaming: false,
@@ -300,6 +387,69 @@ describe("traceProxyRequest", () => {
       })
     );
   });
+
+  test("should truncate large input/output for Langfuse", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    // Generate a large response text (> default 100KB limit)
+    const largeContent = "x".repeat(200_000);
+
+    await traceProxyRequest({
+      session: createMockSession(),
+      responseHeaders: new Headers(),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: false,
+      responseText: largeContent,
+    });
+
+    const generationCall = mockRootSpan.startObservation.mock.calls[0];
+    const output = generationCall[1].output as string;
+    // Non-JSON text should be truncated
+    expect(output.length).toBeLessThan(200_000);
+    expect(output).toContain("...[truncated]");
+  });
+
+  test("should show streaming output with sseEventCount when no responseText", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    await traceProxyRequest({
+      session: createMockSession(),
+      responseHeaders: new Headers(),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: true,
+      sseEventCount: 42,
+    });
+
+    const generationCall = mockRootSpan.startObservation.mock.calls[0];
+    expect(generationCall[1].output).toEqual({
+      streaming: true,
+      sseEventCount: 42,
+    });
+  });
+
+  test("should include costUsd in root span output", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    await traceProxyRequest({
+      session: createMockSession(),
+      responseHeaders: new Headers(),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: false,
+      costUsd: "0.05",
+    });
+
+    expect(mockStartObservation).toHaveBeenCalledWith(
+      "proxy-request",
+      expect.objectContaining({
+        output: expect.objectContaining({
+          costUsd: "0.05",
+        }),
+      })
+    );
+  });
 });
 
 describe("isLangfuseEnabled", () => {