Browse Source

fix(langfuse): correct observation duration, cost key, and usage type names

- Pass actual startTime/endTime to startObservation and end() so root
  span and generation reflect real request duration instead of ~0ms
- Change costDetails key from 'totalUsd' to 'total' per Langfuse SDK
  convention (values are already in USD)
- Change usageDetails keys from camelCase (cacheRead, cacheCreation) to
  snake_case (cache_read_input_tokens, cache_creation_input_tokens) so
  Langfuse UI can categorize them as input usage types
ding113 3 weeks ago
parent
commit
5e932608e5
2 changed files with 79 additions and 26 deletions
  1. 32 21
      src/lib/langfuse/trace-proxy-request.ts
  2. 47 5
      tests/unit/langfuse/langfuse-trace.test.ts

+ 32 - 21
src/lib/langfuse/trace-proxy-request.ts

@@ -93,6 +93,10 @@ export async function traceProxyRequest(ctx: TraceContext): Promise<void> {
     const provider = session.provider;
     const messageContext = session.messageContext;
 
+    // Compute actual request timing from session data
+    const requestStartTime = new Date(session.startTime);
+    const requestEndTime = new Date(session.startTime + durationMs);
+
     // Build tags - include provider name and model
     const tags: string[] = [];
     if (provider?.providerType) tags.push(provider.providerType);
@@ -160,10 +164,10 @@ export async function traceProxyRequest(ctx: TraceContext): Promise<void> {
             ? { output: ctx.usageMetrics.output_tokens }
             : {}),
           ...(ctx.usageMetrics.cache_read_input_tokens != null
-            ? { cacheRead: ctx.usageMetrics.cache_read_input_tokens }
+            ? { cache_read_input_tokens: ctx.usageMetrics.cache_read_input_tokens }
             : {}),
           ...(ctx.usageMetrics.cache_creation_input_tokens != null
-            ? { cacheCreation: ctx.usageMetrics.cache_creation_input_tokens }
+            ? { cache_creation_input_tokens: ctx.usageMetrics.cache_creation_input_tokens }
             : {}),
         }
       : undefined;
@@ -171,26 +175,32 @@ export async function traceProxyRequest(ctx: TraceContext): Promise<void> {
     // Build cost details
     const costDetails: Record<string, number> | undefined =
       ctx.costUsd && Number.parseFloat(ctx.costUsd) > 0
-        ? { totalUsd: Number.parseFloat(ctx.costUsd) }
+        ? { total: Number.parseFloat(ctx.costUsd) }
         : undefined;
 
     // Create the root trace span
-    const rootSpan = startObservation("proxy-request", {
-      input: {
-        endpoint: session.getEndpoint(),
-        method: session.method,
-        model: session.getCurrentModel(),
-        clientFormat: session.originalFormat,
-        providerName: provider?.name,
-      },
-      output: {
-        statusCode,
-        durationMs,
-        model: session.getCurrentModel(),
-        hasUsage: !!ctx.usageMetrics,
-        costUsd: ctx.costUsd,
+    const rootSpan = startObservation(
+      "proxy-request",
+      {
+        input: {
+          endpoint: session.getEndpoint(),
+          method: session.method,
+          model: session.getCurrentModel(),
+          clientFormat: session.originalFormat,
+          providerName: provider?.name,
+        },
+        output: {
+          statusCode,
+          durationMs,
+          model: session.getCurrentModel(),
+          hasUsage: !!ctx.usageMetrics,
+          costUsd: ctx.costUsd,
+        },
       },
-    });
+      {
+        startTime: requestStartTime,
+      }
+    );
 
     // Propagate trace attributes
     await propagateAttributes(
@@ -223,7 +233,8 @@ export async function traceProxyRequest(ctx: TraceContext): Promise<void> {
             ...(costDetails ? { costDetails } : {}),
             metadata: generationMetadata,
           },
-          { asType: "generation" }
+          // SDK runtime supports startTime on child observations but types don't expose it
+          { asType: "generation", startTime: requestStartTime } as { asType: "generation" }
         );
 
         // Set TTFB as completionStartTime
@@ -233,7 +244,7 @@ export async function traceProxyRequest(ctx: TraceContext): Promise<void> {
           });
         }
 
-        generation.end();
+        generation.end(requestEndTime);
       }
     );
 
@@ -255,7 +266,7 @@ export async function traceProxyRequest(ctx: TraceContext): Promise<void> {
       },
     });
 
-    rootSpan.end();
+    rootSpan.end(requestEndTime);
   } catch (error) {
     logger.warn("[Langfuse] Failed to trace proxy request", {
       error: error instanceof Error ? error.message : String(error),

+ 47 - 5
tests/unit/langfuse/langfuse-trace.test.ts

@@ -152,6 +152,9 @@ describe("traceProxyRequest", () => {
           durationMs: 500,
           costUsd: undefined,
         }),
+      }),
+      expect.objectContaining({
+        startTime: expect.any(Date),
       })
     );
 
@@ -160,11 +163,14 @@ describe("traceProxyRequest", () => {
       expect.objectContaining({
         model: "claude-sonnet-4-20250514",
       }),
-      { asType: "generation" }
+      expect.objectContaining({
+        asType: "generation",
+        startTime: expect.any(Date),
+      })
     );
 
-    expect(mockSpanEnd).toHaveBeenCalled();
-    expect(mockGenerationEnd).toHaveBeenCalled();
+    expect(mockSpanEnd).toHaveBeenCalledWith(expect.any(Date));
+    expect(mockGenerationEnd).toHaveBeenCalledWith(expect.any(Date));
   });
 
   test("should use actual request messages as generation input", async () => {
@@ -266,10 +272,10 @@ describe("traceProxyRequest", () => {
     expect(generationCall[1].usageDetails).toEqual({
       input: 100,
       output: 50,
-      cacheRead: 20,
+      cache_read_input_tokens: 20,
     });
     expect(generationCall[1].costDetails).toEqual({
-      totalUsd: 0.0015,
+      total: 0.0015,
     });
   });
 
@@ -353,6 +359,39 @@ describe("traceProxyRequest", () => {
     });
   });
 
+  test("should pass correct startTime and endTime to observations", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    const startTime = 1700000000000;
+    const durationMs = 5000;
+
+    await traceProxyRequest({
+      session: createMockSession({ startTime }),
+      responseHeaders: new Headers(),
+      durationMs,
+      statusCode: 200,
+      isStreaming: false,
+    });
+
+    const expectedStart = new Date(startTime);
+    const expectedEnd = new Date(startTime + durationMs);
+
+    // Root span gets startTime in options (3rd arg)
+    expect(mockStartObservation).toHaveBeenCalledWith("proxy-request", expect.any(Object), {
+      startTime: expectedStart,
+    });
+
+    // Generation gets startTime in options (3rd arg)
+    expect(mockRootSpan.startObservation).toHaveBeenCalledWith("llm-call", expect.any(Object), {
+      asType: "generation",
+      startTime: expectedStart,
+    });
+
+    // Both end() calls receive the computed endTime
+    expect(mockGenerationEnd).toHaveBeenCalledWith(expectedEnd);
+    expect(mockSpanEnd).toHaveBeenCalledWith(expectedEnd);
+  });
+
   test("should handle errors gracefully without throwing", async () => {
     const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
 
@@ -450,6 +489,9 @@ describe("traceProxyRequest", () => {
         output: expect.objectContaining({
           costUsd: "0.05",
         }),
+      }),
+      expect.objectContaining({
+        startTime: expect.any(Date),
       })
     );
   });