
feat(observability): integrate Langfuse for LLM request tracing

Ding, 5 days ago
Parent commit: a6c20e5af8

+ 10 - 0
.env.example

@@ -128,6 +128,16 @@ FETCH_HEADERS_TIMEOUT=600000
 FETCH_BODY_TIMEOUT=600000
 MAX_RETRY_ATTEMPTS_DEFAULT=2                # Max attempts per provider (including the first call); range 1-10; leave empty for the default of 2
 
+# Langfuse Observability (optional, auto-enabled when keys are set)
+# Purpose: enterprise-grade LLM observability; automatically traces the full lifecycle of every proxied request
+# - Automatically enabled once PUBLIC_KEY and SECRET_KEY are configured
+# - Supports both Langfuse Cloud and self-hosted instances
+LANGFUSE_PUBLIC_KEY=                        # Langfuse project public key (pk-lf-...)
+LANGFUSE_SECRET_KEY=                        # Langfuse project secret key (sk-lf-...)
+LANGFUSE_BASE_URL=https://cloud.langfuse.com  # Langfuse server URL (self-hosted or cloud)
+LANGFUSE_SAMPLE_RATE=1.0                    # Trace sampling rate (0.0-1.0, default: 1.0 = 100%)
+LANGFUSE_DEBUG=false                        # Enable Langfuse debug logging
+
 # Smart probing configuration
 # Purpose: when the circuit breaker is OPEN, periodically probe providers for faster recovery
 # - ENABLE_SMART_PROBING: whether smart probing is enabled (default: false)
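For illustration, a self-hosted deployment sampling 10% of traces might be configured like this (values are hypothetical, not part of this commit):

LANGFUSE_PUBLIC_KEY=pk-lf-xxxxxxxx
LANGFUSE_SECRET_KEY=sk-lf-xxxxxxxx
LANGFUSE_BASE_URL=https://langfuse.internal.example.com
LANGFUSE_SAMPLE_RATE=0.1
LANGFUSE_DEBUG=false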

+ 4 - 0
package.json

@@ -44,7 +44,11 @@
     "@hono/zod-openapi": "^1",
     "@hookform/resolvers": "^5",
     "@iarna/toml": "^2.2.5",
+    "@langfuse/client": "^4.6.1",
+    "@langfuse/otel": "^4.6.1",
+    "@langfuse/tracing": "^4.6.1",
     "@lobehub/icons": "^2",
+    "@opentelemetry/sdk-node": "^0.212.0",
     "@radix-ui/react-alert-dialog": "^1",
     "@radix-ui/react-avatar": "^1",
     "@radix-ui/react-checkbox": "^1",

+ 4 - 1
src/app/v1/_lib/proxy-handler.ts

@@ -78,9 +78,12 @@ export async function handleProxyRequest(c: Context): Promise<Response> {
       });
     }
 
+    session.recordForwardStart();
     const response = await ProxyForwarder.send(session);
     const handled = await ProxyResponseHandler.dispatch(session, response);
-    return await attachSessionIdToErrorResponse(session.sessionId, handled);
+    const finalResponse = await attachSessionIdToErrorResponse(session.sessionId, handled);
+
+    return finalResponse;
   } catch (error) {
     logger.error("Proxy handler error:", error);
     if (session) {

+ 2 - 0
src/app/v1/_lib/proxy/forwarder.ts

@@ -1648,6 +1648,7 @@ export class ProxyForwarder {
 
         const bodyString = JSON.stringify(bodyToSerialize);
         requestBody = bodyString;
+        session.forwardedRequestBody = bodyString;
       }
 
       // Detect streaming requests: Gemini supports two mechanisms
@@ -1974,6 +1975,7 @@ export class ProxyForwarder {
 
         const bodyString = JSON.stringify(messageToSend);
         requestBody = bodyString;
+        session.forwardedRequestBody = bodyString;
 
         try {
           const parsed = JSON.parse(bodyString);

+ 170 - 9
src/app/v1/_lib/proxy/response-handler.ts

@@ -8,7 +8,8 @@ import { RateLimitService } from "@/lib/rate-limit";
 import type { LeaseWindowType } from "@/lib/rate-limit/lease";
 import { SessionManager } from "@/lib/session-manager";
 import { SessionTracker } from "@/lib/session-tracker";
-import { calculateRequestCost } from "@/lib/utils/cost-calculation";
+import type { CostBreakdown } from "@/lib/utils/cost-calculation";
+import { calculateRequestCost, calculateRequestCostBreakdown } from "@/lib/utils/cost-calculation";
 import { hasValidPriceData } from "@/lib/utils/price-data";
 import { isSSEText, parseSSEData } from "@/lib/utils/sse";
 import { detectUpstreamErrorFromSseOrJsonText } from "@/lib/utils/upstream-error-detection";
@@ -39,6 +40,49 @@ export type UsageMetrics = {
   output_image_tokens?: number;
 };
 
+/**
+ * Fire a Langfuse trace asynchronously; non-blocking and error-tolerant.
+ */
+function emitLangfuseTrace(
+  session: ProxySession,
+  data: {
+    responseHeaders: Headers;
+    responseText: string;
+    usageMetrics: UsageMetrics | null;
+    costUsd: string | undefined;
+    costBreakdown?: CostBreakdown;
+    statusCode: number;
+    durationMs: number;
+    isStreaming: boolean;
+    sseEventCount?: number;
+    errorMessage?: string;
+  }
+): void {
+  if (!process.env.LANGFUSE_PUBLIC_KEY || !process.env.LANGFUSE_SECRET_KEY) return;
+
+  void import("@/lib/langfuse/trace-proxy-request")
+    .then(({ traceProxyRequest }) => {
+      void traceProxyRequest({
+        session,
+        responseHeaders: data.responseHeaders,
+        durationMs: data.durationMs,
+        statusCode: data.statusCode,
+        isStreaming: data.isStreaming,
+        responseText: data.responseText,
+        usageMetrics: data.usageMetrics,
+        costUsd: data.costUsd,
+        costBreakdown: data.costBreakdown,
+        sseEventCount: data.sseEventCount,
+        errorMessage: data.errorMessage,
+      });
+    })
+    .catch((err) => {
+      logger.warn("[ResponseHandler] Langfuse trace failed", {
+        error: err instanceof Error ? err.message : String(err),
+      });
+    });
+}
+
 /**
  * Strip transport-related headers from the Response headers
  *
@@ -520,6 +564,18 @@ export class ProxyResponseHandler {
               duration,
               errorMessageForFinalize
             );
+
+            emitLangfuseTrace(session, {
+              responseHeaders: response.headers,
+              responseText,
+              usageMetrics: parseUsageFromResponseText(responseText, provider.providerType)
+                .usageMetrics,
+              costUsd: undefined,
+              statusCode,
+              durationMs: duration,
+              isStreaming: false,
+              errorMessage: errorMessageForFinalize,
+            });
           } catch (error) {
             if (!isClientAbortError(error as Error)) {
               logger.error(
@@ -687,10 +743,11 @@ export class ProxyResponseHandler {
           await trackCostToRedis(session, usageMetrics);
         }
 
-        // Update session usage in Redis (for real-time monitoring)
-        if (session.sessionId && usageMetrics) {
-          // Calculate cost (reusing the same logic)
-          let costUsdStr: string | undefined;
+        // Calculate cost for session tracking (with multiplier) and Langfuse (raw)
+        let costUsdStr: string | undefined;
+        let rawCostUsdStr: string | undefined;
+        let costBreakdown: CostBreakdown | undefined;
+        if (usageMetrics) {
           try {
             if (session.request.model) {
               const priceData = await session.getCachedPriceDataByBillingSource();
@@ -704,6 +761,30 @@ export class ProxyResponseHandler {
                 if (cost.gt(0)) {
                   costUsdStr = cost.toString();
                 }
+                // Raw cost without multiplier for Langfuse
+                if (provider.costMultiplier !== 1) {
+                  const rawCost = calculateRequestCost(
+                    usageMetrics,
+                    priceData,
+                    1.0,
+                    session.getContext1mApplied()
+                  );
+                  if (rawCost.gt(0)) {
+                    rawCostUsdStr = rawCost.toString();
+                  }
+                } else {
+                  rawCostUsdStr = costUsdStr;
+                }
+                // Cost breakdown for Langfuse (raw, no multiplier)
+                try {
+                  costBreakdown = calculateRequestCostBreakdown(
+                    usageMetrics,
+                    priceData,
+                    session.getContext1mApplied()
+                  );
+                } catch {
+                  /* non-critical */
+                }
               }
             }
           } catch (error) {
@@ -711,7 +792,10 @@ export class ProxyResponseHandler {
               error: error instanceof Error ? error.message : String(error),
             });
           }
+        }
 
+        // Update session usage in Redis (for real-time monitoring)
+        if (session.sessionId && usageMetrics) {
           void SessionManager.updateSessionUsage(session.sessionId, {
             inputTokens: usageMetrics.input_tokens,
             outputTokens: usageMetrics.output_tokens,
@@ -782,6 +866,17 @@ export class ProxyResponseHandler {
           providerName: provider.name,
           statusCode,
         });
+
+        emitLangfuseTrace(session, {
+          responseHeaders: response.headers,
+          responseText,
+          usageMetrics,
+          costUsd: rawCostUsdStr,
+          costBreakdown,
+          statusCode,
+          durationMs: Date.now() - session.startTime,
+          isStreaming: false,
+        });
       } catch (error) {
        // Determine the source of the AbortError: response timeout vs. client abort
         const err = error as Error;
@@ -1220,6 +1315,18 @@ export class ProxyResponseHandler {
               finalized.errorMessage ?? undefined,
               finalized.providerIdForPersistence ?? undefined
             );
+
+            emitLangfuseTrace(session, {
+              responseHeaders: response.headers,
+              responseText: allContent,
+              usageMetrics: parseUsageFromResponseText(allContent, provider.providerType)
+                .usageMetrics,
+              costUsd: undefined,
+              statusCode: finalized.effectiveStatusCode,
+              durationMs: duration,
+              isStreaming: true,
+              errorMessage: finalized.errorMessage ?? undefined,
+            });
           } catch (error) {
             const err = error instanceof Error ? error : new Error(String(error));
             const clientAborted = session.clientAbortSignal?.aborted ?? false;
@@ -1588,11 +1695,13 @@ export class ProxyResponseHandler {
        // Track spend in Redis (for rate limiting)
         await trackCostToRedis(session, usageForCost);
 
-        // Update session usage in Redis (for real-time monitoring)
-        if (session.sessionId) {
-          let costUsdStr: string | undefined;
+        // Calculate cost for session tracking (with multiplier) and Langfuse (raw)
+        let costUsdStr: string | undefined;
+        let rawCostUsdStr: string | undefined;
+        let costBreakdown: CostBreakdown | undefined;
+        if (usageForCost) {
           try {
-            if (usageForCost && session.request.model) {
+            if (session.request.model) {
               const priceData = await session.getCachedPriceDataByBillingSource();
               if (priceData) {
                 const cost = calculateRequestCost(
@@ -1604,6 +1713,30 @@ export class ProxyResponseHandler {
                 if (cost.gt(0)) {
                   costUsdStr = cost.toString();
                 }
+                // Raw cost without multiplier for Langfuse
+                if (provider.costMultiplier !== 1) {
+                  const rawCost = calculateRequestCost(
+                    usageForCost,
+                    priceData,
+                    1.0,
+                    session.getContext1mApplied()
+                  );
+                  if (rawCost.gt(0)) {
+                    rawCostUsdStr = rawCost.toString();
+                  }
+                } else {
+                  rawCostUsdStr = costUsdStr;
+                }
+                // Cost breakdown for Langfuse (raw, no multiplier)
+                try {
+                  costBreakdown = calculateRequestCostBreakdown(
+                    usageForCost,
+                    priceData,
+                    session.getContext1mApplied()
+                  );
+                } catch {
+                  /* non-critical */
+                }
               }
             }
           } catch (error) {
@@ -1611,7 +1744,10 @@ export class ProxyResponseHandler {
               error: error instanceof Error ? error.message : String(error),
             });
           }
+        }
 
+        // Update session usage in Redis (for real-time monitoring)
+        if (session.sessionId) {
           const payload: SessionUsageUpdate = {
             status: effectiveStatusCode >= 200 && effectiveStatusCode < 300 ? "completed" : "error",
             statusCode: effectiveStatusCode,
@@ -1650,6 +1786,19 @@ export class ProxyResponseHandler {
          providerId: providerIdForPersistence ?? session.provider?.id, // update the final provider ID (after retry switching)
           context1mApplied: session.getContext1mApplied(),
         });
+
+        emitLangfuseTrace(session, {
+          responseHeaders: response.headers,
+          responseText: allContent,
+          usageMetrics: usageForCost,
+          costUsd: rawCostUsdStr,
+          costBreakdown,
+          statusCode: effectiveStatusCode,
+          durationMs: duration,
+          isStreaming: true,
+          sseEventCount: chunks.length,
+          errorMessage: streamErrorMessage ?? undefined,
+        });
       };
 
       try {
@@ -2919,6 +3068,18 @@ async function persistRequestFailure(options: {
       });
     }
   }
+
+  // Emit Langfuse trace for error/abort paths
+  emitLangfuseTrace(session, {
+    responseHeaders: new Headers(),
+    responseText: "",
+    usageMetrics: null,
+    costUsd: undefined,
+    statusCode,
+    durationMs: duration,
+    isStreaming: phase === "stream",
+    errorMessage,
+  });
 }
 
 /**

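To make the billed-versus-raw cost split introduced above concrete, here is a minimal sketch with hypothetical numbers (the multiplier and costs are illustrative):

// Session tracking stores the billed cost; Langfuse receives the raw upstream cost.
const rawCostUsd = 0.01;                           // calculateRequestCost(usage, priceData, 1.0, ...)
const costMultiplier = 1.2;                        // provider.costMultiplier (hypothetical)
const billedCostUsd = rawCostUsd * costMultiplier; // 0.012, reported to SessionManager
// When costMultiplier === 1 the recalculation is skipped and rawCostUsdStr reuses costUsdStr.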
+ 16 - 0
src/app/v1/_lib/proxy/session.ts

@@ -67,6 +67,12 @@ export class ProxySession {
   // Time To First Byte (ms). Streaming: first chunk. Non-stream: equals durationMs.
   ttfbMs: number | null = null;
 
+  // Timestamp when guard pipeline finished and forwarding started (epoch ms).
+  forwardStartTime: number | null = null;
+
+  // Actual serialized request body sent to upstream (after all preprocessing).
+  forwardedRequestBody: string | null = null;
+
   // Session ID (used for session stickiness and concurrency limiting)
   sessionId: string | null;
 
@@ -313,6 +319,16 @@ export class ProxySession {
     return value;
   }
 
+  /**
+   * Record the timestamp when guard pipeline finished and upstream forwarding begins.
+   * Called once; subsequent calls are no-ops.
+   */
+  recordForwardStart(): void {
+    if (this.forwardStartTime === null) {
+      this.forwardStartTime = Date.now();
+    }
+  }
+
   /**
    * Set the session ID
    */
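A minimal sketch (hypothetical timestamps) of the timing decomposition these two new fields enable, matching the timingBreakdown computed later in trace-proxy-request.ts:

const startTime = 1_700_000_000_000;    // session.startTime
const forwardStartTime = startTime + 8; // set by recordForwardStart() after the guard pipeline
const ttfbMs = 200;                     // measured from startTime
const durationMs = 500;                 // total request duration

const guardPipelineMs = forwardStartTime - startTime; // 8
const upstreamTotalMs = durationMs - guardPipelineMs; // 492
const ttfbFromForwardMs = ttfbMs - guardPipelineMs;   // 192
const tokenGenerationMs = durationMs - ttfbMs;        // 300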

+ 19 - 0
src/instrumentation.ts

@@ -140,6 +140,15 @@ function warmupApiKeyVacuumFilter(): void {
 export async function register() {
   // Server-side only
   if (process.env.NEXT_RUNTIME === "nodejs") {
+    // Initialize Langfuse observability (no-op if env vars not set)
+    try {
+      const { initLangfuse } = await import("@/lib/langfuse");
+      await initLangfuse();
+    } catch (error) {
+      logger.warn("[Instrumentation] Langfuse initialization failed (non-critical)", {
+        error: error instanceof Error ? error.message : String(error),
+      });
+    }
     // Skip initialization in CI environment (no DB connection needed)
     if (process.env.CI === "true") {
       logger.warn(
@@ -216,6 +225,16 @@ export async function register() {
           });
         }
 
+        // Flush Langfuse pending spans
+        try {
+          const { shutdownLangfuse } = await import("@/lib/langfuse");
+          await shutdownLangfuse();
+        } catch (error) {
+          logger.warn("[Instrumentation] Failed to shutdown Langfuse", {
+            error: error instanceof Error ? error.message : String(error),
+          });
+        }
+
        // Best-effort: flush message_request async batch updates to the database (avoid losing trailing logs on termination)
         try {
           const { stopMessageRequestWriteBuffer } = await import(

+ 7 - 0
src/lib/config/env.schema.ts

@@ -127,6 +127,13 @@ export const EnvSchema = z.object({
   FETCH_BODY_TIMEOUT: z.coerce.number().default(600_000), // request/response body transfer timeout (default 600 s)
   FETCH_HEADERS_TIMEOUT: z.coerce.number().default(600_000), // response-header receive timeout (default 600 s)
   FETCH_CONNECT_TIMEOUT: z.coerce.number().default(30000), // TCP connection establishment timeout (default 30 s)
+
+  // Langfuse Observability (optional, auto-enabled when keys are set)
+  LANGFUSE_PUBLIC_KEY: z.string().optional(),
+  LANGFUSE_SECRET_KEY: z.string().optional(),
+  LANGFUSE_BASE_URL: z.string().default("https://cloud.langfuse.com"),
+  LANGFUSE_SAMPLE_RATE: z.coerce.number().min(0).max(1).default(1.0),
+  LANGFUSE_DEBUG: z.string().default("false").transform(booleanTransform),
 });
 
 /**

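As a standalone sanity check on the coercion semantics assumed above (a zod sketch, not part of this commit):

import { z } from "zod";

const SampleRate = z.coerce.number().min(0).max(1).default(1.0);
SampleRate.parse("0.25");    // 0.25: env strings are coerced to numbers
SampleRate.parse(undefined); // 1.0: the default applies when the variable is unset
SampleRate.parse("2");       // throws: outside the 0-1 range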
+ 92 - 0
src/lib/langfuse/index.ts

@@ -0,0 +1,92 @@
+import type { LangfuseSpanProcessor } from "@langfuse/otel";
+
+import type { NodeSDK } from "@opentelemetry/sdk-node";
+import { logger } from "@/lib/logger";
+
+let sdk: NodeSDK | null = null;
+let spanProcessor: LangfuseSpanProcessor | null = null;
+let initialized = false;
+
+export function isLangfuseEnabled(): boolean {
+  return !!(process.env.LANGFUSE_PUBLIC_KEY && process.env.LANGFUSE_SECRET_KEY);
+}
+
+/**
+ * Initialize Langfuse OpenTelemetry SDK.
+ * Must be called early in the process (instrumentation.ts register()).
+ * No-op if LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY are not set.
+ */
+export async function initLangfuse(): Promise<void> {
+  if (initialized || !isLangfuseEnabled()) {
+    return;
+  }
+
+  try {
+    const { NodeSDK: OtelNodeSDK } = await import("@opentelemetry/sdk-node");
+    const { LangfuseSpanProcessor: LfSpanProcessor } = await import("@langfuse/otel");
+
+    const sampleRate = Number.parseFloat(process.env.LANGFUSE_SAMPLE_RATE || "1.0");
+
+    spanProcessor = new LfSpanProcessor({
+      publicKey: process.env.LANGFUSE_PUBLIC_KEY,
+      secretKey: process.env.LANGFUSE_SECRET_KEY,
+      baseUrl: process.env.LANGFUSE_BASE_URL || "https://cloud.langfuse.com",
+      // Only export spans from langfuse-sdk scope (avoid noise from other OTel instrumentations)
+      shouldExportSpan: ({ otelSpan }) => otelSpan.instrumentationScope.name === "langfuse-sdk",
+    });
+
+    const samplerConfig =
+      sampleRate < 1.0
+        ? await (async () => {
+            const { TraceIdRatioBasedSampler } = await import("@opentelemetry/sdk-trace-base");
+            return { sampler: new TraceIdRatioBasedSampler(sampleRate) };
+          })()
+        : {};
+
+    sdk = new OtelNodeSDK({
+      spanProcessors: [spanProcessor],
+      ...samplerConfig,
+    });
+
+    sdk.start();
+    initialized = true;
+
+    logger.info("[Langfuse] Observability initialized", {
+      baseUrl: process.env.LANGFUSE_BASE_URL || "https://cloud.langfuse.com",
+      sampleRate,
+      debug: process.env.LANGFUSE_DEBUG === "true",
+    });
+
+    if (process.env.LANGFUSE_DEBUG === "true") {
+      const { configureGlobalLogger, LogLevel } = await import("@langfuse/core");
+      configureGlobalLogger({ level: LogLevel.DEBUG });
+    }
+  } catch (error) {
+    logger.error("[Langfuse] Failed to initialize", {
+      error: error instanceof Error ? error.message : String(error),
+    });
+  }
+}
+
+/**
+ * Flush pending spans and shut down the SDK.
+ * Called during graceful shutdown (SIGTERM/SIGINT).
+ */
+export async function shutdownLangfuse(): Promise<void> {
+  if (!initialized || !spanProcessor) {
+    return;
+  }
+
+  try {
+    await spanProcessor.forceFlush();
+    if (sdk) {
+      await sdk.shutdown();
+    }
+    initialized = false;
+    logger.info("[Langfuse] Shutdown complete");
+  } catch (error) {
+    logger.warn("[Langfuse] Shutdown error", {
+      error: error instanceof Error ? error.message : String(error),
+    });
+  }
+}

+ 363 - 0
src/lib/langfuse/trace-proxy-request.ts

@@ -0,0 +1,363 @@
+import type { UsageMetrics } from "@/app/v1/_lib/proxy/response-handler";
+import type { ProxySession } from "@/app/v1/_lib/proxy/session";
+import { isLangfuseEnabled } from "@/lib/langfuse/index";
+import { logger } from "@/lib/logger";
+import type { CostBreakdown } from "@/lib/utils/cost-calculation";
+
+function buildRequestBodySummary(session: ProxySession): Record<string, unknown> {
+  const msg = session.request.message as Record<string, unknown>;
+  return {
+    model: session.request.model,
+    messageCount: session.getMessagesLength(),
+    hasSystemPrompt: Array.isArray(msg.system) && msg.system.length > 0,
+    toolsCount: Array.isArray(msg.tools) ? msg.tools.length : 0,
+    stream: msg.stream === true,
+    maxTokens: typeof msg.max_tokens === "number" ? msg.max_tokens : undefined,
+    temperature: typeof msg.temperature === "number" ? msg.temperature : undefined,
+  };
+}
+
+function getStatusCategory(statusCode: number): string {
+  if (statusCode >= 200 && statusCode < 300) return "2xx";
+  if (statusCode >= 400 && statusCode < 500) return "4xx";
+  if (statusCode >= 500) return "5xx";
+  return `${Math.floor(statusCode / 100)}xx`;
+}
+
+/**
+ * Convert Headers to a plain record.
+ *
+ * Security note: session.headers are the CLIENT's original request headers
+ * (user -> CCH), which may include the user's own CCH auth key. These are
+ * safe to log -- the user already knows their own credentials.
+ *
+ * The upstream PROVIDER API key (outboundKey) is injected by ProxyForwarder
+ * into a separate Headers object and is NEVER present in session.headers or
+ * ctx.responseHeaders, so no redaction is needed here.
+ */
+function headersToRecord(headers: Headers): Record<string, string> {
+  const result: Record<string, string> = {};
+  headers.forEach((value, key) => {
+    result[key] = value;
+  });
+  return result;
+}
+
+const SUCCESS_REASONS = new Set([
+  "request_success",
+  "retry_success",
+  "initial_selection",
+  "session_reuse",
+]);
+
+function isSuccessReason(reason: string | undefined): boolean {
+  return !!reason && SUCCESS_REASONS.has(reason);
+}
+
+const ERROR_REASONS = new Set([
+  "system_error",
+  "vendor_type_all_timeout",
+  "endpoint_pool_exhausted",
+]);
+
+function isErrorReason(reason: string | undefined): boolean {
+  return !!reason && ERROR_REASONS.has(reason);
+}
+
+type ObservationLevel = "DEBUG" | "DEFAULT" | "WARNING" | "ERROR";
+
+export interface TraceContext {
+  session: ProxySession;
+  responseHeaders: Headers;
+  durationMs: number;
+  statusCode: number;
+  responseText?: string;
+  isStreaming: boolean;
+  sseEventCount?: number;
+  errorMessage?: string;
+  usageMetrics?: UsageMetrics | null;
+  costUsd?: string;
+  costBreakdown?: CostBreakdown;
+}
+
+/**
+ * Send a trace to Langfuse for a completed proxy request.
+ * Fully async and non-blocking. Errors are caught and logged.
+ */
+export async function traceProxyRequest(ctx: TraceContext): Promise<void> {
+  if (!isLangfuseEnabled()) {
+    return;
+  }
+
+  try {
+    const { startObservation, propagateAttributes } = await import("@langfuse/tracing");
+
+    const { session, durationMs, statusCode, isStreaming } = ctx;
+    const provider = session.provider;
+    const messageContext = session.messageContext;
+
+    // Compute actual request timing from session data
+    const requestStartTime = new Date(session.startTime);
+    const requestEndTime = new Date(session.startTime + durationMs);
+
+    // Compute timing breakdown from forwardStartTime
+    const forwardStartDate = session.forwardStartTime ? new Date(session.forwardStartTime) : null;
+    const guardPipelineMs = session.forwardStartTime
+      ? session.forwardStartTime - session.startTime
+      : null;
+
+    const timingBreakdown = {
+      guardPipelineMs,
+      upstreamTotalMs:
+        guardPipelineMs != null ? Math.max(0, durationMs - guardPipelineMs) : durationMs,
+      ttfbFromForwardMs:
+        guardPipelineMs != null && session.ttfbMs != null
+          ? Math.max(0, session.ttfbMs - guardPipelineMs)
+          : null,
+      tokenGenerationMs: session.ttfbMs != null ? Math.max(0, durationMs - session.ttfbMs) : null,
+      failedAttempts: session.getProviderChain().filter((i) => !isSuccessReason(i.reason)).length,
+      providersAttempted: new Set(session.getProviderChain().map((i) => i.id)).size,
+    };
+
+    // Compute observation level for root span
+    let rootSpanLevel: ObservationLevel = "DEFAULT";
+    if (statusCode < 200 || statusCode >= 300) {
+      rootSpanLevel = "ERROR";
+    } else {
+      const failedAttempts = session
+        .getProviderChain()
+        .filter((i) => !isSuccessReason(i.reason)).length;
+      if (failedAttempts >= 1) rootSpanLevel = "WARNING";
+    }
+
+    // Actual request body (forwarded to upstream after all preprocessing) - no truncation
+    const actualRequestBody = session.forwardedRequestBody
+      ? tryParseJsonSafe(session.forwardedRequestBody)
+      : session.request.message;
+
+    // Actual response body - no truncation
+    const actualResponseBody = ctx.responseText
+      ? tryParseJsonSafe(ctx.responseText)
+      : isStreaming
+        ? { streaming: true, sseEventCount: ctx.sseEventCount }
+        : { statusCode };
+
+    // Root span metadata (former input/output summaries moved here)
+    const rootSpanMetadata: Record<string, unknown> = {
+      endpoint: session.getEndpoint(),
+      method: session.method,
+      model: session.getCurrentModel(),
+      clientFormat: session.originalFormat,
+      providerName: provider?.name,
+      statusCode,
+      durationMs,
+      hasUsage: !!ctx.usageMetrics,
+      costUsd: ctx.costUsd,
+      timingBreakdown,
+    };
+
+    // Build tags - include provider name and model
+    const tags: string[] = [];
+    if (provider?.providerType) tags.push(provider.providerType);
+    if (provider?.name) tags.push(provider.name);
+    if (session.originalFormat) tags.push(session.originalFormat);
+    if (session.getCurrentModel()) tags.push(session.getCurrentModel()!);
+    tags.push(getStatusCategory(statusCode));
+
+    // Build trace-level metadata (propagateAttributes requires all values to be strings)
+    const traceMetadata: Record<string, string> = {
+      keyName: messageContext?.key?.name ?? "",
+      endpoint: session.getEndpoint() ?? "",
+      method: session.method,
+      clientFormat: session.originalFormat,
+      userAgent: session.userAgent ?? "",
+      requestSequence: String(session.getRequestSequence()),
+    };
+
+    // Build generation metadata - all request detail fields, raw headers (no redaction)
+    const generationMetadata: Record<string, unknown> = {
+      // Provider
+      providerId: provider?.id,
+      providerName: provider?.name,
+      providerType: provider?.providerType,
+      providerChain: session.getProviderChain(),
+      // Model
+      model: session.getCurrentModel(),
+      originalModel: session.getOriginalModel(),
+      modelRedirected: session.isModelRedirected(),
+      // Special settings
+      specialSettings: session.getSpecialSettings(),
+      // Request context
+      endpoint: session.getEndpoint(),
+      method: session.method,
+      clientFormat: session.originalFormat,
+      userAgent: session.userAgent,
+      requestSequence: session.getRequestSequence(),
+      sessionId: session.sessionId,
+      keyName: messageContext?.key?.name,
+      // Timing
+      durationMs,
+      ttfbMs: session.ttfbMs,
+      timingBreakdown,
+      // Flags
+      isStreaming,
+      cacheTtlApplied: session.getCacheTtlResolved(),
+      context1mApplied: session.getContext1mApplied(),
+      // Error
+      errorMessage: ctx.errorMessage,
+      // Request summary (quick overview)
+      requestSummary: buildRequestBodySummary(session),
+      // SSE
+      sseEventCount: ctx.sseEventCount,
+      // Headers (raw, no redaction)
+      requestHeaders: headersToRecord(session.headers),
+      responseHeaders: headersToRecord(ctx.responseHeaders),
+    };
+
+    // Build usage details for Langfuse generation
+    const usageDetails: Record<string, number> | undefined = ctx.usageMetrics
+      ? {
+          ...(ctx.usageMetrics.input_tokens != null
+            ? { input: ctx.usageMetrics.input_tokens }
+            : {}),
+          ...(ctx.usageMetrics.output_tokens != null
+            ? { output: ctx.usageMetrics.output_tokens }
+            : {}),
+          ...(ctx.usageMetrics.cache_read_input_tokens != null
+            ? { cache_read_input_tokens: ctx.usageMetrics.cache_read_input_tokens }
+            : {}),
+          ...(ctx.usageMetrics.cache_creation_input_tokens != null
+            ? { cache_creation_input_tokens: ctx.usageMetrics.cache_creation_input_tokens }
+            : {}),
+        }
+      : undefined;
+
+    // Build cost details (prefer breakdown, fallback to total-only)
+    const costDetails: Record<string, number> | undefined = ctx.costBreakdown
+      ? { ...ctx.costBreakdown }
+      : ctx.costUsd && Number.parseFloat(ctx.costUsd) > 0
+        ? { total: Number.parseFloat(ctx.costUsd) }
+        : undefined;
+
+    // Create the root trace span with actual bodies, level, and metadata
+    const rootSpan = startObservation(
+      "proxy-request",
+      {
+        input: actualRequestBody,
+        output: actualResponseBody,
+        level: rootSpanLevel,
+        metadata: rootSpanMetadata,
+      },
+      {
+        startTime: requestStartTime,
+      }
+    );
+
+    // Propagate trace attributes
+    await propagateAttributes(
+      {
+        userId: messageContext?.user?.name ?? undefined,
+        sessionId: session.sessionId ?? undefined,
+        tags,
+        metadata: traceMetadata,
+        traceName: `${session.method} ${session.getEndpoint() ?? "/"}`,
+      },
+      async () => {
+        // 1. Guard pipeline span (if forwardStartTime was recorded)
+        if (forwardStartDate) {
+          const guardSpan = rootSpan.startObservation(
+            "guard-pipeline",
+            {
+              output: { durationMs: guardPipelineMs, passed: true },
+            },
+            { startTime: requestStartTime } as Record<string, unknown>
+          );
+          guardSpan.end(forwardStartDate);
+        }
+
+        // 2. Provider attempt events (one per failed chain item)
+        for (const item of session.getProviderChain()) {
+          if (!isSuccessReason(item.reason)) {
+            const eventObs = rootSpan.startObservation(
+              "provider-attempt",
+              {
+                level: isErrorReason(item.reason) ? "ERROR" : "WARNING",
+                input: {
+                  providerId: item.id,
+                  providerName: item.name,
+                  attempt: item.attemptNumber,
+                },
+                output: {
+                  reason: item.reason,
+                  errorMessage: item.errorMessage,
+                  statusCode: item.statusCode,
+                },
+                metadata: { ...item },
+              },
+              {
+                asType: "event",
+                startTime: new Date(item.timestamp ?? session.startTime),
+              } as { asType: "event" }
+            );
+            eventObs.end();
+          }
+        }
+
+        // 3. LLM generation (startTime = forwardStartTime when available)
+        const generationStartTime = forwardStartDate ?? requestStartTime;
+
+        // Generation input/output = raw payload, no truncation
+        const generationInput = actualRequestBody;
+        const generationOutput = ctx.responseText
+          ? tryParseJsonSafe(ctx.responseText)
+          : isStreaming
+            ? { streaming: true, sseEventCount: ctx.sseEventCount }
+            : { statusCode };
+
+        // Create the LLM generation observation
+        const generation = rootSpan.startObservation(
+          "llm-call",
+          {
+            model: session.getCurrentModel() ?? undefined,
+            input: generationInput,
+            output: generationOutput,
+            ...(usageDetails && Object.keys(usageDetails).length > 0 ? { usageDetails } : {}),
+            ...(costDetails ? { costDetails } : {}),
+            metadata: generationMetadata,
+          },
+          // SDK runtime supports startTime on child observations but types don't expose it
+          { asType: "generation", startTime: generationStartTime } as { asType: "generation" }
+        );
+
+        // Set TTFB as completionStartTime
+        if (session.ttfbMs != null) {
+          generation.update({
+            completionStartTime: new Date(session.startTime + session.ttfbMs),
+          });
+        }
+
+        generation.end(requestEndTime);
+      }
+    );
+
+    // Explicitly set trace-level input/output (propagateAttributes does not support these)
+    rootSpan.updateTrace({
+      input: actualRequestBody,
+      output: actualResponseBody,
+    });
+
+    rootSpan.end(requestEndTime);
+  } catch (error) {
+    logger.warn("[Langfuse] Failed to trace proxy request", {
+      error: error instanceof Error ? error.message : String(error),
+    });
+  }
+}
+
+function tryParseJsonSafe(text: string): unknown {
+  try {
+    return JSON.parse(text);
+  } catch {
+    return text;
+  }
+}
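Assuming the SDK nests observations as invoked above, a traced request should appear in Langfuse roughly as:

proxy-request (root span: actual request/response bodies, level, timing metadata)
├── guard-pipeline (span from startTime to forwardStartTime, only when recorded)
├── provider-attempt (one event per failed provider-chain item, WARNING or ERROR)
└── llm-call (generation with model, usageDetails, costDetails, completionStartTime = TTFB)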

+ 208 - 0
src/lib/utils/cost-calculation.ts

@@ -98,6 +98,214 @@ function calculateTieredCostWithSeparatePrices(
   return baseCost.add(premiumCost);
 }
 
+export interface CostBreakdown {
+  input: number;
+  output: number;
+  cache_creation: number;
+  cache_read: number;
+  total: number;
+}
+
+/**
+ * Calculate cost breakdown by category (always raw cost, multiplier=1.0).
+ * Returns per-category costs as plain numbers.
+ */
+export function calculateRequestCostBreakdown(
+  usage: UsageMetrics,
+  priceData: ModelPriceData,
+  context1mApplied: boolean = false
+): CostBreakdown {
+  let inputBucket = new Decimal(0);
+  let outputBucket = new Decimal(0);
+  let cacheCreationBucket = new Decimal(0);
+  let cacheReadBucket = new Decimal(0);
+
+  const inputCostPerToken = priceData.input_cost_per_token;
+  const outputCostPerToken = priceData.output_cost_per_token;
+  const inputCostPerRequest = priceData.input_cost_per_request;
+
+  // Per-request cost -> input bucket
+  if (
+    typeof inputCostPerRequest === "number" &&
+    Number.isFinite(inputCostPerRequest) &&
+    inputCostPerRequest >= 0
+  ) {
+    const requestCost = toDecimal(inputCostPerRequest);
+    if (requestCost) {
+      inputBucket = inputBucket.add(requestCost);
+    }
+  }
+
+  const cacheCreation5mCost =
+    priceData.cache_creation_input_token_cost ??
+    (inputCostPerToken != null ? inputCostPerToken * 1.25 : undefined);
+
+  const cacheCreation1hCost =
+    priceData.cache_creation_input_token_cost_above_1hr ??
+    (inputCostPerToken != null ? inputCostPerToken * 2 : undefined) ??
+    cacheCreation5mCost;
+
+  const cacheReadCost =
+    priceData.cache_read_input_token_cost ??
+    (inputCostPerToken != null
+      ? inputCostPerToken * 0.1
+      : outputCostPerToken != null
+        ? outputCostPerToken * 0.1
+        : undefined);
+
+  // Derive cache creation tokens by TTL
+  let cache5mTokens = usage.cache_creation_5m_input_tokens;
+  let cache1hTokens = usage.cache_creation_1h_input_tokens;
+
+  if (typeof usage.cache_creation_input_tokens === "number") {
+    const remaining =
+      usage.cache_creation_input_tokens - (cache5mTokens ?? 0) - (cache1hTokens ?? 0);
+
+    if (remaining > 0) {
+      const target = usage.cache_ttl === "1h" ? "1h" : "5m";
+      if (target === "1h") {
+        cache1hTokens = (cache1hTokens ?? 0) + remaining;
+      } else {
+        cache5mTokens = (cache5mTokens ?? 0) + remaining;
+      }
+    }
+  }
+
+  const inputAbove200k = priceData.input_cost_per_token_above_200k_tokens;
+  const outputAbove200k = priceData.output_cost_per_token_above_200k_tokens;
+
+  // Input tokens -> input bucket
+  if (context1mApplied && inputCostPerToken != null && usage.input_tokens != null) {
+    inputBucket = inputBucket.add(
+      calculateTieredCost(
+        usage.input_tokens,
+        inputCostPerToken,
+        CONTEXT_1M_INPUT_PREMIUM_MULTIPLIER
+      )
+    );
+  } else if (inputAbove200k != null && inputCostPerToken != null && usage.input_tokens != null) {
+    inputBucket = inputBucket.add(
+      calculateTieredCostWithSeparatePrices(usage.input_tokens, inputCostPerToken, inputAbove200k)
+    );
+  } else {
+    inputBucket = inputBucket.add(multiplyCost(usage.input_tokens, inputCostPerToken));
+  }
+
+  // Output tokens -> output bucket
+  if (context1mApplied && outputCostPerToken != null && usage.output_tokens != null) {
+    outputBucket = outputBucket.add(
+      calculateTieredCost(
+        usage.output_tokens,
+        outputCostPerToken,
+        CONTEXT_1M_OUTPUT_PREMIUM_MULTIPLIER
+      )
+    );
+  } else if (outputAbove200k != null && outputCostPerToken != null && usage.output_tokens != null) {
+    outputBucket = outputBucket.add(
+      calculateTieredCostWithSeparatePrices(
+        usage.output_tokens,
+        outputCostPerToken,
+        outputAbove200k
+      )
+    );
+  } else {
+    outputBucket = outputBucket.add(multiplyCost(usage.output_tokens, outputCostPerToken));
+  }
+
+  // Cache costs
+  const cacheCreationAbove200k = priceData.cache_creation_input_token_cost_above_200k_tokens;
+  const cacheReadAbove200k = priceData.cache_read_input_token_cost_above_200k_tokens;
+  const hasRealCacheCreationBase = priceData.cache_creation_input_token_cost != null;
+  const hasRealCacheReadBase = priceData.cache_read_input_token_cost != null;
+
+  // Cache creation 5m -> cache_creation bucket
+  if (context1mApplied && cacheCreation5mCost != null && cache5mTokens != null) {
+    cacheCreationBucket = cacheCreationBucket.add(
+      calculateTieredCost(cache5mTokens, cacheCreation5mCost, CONTEXT_1M_INPUT_PREMIUM_MULTIPLIER)
+    );
+  } else if (
+    hasRealCacheCreationBase &&
+    cacheCreationAbove200k != null &&
+    cacheCreation5mCost != null &&
+    cache5mTokens != null
+  ) {
+    cacheCreationBucket = cacheCreationBucket.add(
+      calculateTieredCostWithSeparatePrices(
+        cache5mTokens,
+        cacheCreation5mCost,
+        cacheCreationAbove200k
+      )
+    );
+  } else {
+    cacheCreationBucket = cacheCreationBucket.add(multiplyCost(cache5mTokens, cacheCreation5mCost));
+  }
+
+  // Cache creation 1h -> cache_creation bucket
+  if (context1mApplied && cacheCreation1hCost != null && cache1hTokens != null) {
+    cacheCreationBucket = cacheCreationBucket.add(
+      calculateTieredCost(cache1hTokens, cacheCreation1hCost, CONTEXT_1M_INPUT_PREMIUM_MULTIPLIER)
+    );
+  } else if (
+    hasRealCacheCreationBase &&
+    cacheCreationAbove200k != null &&
+    cacheCreation1hCost != null &&
+    cache1hTokens != null
+  ) {
+    cacheCreationBucket = cacheCreationBucket.add(
+      calculateTieredCostWithSeparatePrices(
+        cache1hTokens,
+        cacheCreation1hCost,
+        cacheCreationAbove200k
+      )
+    );
+  } else {
+    cacheCreationBucket = cacheCreationBucket.add(multiplyCost(cache1hTokens, cacheCreation1hCost));
+  }
+
+  // Cache read -> cache_read bucket
+  if (
+    hasRealCacheReadBase &&
+    cacheReadAbove200k != null &&
+    cacheReadCost != null &&
+    usage.cache_read_input_tokens != null
+  ) {
+    cacheReadBucket = cacheReadBucket.add(
+      calculateTieredCostWithSeparatePrices(
+        usage.cache_read_input_tokens,
+        cacheReadCost,
+        cacheReadAbove200k
+      )
+    );
+  } else {
+    cacheReadBucket = cacheReadBucket.add(
+      multiplyCost(usage.cache_read_input_tokens, cacheReadCost)
+    );
+  }
+
+  // Image tokens -> respective buckets
+  if (usage.output_image_tokens != null && usage.output_image_tokens > 0) {
+    const imageCostPerToken =
+      priceData.output_cost_per_image_token ?? priceData.output_cost_per_token;
+    outputBucket = outputBucket.add(multiplyCost(usage.output_image_tokens, imageCostPerToken));
+  }
+
+  if (usage.input_image_tokens != null && usage.input_image_tokens > 0) {
+    const imageCostPerToken =
+      priceData.input_cost_per_image_token ?? priceData.input_cost_per_token;
+    inputBucket = inputBucket.add(multiplyCost(usage.input_image_tokens, imageCostPerToken));
+  }
+
+  const total = inputBucket.add(outputBucket).add(cacheCreationBucket).add(cacheReadBucket);
+
+  return {
+    input: inputBucket.toDecimalPlaces(COST_SCALE).toNumber(),
+    output: outputBucket.toDecimalPlaces(COST_SCALE).toNumber(),
+    cache_creation: cacheCreationBucket.toDecimalPlaces(COST_SCALE).toNumber(),
+    cache_read: cacheReadBucket.toDecimalPlaces(COST_SCALE).toNumber(),
+    total: total.toDecimalPlaces(COST_SCALE).toNumber(),
+  };
+}
+
 /**
  * Calculate the cost of a single request
  * @param usage - token usage

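A worked example of the breakdown with hypothetical prices (note the cache-read price falls back to 10% of the input price, as in the code above):

const breakdown = calculateRequestCostBreakdown(
  { input_tokens: 1000, output_tokens: 500, cache_read_input_tokens: 200 },
  { input_cost_per_token: 0.000003, output_cost_per_token: 0.000015 },
  false
);
// input:          1000 * 0.000003       = 0.003
// output:          500 * 0.000015       = 0.0075
// cache_read:      200 * 0.000003 * 0.1 = 0.00006
// cache_creation:  0 (no cache-creation tokens in usage)
// breakdown.total:                        0.01056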
+ 996 - 0
tests/unit/langfuse/langfuse-trace.test.ts

@@ -0,0 +1,996 @@
+import { describe, expect, test, vi, beforeEach, afterEach } from "vitest";
+
+// Mock the langfuse modules at the top level
+const mockStartObservation = vi.fn();
+const mockPropagateAttributes = vi.fn();
+const mockSpanEnd = vi.fn();
+const mockGenerationEnd = vi.fn();
+const mockGenerationUpdate = vi.fn();
+const mockGuardSpanEnd = vi.fn();
+const mockEventEnd = vi.fn();
+
+const mockGeneration: any = {
+  update: (...args: unknown[]) => {
+    mockGenerationUpdate(...args);
+    return mockGeneration;
+  },
+  end: mockGenerationEnd,
+};
+
+const mockGuardSpan: any = {
+  end: mockGuardSpanEnd,
+};
+
+const mockEventObs: any = {
+  end: mockEventEnd,
+};
+
+const mockUpdateTrace = vi.fn();
+
+const mockRootSpan = {
+  startObservation: vi.fn(),
+  updateTrace: mockUpdateTrace,
+  end: mockSpanEnd,
+};
+
+// Default: route by observation name
+function setupDefaultStartObservation() {
+  mockRootSpan.startObservation.mockImplementation((name: string) => {
+    if (name === "guard-pipeline") return mockGuardSpan;
+    if (name === "provider-attempt") return mockEventObs;
+    return mockGeneration; // "llm-call"
+  });
+}
+
+vi.mock("@langfuse/tracing", () => ({
+  startObservation: (...args: unknown[]) => {
+    mockStartObservation(...args);
+    return mockRootSpan;
+  },
+  propagateAttributes: async (attrs: unknown, fn: () => Promise<void>) => {
+    mockPropagateAttributes(attrs);
+    await fn();
+  },
+}));
+
+vi.mock("@/lib/logger", () => ({
+  logger: {
+    info: vi.fn(),
+    warn: vi.fn(),
+    error: vi.fn(),
+    debug: vi.fn(),
+  },
+}));
+
+let langfuseEnabled = true;
+vi.mock("@/lib/langfuse/index", () => ({
+  isLangfuseEnabled: () => langfuseEnabled,
+}));
+
+function createMockSession(overrides: Record<string, unknown> = {}) {
+  const startTime = (overrides.startTime as number) ?? Date.now() - 500;
+  return {
+    startTime,
+    method: "POST",
+    headers: new Headers({
+      "content-type": "application/json",
+      "x-api-key": "test-mock-key-not-real",
+      "user-agent": "claude-code/1.0",
+    }),
+    request: {
+      message: {
+        model: "claude-sonnet-4-20250514",
+        messages: [{ role: "user", content: "Hello" }],
+        stream: true,
+        max_tokens: 4096,
+        tools: [{ name: "tool1" }],
+      },
+      model: "claude-sonnet-4-20250514",
+    },
+    originalFormat: "claude",
+    userAgent: "claude-code/1.0",
+    sessionId: "sess_abc12345_def67890",
+    provider: {
+      id: 1,
+      name: "anthropic-main",
+      providerType: "claude",
+    },
+    messageContext: {
+      id: 42,
+      user: { id: 7, name: "testuser" },
+      key: { name: "default-key" },
+    },
+    ttfbMs: 200,
+    forwardStartTime: startTime + 5,
+    forwardedRequestBody: null,
+    getEndpoint: () => "/v1/messages",
+    getRequestSequence: () => 3,
+    getMessagesLength: () => 1,
+    getCurrentModel: () => "claude-sonnet-4-20250514",
+    getOriginalModel: () => "claude-sonnet-4-20250514",
+    isModelRedirected: () => false,
+    getProviderChain: () => [
+      {
+        id: 1,
+        name: "anthropic-main",
+        providerType: "claude",
+        reason: "initial_selection",
+        timestamp: startTime + 2,
+      },
+    ],
+    getSpecialSettings: () => null,
+    getCacheTtlResolved: () => null,
+    getContext1mApplied: () => false,
+    ...overrides,
+  } as any;
+}
+
+describe("traceProxyRequest", () => {
+  beforeEach(() => {
+    vi.clearAllMocks();
+    langfuseEnabled = true;
+    setupDefaultStartObservation();
+  });
+
+  test("should not trace when Langfuse is disabled", async () => {
+    langfuseEnabled = false;
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    await traceProxyRequest({
+      session: createMockSession(),
+      responseHeaders: new Headers(),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: false,
+    });
+
+    expect(mockStartObservation).not.toHaveBeenCalled();
+  });
+
+  test("should trace when Langfuse is enabled with actual bodies", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+    const responseBody = { content: "Hi there" };
+
+    await traceProxyRequest({
+      session: createMockSession(),
+      responseHeaders: new Headers({ "content-type": "application/json" }),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: false,
+      responseText: JSON.stringify(responseBody),
+    });
+
+    // Root span should have actual request body as input (not summary)
+    const rootCall = mockStartObservation.mock.calls[0];
+    expect(rootCall[0]).toBe("proxy-request");
+    // Input should be the actual request message (since forwardedRequestBody is null)
+    expect(rootCall[1].input).toEqual(
+      expect.objectContaining({
+        model: "claude-sonnet-4-20250514",
+        messages: expect.any(Array),
+      })
+    );
+    // Output should be actual response body
+    expect(rootCall[1].output).toEqual(responseBody);
+    // Should have level
+    expect(rootCall[1].level).toBe("DEFAULT");
+    // Should have metadata with former summaries
+    expect(rootCall[1].metadata).toEqual(
+      expect.objectContaining({
+        endpoint: "/v1/messages",
+        method: "POST",
+        statusCode: 200,
+        durationMs: 500,
+      })
+    );
+
+    // Should have child observations
+    const callNames = mockRootSpan.startObservation.mock.calls.map((c: unknown[]) => c[0]);
+    expect(callNames).toContain("guard-pipeline");
+    expect(callNames).toContain("llm-call");
+
+    expect(mockSpanEnd).toHaveBeenCalledWith(expect.any(Date));
+    expect(mockGenerationEnd).toHaveBeenCalledWith(expect.any(Date));
+  });
+
+  test("should use actual request messages as generation input", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+    const session = createMockSession();
+
+    await traceProxyRequest({
+      session,
+      responseHeaders: new Headers(),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: false,
+      responseText: '{"content": "response"}',
+    });
+
+    // Find the llm-call invocation
+    const llmCall = mockRootSpan.startObservation.mock.calls.find(
+      (c: unknown[]) => c[0] === "llm-call"
+    );
+    expect(llmCall).toBeDefined();
+    expect(llmCall[1].input).toEqual(session.request.message);
+  });
+
+  test("should use actual response body as generation output", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+    const responseBody = { content: [{ type: "text", text: "Hello!" }] };
+
+    await traceProxyRequest({
+      session: createMockSession(),
+      responseHeaders: new Headers(),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: false,
+      responseText: JSON.stringify(responseBody),
+    });
+
+    const llmCall = mockRootSpan.startObservation.mock.calls.find(
+      (c: unknown[]) => c[0] === "llm-call"
+    );
+    expect(llmCall[1].output).toEqual(responseBody);
+  });
+
+  test("should pass raw headers without redaction", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    await traceProxyRequest({
+      session: createMockSession(),
+      responseHeaders: new Headers({ "x-api-key": "secret-mock" }),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: false,
+    });
+
+    const llmCall = mockRootSpan.startObservation.mock.calls.find(
+      (c: unknown[]) => c[0] === "llm-call"
+    );
+    const metadata = llmCall[1].metadata;
+    expect(metadata.requestHeaders["x-api-key"]).toBe("test-mock-key-not-real");
+    expect(metadata.requestHeaders["content-type"]).toBe("application/json");
+    expect(metadata.responseHeaders["x-api-key"]).toBe("secret-mock");
+  });
+
+  test("should include provider name and model in tags", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    await traceProxyRequest({
+      session: createMockSession(),
+      responseHeaders: new Headers(),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: false,
+    });
+
+    expect(mockPropagateAttributes).toHaveBeenCalledWith(
+      expect.objectContaining({
+        userId: "testuser",
+        sessionId: "sess_abc12345_def67890",
+        tags: expect.arrayContaining([
+          "claude",
+          "anthropic-main",
+          "claude-sonnet-4-20250514",
+          "2xx",
+        ]),
+      })
+    );
+  });
+
+  test("should include usage details when provided", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    await traceProxyRequest({
+      session: createMockSession(),
+      responseHeaders: new Headers(),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: false,
+      usageMetrics: {
+        input_tokens: 100,
+        output_tokens: 50,
+        cache_read_input_tokens: 20,
+      },
+      costUsd: "0.0015",
+    });
+
+    const llmCall = mockRootSpan.startObservation.mock.calls.find(
+      (c: unknown[]) => c[0] === "llm-call"
+    );
+    expect(llmCall[1].usageDetails).toEqual({
+      input: 100,
+      output: 50,
+      cache_read_input_tokens: 20,
+    });
+    expect(llmCall[1].costDetails).toEqual({
+      total: 0.0015,
+    });
+  });
+
+  test("should include providerChain, specialSettings, and model in metadata", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    const providerChain = [
+      {
+        id: 1,
+        name: "anthropic-main",
+        providerType: "claude",
+        reason: "initial_selection",
+        timestamp: Date.now(),
+      },
+    ];
+
+    await traceProxyRequest({
+      session: createMockSession({
+        getSpecialSettings: () => ({ maxThinking: 8192 }),
+        getProviderChain: () => providerChain,
+      }),
+      responseHeaders: new Headers(),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: false,
+    });
+
+    const llmCall = mockRootSpan.startObservation.mock.calls.find(
+      (c: unknown[]) => c[0] === "llm-call"
+    );
+    const metadata = llmCall[1].metadata;
+    expect(metadata.providerChain).toEqual(providerChain);
+    expect(metadata.specialSettings).toEqual({ maxThinking: 8192 });
+    expect(metadata.model).toBe("claude-sonnet-4-20250514");
+    expect(metadata.originalModel).toBe("claude-sonnet-4-20250514");
+    expect(metadata.providerName).toBe("anthropic-main");
+    expect(metadata.requestSummary).toEqual(
+      expect.objectContaining({
+        model: "claude-sonnet-4-20250514",
+        messageCount: 1,
+      })
+    );
+  });
+
+  test("should handle model redirect metadata", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    await traceProxyRequest({
+      session: createMockSession({
+        isModelRedirected: () => true,
+        getOriginalModel: () => "claude-sonnet-4-20250514",
+        getCurrentModel: () => "glm-4",
+        request: {
+          message: { model: "glm-4", messages: [] },
+          model: "glm-4",
+        },
+      }),
+      responseHeaders: new Headers(),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: false,
+    });
+
+    const llmCall = mockRootSpan.startObservation.mock.calls.find(
+      (c: unknown[]) => c[0] === "llm-call"
+    );
+    expect(llmCall[1].metadata.modelRedirected).toBe(true);
+    expect(llmCall[1].metadata.originalModel).toBe("claude-sonnet-4-20250514");
+  });
+
+  test("should set completionStartTime from ttfbMs", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    const startTime = Date.now() - 500;
+    await traceProxyRequest({
+      session: createMockSession({ startTime, ttfbMs: 200 }),
+      responseHeaders: new Headers(),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: false,
+    });
+
+    expect(mockGenerationUpdate).toHaveBeenCalledWith({
+      completionStartTime: new Date(startTime + 200),
+    });
+  });
+
+  test("should pass correct startTime and endTime to observations", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    const startTime = 1700000000000;
+    const durationMs = 5000;
+
+    await traceProxyRequest({
+      session: createMockSession({ startTime, forwardStartTime: startTime + 5 }),
+      responseHeaders: new Headers(),
+      durationMs,
+      statusCode: 200,
+      isStreaming: false,
+    });
+
+    const expectedStart = new Date(startTime);
+    const expectedEnd = new Date(startTime + durationMs);
+    const expectedForwardStart = new Date(startTime + 5);
+
+    // Root span gets startTime in options (3rd arg)
+    expect(mockStartObservation).toHaveBeenCalledWith("proxy-request", expect.any(Object), {
+      startTime: expectedStart,
+    });
+
+    // Generation gets forwardStartTime in options (3rd arg)
+    const llmCall = mockRootSpan.startObservation.mock.calls.find(
+      (c: unknown[]) => c[0] === "llm-call"
+    );
+    expect(llmCall[2]).toEqual({
+      asType: "generation",
+      startTime: expectedForwardStart,
+    });
+
+    // Both end() calls receive the computed endTime
+    expect(mockGenerationEnd).toHaveBeenCalledWith(expectedEnd);
+    expect(mockSpanEnd).toHaveBeenCalledWith(expectedEnd);
+  });
+
+  test("should handle errors gracefully without throwing", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    // Make startObservation throw
+    mockStartObservation.mockImplementationOnce(() => {
+      throw new Error("SDK error");
+    });
+
+    await expect(
+      traceProxyRequest({
+        session: createMockSession(),
+        responseHeaders: new Headers(),
+        durationMs: 500,
+        statusCode: 200,
+        isStreaming: false,
+      })
+    ).resolves.toBeUndefined();
+  });
+
+  test("should include correct tags for error responses", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    await traceProxyRequest({
+      session: createMockSession(),
+      responseHeaders: new Headers(),
+      durationMs: 500,
+      statusCode: 502,
+      isStreaming: false,
+      errorMessage: "upstream error",
+    });
+
+    expect(mockPropagateAttributes).toHaveBeenCalledWith(
+      expect.objectContaining({
+        tags: expect.arrayContaining(["5xx"]),
+      })
+    );
+  });
+
+  test("should pass large input/output without truncation", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    // Generate a large response text
+    const largeContent = "x".repeat(200_000);
+
+    await traceProxyRequest({
+      session: createMockSession(),
+      responseHeaders: new Headers(),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: false,
+      responseText: largeContent,
+    });
+
+    const llmCall = mockRootSpan.startObservation.mock.calls.find(
+      (c: unknown[]) => c[0] === "llm-call"
+    );
+    const output = llmCall[1].output as string;
+    // Should be the full content, no truncation
+    expect(output).toBe(largeContent);
+    expect(output).not.toContain("...[truncated]");
+  });
+
+  test("should show streaming output with sseEventCount when no responseText", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    await traceProxyRequest({
+      session: createMockSession(),
+      responseHeaders: new Headers(),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: true,
+      sseEventCount: 42,
+    });
+
+    const llmCall = mockRootSpan.startObservation.mock.calls.find(
+      (c: unknown[]) => c[0] === "llm-call"
+    );
+    expect(llmCall[1].output).toEqual({
+      streaming: true,
+      sseEventCount: 42,
+    });
+  });
+
+  test("should include costUsd in root span metadata", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    await traceProxyRequest({
+      session: createMockSession(),
+      responseHeaders: new Headers(),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: false,
+      costUsd: "0.05",
+    });
+
+    const rootCall = mockStartObservation.mock.calls[0];
+    expect(rootCall[1].metadata).toEqual(
+      expect.objectContaining({
+        costUsd: "0.05",
+      })
+    );
+  });
+
+  test("should set trace-level input/output via updateTrace with actual bodies", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+    const responseBody = { result: "ok" };
+
+    await traceProxyRequest({
+      session: createMockSession(),
+      responseHeaders: new Headers(),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: false,
+      responseText: JSON.stringify(responseBody),
+      costUsd: "0.05",
+    });
+
+    expect(mockUpdateTrace).toHaveBeenCalledWith({
+      input: expect.objectContaining({
+        model: "claude-sonnet-4-20250514",
+        messages: expect.any(Array),
+      }),
+      output: responseBody,
+    });
+  });
+
+  // --- New tests for multi-span hierarchy ---
+
+  test("should create guard-pipeline span with correct timing", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    const startTime = 1700000000000;
+    const forwardStartTime = startTime + 8; // 8ms guard pipeline
+
+    await traceProxyRequest({
+      session: createMockSession({ startTime, forwardStartTime }),
+      responseHeaders: new Headers(),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: false,
+    });
+
+    const guardCall = mockRootSpan.startObservation.mock.calls.find(
+      (c: unknown[]) => c[0] === "guard-pipeline"
+    );
+    expect(guardCall).toBeDefined();
+    expect(guardCall[1]).toEqual({
+      output: { durationMs: 8, passed: true },
+    });
+    expect(guardCall[2]).toEqual({ startTime: new Date(startTime) });
+
+    // Guard span should end at forwardStartTime
+    expect(mockGuardSpanEnd).toHaveBeenCalledWith(new Date(forwardStartTime));
+  });
+
+  test("should skip guard-pipeline span when forwardStartTime is null", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    await traceProxyRequest({
+      session: createMockSession({ forwardStartTime: null }),
+      responseHeaders: new Headers(),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: false,
+    });
+
+    const guardCall = mockRootSpan.startObservation.mock.calls.find(
+      (c: unknown[]) => c[0] === "guard-pipeline"
+    );
+    expect(guardCall).toBeUndefined();
+    expect(mockGuardSpanEnd).not.toHaveBeenCalled();
+  });
+
+  test("should create provider-attempt events for failed chain items", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    const startTime = 1700000000000;
+    const failTimestamp = startTime + 100;
+
+    await traceProxyRequest({
+      session: createMockSession({
+        startTime,
+        getProviderChain: () => [
+          {
+            id: 1,
+            name: "provider-a",
+            providerType: "claude",
+            reason: "retry_failed",
+            errorMessage: "502 Bad Gateway",
+            statusCode: 502,
+            attemptNumber: 1,
+            timestamp: failTimestamp,
+          },
+          {
+            id: 2,
+            name: "provider-b",
+            providerType: "claude",
+            reason: "system_error",
+            errorMessage: "ECONNREFUSED",
+            timestamp: failTimestamp + 50,
+          },
+          {
+            id: 3,
+            name: "provider-c",
+            providerType: "claude",
+            reason: "request_success",
+            timestamp: failTimestamp + 200,
+          },
+        ],
+      }),
+      responseHeaders: new Headers(),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: false,
+    });
+
+    const eventCalls = mockRootSpan.startObservation.mock.calls.filter(
+      (c: unknown[]) => c[0] === "provider-attempt"
+    );
+    // 2 failed items (retry_failed + system_error), success is skipped
+    expect(eventCalls).toHaveLength(2);
+
+    // First event: retry_failed -> WARNING level
+    expect(eventCalls[0][1]).toEqual(
+      expect.objectContaining({
+        level: "WARNING",
+        input: expect.objectContaining({
+          providerId: 1,
+          providerName: "provider-a",
+          attempt: 1,
+        }),
+        output: expect.objectContaining({
+          reason: "retry_failed",
+          errorMessage: "502 Bad Gateway",
+          statusCode: 502,
+        }),
+      })
+    );
+    expect(eventCalls[0][2]).toEqual({
+      asType: "event",
+      startTime: new Date(failTimestamp),
+    });
+
+    // Second event: system_error -> ERROR level
+    expect(eventCalls[1][1].level).toBe("ERROR");
+    expect(eventCalls[1][1].output.reason).toBe("system_error");
+  });
+
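+  // A sketch (assumption) of the reason -> observation mapping the
+  // assertions above pin down; the real traceProxyRequest may differ:
+  //   request_success -> no provider-attempt event emitted
+  //   retry_failed    -> event with level "WARNING"
+  //   system_error    -> event with level "ERROR"
+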
+  test("should set generation startTime to forwardStartTime", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    const startTime = 1700000000000;
+    const forwardStartTime = startTime + 10;
+
+    await traceProxyRequest({
+      session: createMockSession({ startTime, forwardStartTime }),
+      responseHeaders: new Headers(),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: false,
+    });
+
+    const llmCall = mockRootSpan.startObservation.mock.calls.find(
+      (c: unknown[]) => c[0] === "llm-call"
+    );
+    expect(llmCall[2]).toEqual({
+      asType: "generation",
+      startTime: new Date(forwardStartTime),
+    });
+  });
+
+  test("should fall back to requestStartTime when forwardStartTime is null", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    const startTime = 1700000000000;
+
+    await traceProxyRequest({
+      session: createMockSession({ startTime, forwardStartTime: null }),
+      responseHeaders: new Headers(),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: false,
+    });
+
+    const llmCall = mockRootSpan.startObservation.mock.calls.find(
+      (c: unknown[]) => c[0] === "llm-call"
+    );
+    expect(llmCall[2]).toEqual({
+      asType: "generation",
+      startTime: new Date(startTime),
+    });
+  });
+
+  test("should include timingBreakdown in root span metadata and generation metadata", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    const startTime = 1700000000000;
+    const forwardStartTime = startTime + 5;
+
+    await traceProxyRequest({
+      session: createMockSession({
+        startTime,
+        forwardStartTime,
+        ttfbMs: 105,
+        getProviderChain: () => [
+          { id: 1, name: "p1", reason: "retry_failed", timestamp: startTime + 50 },
+          { id: 2, name: "p2", reason: "request_success", timestamp: startTime + 100 },
+        ],
+      }),
+      responseHeaders: new Headers(),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: false,
+    });
+
+    const expectedTimingBreakdown = {
+      guardPipelineMs: 5,
+      upstreamTotalMs: 495,
+      ttfbFromForwardMs: 100, // ttfbMs(105) - guardPipelineMs(5)
+      tokenGenerationMs: 395, // durationMs(500) - ttfbMs(105)
+      failedAttempts: 1, // only retry_failed is non-success
+      providersAttempted: 2, // 2 unique provider ids
+    };
+
+    // Root span metadata should have timingBreakdown
+    const rootCall = mockStartObservation.mock.calls[0];
+    expect(rootCall[1].metadata.timingBreakdown).toEqual(expectedTimingBreakdown);
+
+    // Generation metadata should also have timingBreakdown
+    const llmCall = mockRootSpan.startObservation.mock.calls.find(
+      (c: unknown[]) => c[0] === "llm-call"
+    );
+    expect(llmCall[1].metadata.timingBreakdown).toEqual(expectedTimingBreakdown);
+  });
+
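+  // A minimal sketch (assumption) of the arithmetic behind the expected
+  // timingBreakdown above, inferred from the worked comments rather than
+  // from the implementation:
+  //   guardPipelineMs    = forwardStartTime - startTime
+  //   upstreamTotalMs    = durationMs - guardPipelineMs
+  //   ttfbFromForwardMs  = ttfbMs - guardPipelineMs
+  //   tokenGenerationMs  = durationMs - ttfbMs
+  //   failedAttempts     = chain items whose reason marks a failure
+  //                        (e.g. retry_failed, system_error)
+  //   providersAttempted = number of unique provider ids in the chain
+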
+  test("should not create provider-attempt events when all providers succeeded", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    await traceProxyRequest({
+      session: createMockSession({
+        getProviderChain: () => [
+          { id: 1, name: "p1", reason: "initial_selection", timestamp: Date.now() },
+          { id: 1, name: "p1", reason: "request_success", timestamp: Date.now() },
+        ],
+      }),
+      responseHeaders: new Headers(),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: false,
+    });
+
+    const eventCalls = mockRootSpan.startObservation.mock.calls.filter(
+      (c: unknown[]) => c[0] === "provider-attempt"
+    );
+    expect(eventCalls).toHaveLength(0);
+  });
+
+  // --- New tests for input/output, level, and cost breakdown ---
+
+  test("should use forwardedRequestBody as trace input when available", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    const forwardedBody = JSON.stringify({
+      model: "claude-sonnet-4-20250514",
+      messages: [{ role: "user", content: "Preprocessed Hello" }],
+      stream: true,
+    });
+
+    await traceProxyRequest({
+      session: createMockSession({
+        forwardedRequestBody: forwardedBody,
+      }),
+      responseHeaders: new Headers(),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: false,
+      responseText: '{"ok": true}',
+    });
+
+    // Root span input should be the forwarded body (parsed JSON)
+    const rootCall = mockStartObservation.mock.calls[0];
+    expect(rootCall[1].input).toEqual(JSON.parse(forwardedBody));
+
+    // updateTrace should also use forwarded body
+    expect(mockUpdateTrace).toHaveBeenCalledWith({
+      input: JSON.parse(forwardedBody),
+      output: { ok: true },
+    });
+  });
+
+  test("should set root span level to DEFAULT for successful request", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    await traceProxyRequest({
+      session: createMockSession(),
+      responseHeaders: new Headers(),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: false,
+    });
+
+    const rootCall = mockStartObservation.mock.calls[0];
+    expect(rootCall[1].level).toBe("DEFAULT");
+  });
+
+  test("should set root span level to WARNING when retries occurred", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    const startTime = Date.now() - 500;
+    await traceProxyRequest({
+      session: createMockSession({
+        startTime,
+        getProviderChain: () => [
+          { id: 1, name: "p1", reason: "retry_failed", timestamp: startTime + 50 },
+          { id: 2, name: "p2", reason: "request_success", timestamp: startTime + 200 },
+        ],
+      }),
+      responseHeaders: new Headers(),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: false,
+    });
+
+    const rootCall = mockStartObservation.mock.calls[0];
+    expect(rootCall[1].level).toBe("WARNING");
+  });
+
+  test("should set root span level to ERROR for non-200 status", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    await traceProxyRequest({
+      session: createMockSession(),
+      responseHeaders: new Headers(),
+      durationMs: 500,
+      statusCode: 502,
+      isStreaming: false,
+    });
+
+    const rootCall = mockStartObservation.mock.calls[0];
+    expect(rootCall[1].level).toBe("ERROR");
+  });
+
+  test("should set root span level to ERROR for 499 client abort", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    await traceProxyRequest({
+      session: createMockSession(),
+      responseHeaders: new Headers(),
+      durationMs: 500,
+      statusCode: 499,
+      isStreaming: false,
+    });
+
+    const rootCall = mockStartObservation.mock.calls[0];
+    expect(rootCall[1].level).toBe("ERROR");
+  });
+
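+  // The level policy the four tests above encode (read from the
+  // assertions; the exact predicate inside traceProxyRequest is an
+  // assumption):
+  //   statusCode !== 200   -> "ERROR" (including 499 client aborts)
+  //   retries in the chain -> "WARNING"
+  //   otherwise            -> "DEFAULT"
+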
+  test("should include cost breakdown in costDetails when provided", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    const costBreakdown = {
+      input: 0.001,
+      output: 0.002,
+      cache_creation: 0.0005,
+      cache_read: 0.0001,
+      total: 0.0036,
+    };
+
+    await traceProxyRequest({
+      session: createMockSession(),
+      responseHeaders: new Headers(),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: false,
+      costUsd: "0.0036",
+      costBreakdown,
+    });
+
+    const llmCall = mockRootSpan.startObservation.mock.calls.find(
+      (c: unknown[]) => c[0] === "llm-call"
+    );
+    expect(llmCall[1].costDetails).toEqual(costBreakdown);
+  });
+
+  test("should fallback to total-only costDetails when no breakdown", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    await traceProxyRequest({
+      session: createMockSession(),
+      responseHeaders: new Headers(),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: false,
+      costUsd: "0.05",
+    });
+
+    const llmCall = mockRootSpan.startObservation.mock.calls.find(
+      (c: unknown[]) => c[0] === "llm-call"
+    );
+    expect(llmCall[1].costDetails).toEqual({ total: 0.05 });
+  });
+
+  test("should include former summaries in root span metadata", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    await traceProxyRequest({
+      session: createMockSession(),
+      responseHeaders: new Headers(),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: false,
+      costUsd: "0.05",
+    });
+
+    const rootCall = mockStartObservation.mock.calls[0];
+    const metadata = rootCall[1].metadata;
+    // Former input summary fields
+    expect(metadata.endpoint).toBe("/v1/messages");
+    expect(metadata.method).toBe("POST");
+    expect(metadata.model).toBe("claude-sonnet-4-20250514");
+    expect(metadata.clientFormat).toBe("claude");
+    expect(metadata.providerName).toBe("anthropic-main");
+    // Former output summary fields
+    expect(metadata.statusCode).toBe(200);
+    expect(metadata.durationMs).toBe(500);
+    expect(metadata.costUsd).toBe("0.05");
+    expect(metadata.timingBreakdown).toBeDefined();
+  });
+});
+
+describe("isLangfuseEnabled", () => {
+  const originalPublicKey = process.env.LANGFUSE_PUBLIC_KEY;
+  const originalSecretKey = process.env.LANGFUSE_SECRET_KEY;
+
+  afterEach(() => {
+    // Restore env
+    if (originalPublicKey !== undefined) {
+      process.env.LANGFUSE_PUBLIC_KEY = originalPublicKey;
+    } else {
+      delete process.env.LANGFUSE_PUBLIC_KEY;
+    }
+    if (originalSecretKey !== undefined) {
+      process.env.LANGFUSE_SECRET_KEY = originalSecretKey;
+    } else {
+      delete process.env.LANGFUSE_SECRET_KEY;
+    }
+  });
+
+  test("should return false when env vars are not set", () => {
+    delete process.env.LANGFUSE_PUBLIC_KEY;
+    delete process.env.LANGFUSE_SECRET_KEY;
+
+    // Mirror the enablement check inline (the real module is mocked in this suite)
+    const isEnabled = !!(process.env.LANGFUSE_PUBLIC_KEY && process.env.LANGFUSE_SECRET_KEY);
+    expect(isEnabled).toBe(false);
+  });
+
+  test("should return true when both keys are set", () => {
+    process.env.LANGFUSE_PUBLIC_KEY = "pk-lf-test-mock";
+    process.env.LANGFUSE_SECRET_KEY = "test-mock-not-real";
+
+    const isEnabled = !!(process.env.LANGFUSE_PUBLIC_KEY && process.env.LANGFUSE_SECRET_KEY);
+    expect(isEnabled).toBe(true);
+  });
+});
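
The two isLangfuseEnabled tests above mirror the enablement check inline rather than calling the helper, because the langfuse module is mocked for this suite. A minimal sketch of the logic they imply (module path and export shape are assumptions, not taken from this commit):

// hypothetical src/lib/langfuse/config.ts
export function isLangfuseEnabled(): boolean {
  // Enabled only when both project keys are set, as the tests above assert.
  return Boolean(process.env.LANGFUSE_PUBLIC_KEY && process.env.LANGFUSE_SECRET_KEY);
}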

+ 159 - 0
tests/unit/lib/cost-calculation-breakdown.test.ts

@@ -0,0 +1,159 @@
+import { describe, expect, test } from "vitest";
+import { calculateRequestCostBreakdown } from "@/lib/utils/cost-calculation";
+import type { ModelPriceData } from "@/types/model-price";
+
+function makePriceData(overrides: Partial<ModelPriceData> = {}): ModelPriceData {
+  return {
+    input_cost_per_token: 0.000003, // $3/MTok
+    output_cost_per_token: 0.000015, // $15/MTok
+    cache_creation_input_token_cost: 0.00000375, // 1.25x input
+    cache_read_input_token_cost: 0.0000003, // 0.1x input
+    ...overrides,
+  };
+}
+
+describe("calculateRequestCostBreakdown", () => {
+  test("basic input + output tokens", () => {
+    const result = calculateRequestCostBreakdown(
+      { input_tokens: 1000, output_tokens: 500 },
+      makePriceData()
+    );
+
+    expect(result.input).toBeCloseTo(0.003, 6); // 1000 * 0.000003
+    expect(result.output).toBeCloseTo(0.0075, 6); // 500 * 0.000015
+    expect(result.cache_creation).toBe(0);
+    expect(result.cache_read).toBe(0);
+    expect(result.total).toBeCloseTo(0.0105, 6);
+  });
+
+  test("cache creation (5m + 1h) + cache read", () => {
+    const result = calculateRequestCostBreakdown(
+      {
+        input_tokens: 100,
+        output_tokens: 50,
+        cache_creation_5m_input_tokens: 200,
+        cache_creation_1h_input_tokens: 300,
+        cache_read_input_tokens: 1000,
+      },
+      makePriceData({
+        cache_creation_input_token_cost_above_1hr: 0.000006, // 2x input
+      })
+    );
+
+    // cache_creation = 200 * 0.00000375 + 300 * 0.000006
+    expect(result.cache_creation).toBeCloseTo(0.00255, 6);
+    // cache_read = 1000 * 0.0000003
+    expect(result.cache_read).toBeCloseTo(0.0003, 6);
+    expect(result.total).toBeCloseTo(
+      result.input + result.output + result.cache_creation + result.cache_read,
+      10
+    );
+  });
+
+  test("image tokens go to input/output buckets", () => {
+    const result = calculateRequestCostBreakdown(
+      {
+        input_tokens: 100,
+        output_tokens: 50,
+        input_image_tokens: 500,
+        output_image_tokens: 200,
+      },
+      makePriceData({
+        input_cost_per_image_token: 0.00001,
+        output_cost_per_image_token: 0.00005,
+      })
+    );
+
+    // input = 100 * 0.000003 + 500 * 0.00001
+    expect(result.input).toBeCloseTo(0.0053, 6);
+    // output = 50 * 0.000015 + 200 * 0.00005
+    expect(result.output).toBeCloseTo(0.01075, 6);
+  });
+
+  test("tiered pricing with context1mApplied", () => {
+    const result = calculateRequestCostBreakdown(
+      {
+        input_tokens: 300000, // crosses 200k threshold
+        output_tokens: 100,
+      },
+      makePriceData(),
+      true // context1mApplied
+    );
+
+    // input: 200000 * 0.000003 + 100000 * 0.000003 * 2.0 = 0.6 + 0.6 = 1.2
+    expect(result.input).toBeCloseTo(1.2, 4);
+    // output: 100 tokens, below 200k threshold
+    expect(result.output).toBeCloseTo(0.0015, 6);
+  });
+
+  test("200k tier pricing (Gemini style)", () => {
+    const result = calculateRequestCostBreakdown(
+      {
+        input_tokens: 300000, // crosses 200k threshold
+        output_tokens: 100,
+      },
+      makePriceData({
+        input_cost_per_token_above_200k_tokens: 0.000006, // 2x base for >200k
+      })
+    );
+
+    // input: 200000 * 0.000003 + 100000 * 0.000006 = 0.6 + 0.6 = 1.2
+    expect(result.input).toBeCloseTo(1.2, 4);
+  });
+
+  test("categories sum to total", () => {
+    const result = calculateRequestCostBreakdown(
+      {
+        input_tokens: 5000,
+        output_tokens: 2000,
+        cache_creation_input_tokens: 1000,
+        cache_read_input_tokens: 3000,
+      },
+      makePriceData()
+    );
+
+    const sum = result.input + result.output + result.cache_creation + result.cache_read;
+    expect(result.total).toBeCloseTo(sum, 10);
+  });
+
+  test("zero usage returns all zeros", () => {
+    const result = calculateRequestCostBreakdown({}, makePriceData());
+
+    expect(result).toEqual({
+      input: 0,
+      output: 0,
+      cache_creation: 0,
+      cache_read: 0,
+      total: 0,
+    });
+  });
+
+  test("per-request cost goes to input bucket", () => {
+    const result = calculateRequestCostBreakdown(
+      { input_tokens: 0 },
+      makePriceData({ input_cost_per_request: 0.01 })
+    );
+
+    expect(result.input).toBeCloseTo(0.01, 6);
+    expect(result.total).toBeCloseTo(0.01, 6);
+  });
+
+  test("cache_creation_input_tokens distributed by cache_ttl", () => {
+    // When only cache_creation_input_tokens is set (no 5m/1h split),
+    // it should be assigned based on cache_ttl
+    const result = calculateRequestCostBreakdown(
+      {
+        input_tokens: 0,
+        output_tokens: 0,
+        cache_creation_input_tokens: 1000,
+        cache_ttl: "1h",
+      },
+      makePriceData({
+        cache_creation_input_token_cost_above_1hr: 0.000006,
+      })
+    );
+
+    // 1000 tokens should go to 1h tier at 0.000006
+    expect(result.cache_creation).toBeCloseTo(0.006, 6);
+  });
+});
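
Taken together, these breakdown tests pin down a small contract for calculateRequestCostBreakdown. A sketch of the result shape they assert, with field comments that are a reading of the tests rather than of the real module:

// Shape returned by calculateRequestCostBreakdown, per the assertions above.
export type CostBreakdown = {
  input: number;          // base input tokens + image input + per-request cost
  output: number;         // base output tokens + image output
  cache_creation: number; // 5m/1h cache-write tiers, with the 1h tier priced separately
  cache_read: number;     // cache-hit tokens
  total: number;          // invariant: input + output + cache_creation + cache_read
};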

+ 1 - 0
tests/unit/proxy/proxy-handler-session-id-error.test.ts

@@ -13,6 +13,7 @@ const h = vi.hoisted(() => ({
     },
     isCountTokensRequest: () => false,
     setOriginalFormat: () => {},
+    recordForwardStart: () => {},
     messageContext: null,
     provider: null,
   } as any,