Browse Source

refactor(langfuse): remove truncation and header redaction for raw observability

- Remove truncateForLangfuse, sanitizeHeaders, REDACTED_HEADERS, and
  the LANGFUSE_MAX_IO_SIZE env config -- observability platforms should
  store raw data without lossy transformations
- Replace sanitizeHeaders with headersToRecord (pass-through)
- Unify generation input to use actualRequestBody (forwardedRequestBody
  when available) instead of session.request.message, ensuring the
  complete forwarded request body is recorded everywhere
ding113 1 week ago
parent
commit
a04356ec33

+ 0 - 1
src/lib/config/env.schema.ts

@@ -134,7 +134,6 @@ export const EnvSchema = z.object({
   LANGFUSE_BASE_URL: z.string().default("https://cloud.langfuse.com"),
   LANGFUSE_SAMPLE_RATE: z.coerce.number().min(0).max(1).default(1.0),
   LANGFUSE_DEBUG: z.string().default("false").transform(booleanTransform),
-  LANGFUSE_MAX_IO_SIZE: z.coerce.number().int().min(1).max(10_000_000).default(100_000),
 });
 
 /**

+ 18 - 56
src/lib/langfuse/trace-proxy-request.ts

@@ -1,28 +1,9 @@
 import type { UsageMetrics } from "@/app/v1/_lib/proxy/response-handler";
 import type { ProxySession } from "@/app/v1/_lib/proxy/session";
-import { getEnvConfig } from "@/lib/config/env.schema";
 import { isLangfuseEnabled } from "@/lib/langfuse/index";
 import { logger } from "@/lib/logger";
 import type { CostBreakdown } from "@/lib/utils/cost-calculation";
 
-// Auth-sensitive header names to redact
-const REDACTED_HEADERS = new Set([
-  "x-api-key",
-  "authorization",
-  "x-goog-api-key",
-  "anthropic-api-key",
-  "cookie",
-  "set-cookie",
-]);
-
-function sanitizeHeaders(headers: Headers): Record<string, string> {
-  const result: Record<string, string> = {};
-  headers.forEach((value, key) => {
-    result[key] = REDACTED_HEADERS.has(key.toLowerCase()) ? "[REDACTED]" : value;
-  });
-  return result;
-}
-
 function buildRequestBodySummary(session: ProxySession): Record<string, unknown> {
   const msg = session.request.message as Record<string, unknown>;
   return {
@@ -43,8 +24,12 @@ function getStatusCategory(statusCode: number): string {
   return `${Math.floor(statusCode / 100)}xx`;
 }
 
-function getLangfuseMaxIoSize(): number {
-  return getEnvConfig().LANGFUSE_MAX_IO_SIZE;
+function headersToRecord(headers: Headers): Record<string, string> {
+  const result: Record<string, string> = {};
+  headers.forEach((value, key) => {
+    result[key] = value;
+  });
+  return result;
 }
 
 const SUCCESS_REASONS = new Set([
@@ -68,27 +53,6 @@ function isErrorReason(reason: string | undefined): boolean {
   return !!reason && ERROR_REASONS.has(reason);
 }
 
-/**
- * Truncate data for Langfuse to avoid excessive payload sizes.
- */
-function truncateForLangfuse(data: unknown, maxChars: number = getLangfuseMaxIoSize()): unknown {
-  if (typeof data === "string") {
-    return data.length > maxChars ? `${data.substring(0, maxChars)}...[truncated]` : data;
-  }
-  if (data != null && typeof data === "object") {
-    const str = JSON.stringify(data);
-    if (str.length > maxChars) {
-      return {
-        _truncated: true,
-        _length: str.length,
-        _preview: str.substring(0, Math.min(maxChars, 2000)),
-      };
-    }
-    return data;
-  }
-  return data;
-}
-
 type ObservationLevel = "DEBUG" | "DEFAULT" | "WARNING" | "ERROR";
 
 export interface TraceContext {
@@ -155,14 +119,14 @@ export async function traceProxyRequest(ctx: TraceContext): Promise<void> {
       if (failedAttempts >= 1) rootSpanLevel = "WARNING";
     }
 
-    // Actual request body (forwarded to upstream after all preprocessing)
+    // Actual request body (forwarded to upstream after all preprocessing) - no truncation
     const actualRequestBody = session.forwardedRequestBody
-      ? truncateForLangfuse(tryParseJsonSafe(session.forwardedRequestBody))
-      : truncateForLangfuse(session.request.message);
+      ? tryParseJsonSafe(session.forwardedRequestBody)
+      : session.request.message;
 
-    // Actual response body
+    // Actual response body - no truncation
     const actualResponseBody = ctx.responseText
-      ? truncateForLangfuse(tryParseJsonSafe(ctx.responseText))
+      ? tryParseJsonSafe(ctx.responseText)
       : isStreaming
         ? { streaming: true, sseEventCount: ctx.sseEventCount }
         : { statusCode };
@@ -199,7 +163,7 @@ export async function traceProxyRequest(ctx: TraceContext): Promise<void> {
       requestSequence: String(session.getRequestSequence()),
     };
 
-    // Build generation metadata - all request detail fields
+    // Build generation metadata - all request detail fields, raw headers (no redaction)
     const generationMetadata: Record<string, unknown> = {
       // Provider
       providerId: provider?.id,
@@ -234,9 +198,9 @@ export async function traceProxyRequest(ctx: TraceContext): Promise<void> {
       requestSummary: buildRequestBodySummary(session),
       // SSE
       sseEventCount: ctx.sseEventCount,
-      // Headers (sanitized)
-      requestHeaders: sanitizeHeaders(session.headers),
-      responseHeaders: sanitizeHeaders(ctx.responseHeaders),
+      // Headers (raw, no redaction)
+      requestHeaders: headersToRecord(session.headers),
+      responseHeaders: headersToRecord(ctx.responseHeaders),
     };
 
     // Build usage details for Langfuse generation
@@ -331,12 +295,10 @@ export async function traceProxyRequest(ctx: TraceContext): Promise<void> {
         // 3. LLM generation (startTime = forwardStartTime when available)
         const generationStartTime = forwardStartDate ?? requestStartTime;
 
-        // Generation input = actual request payload
-        const generationInput = truncateForLangfuse(session.request.message);
-
-        // Generation output = actual response body
+        // Generation input/output = raw payload, no truncation
+        const generationInput = actualRequestBody;
         const generationOutput = ctx.responseText
-          ? truncateForLangfuse(tryParseJsonSafe(ctx.responseText))
+          ? tryParseJsonSafe(ctx.responseText)
           : isStreaming
             ? { streaming: true, sseEventCount: ctx.sseEventCount }
             : { statusCode };

+ 8 - 8
tests/unit/langfuse/langfuse-trace.test.ts

@@ -233,7 +233,7 @@ describe("traceProxyRequest", () => {
     expect(llmCall[1].output).toEqual(responseBody);
   });
 
-  test("should redact sensitive headers", async () => {
+  test("should pass raw headers without redaction", async () => {
     const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
 
     await traceProxyRequest({
@@ -248,9 +248,9 @@ describe("traceProxyRequest", () => {
       (c: unknown[]) => c[0] === "llm-call"
     );
     const metadata = llmCall[1].metadata;
-    expect(metadata.requestHeaders["x-api-key"]).toBe("[REDACTED]");
+    expect(metadata.requestHeaders["x-api-key"]).toBe("test-mock-key-not-real");
     expect(metadata.requestHeaders["content-type"]).toBe("application/json");
-    expect(metadata.responseHeaders["x-api-key"]).toBe("[REDACTED]");
+    expect(metadata.responseHeaders["x-api-key"]).toBe("secret-mock");
   });
 
   test("should include provider name and model in tags", async () => {
@@ -467,10 +467,10 @@ describe("traceProxyRequest", () => {
     );
   });
 
-  test("should truncate large input/output for Langfuse", async () => {
+  test("should pass large input/output without truncation", async () => {
     const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
 
-    // Generate a large response text (> default 100KB limit)
+    // Generate a large response text
     const largeContent = "x".repeat(200_000);
 
     await traceProxyRequest({
@@ -486,9 +486,9 @@ describe("traceProxyRequest", () => {
       (c: unknown[]) => c[0] === "llm-call"
     );
     const output = llmCall[1].output as string;
-    // Non-JSON text should be truncated
-    expect(output.length).toBeLessThan(200_000);
-    expect(output).toContain("...[truncated]");
+    // Should be the full content, no truncation
+    expect(output).toBe(largeContent);
+    expect(output).not.toContain("...[truncated]");
   });
 
   test("should show streaming output with sseEventCount when no responseText", async () => {