Răsfoiți Sursa

feat(observability): integrate Langfuse for enterprise-grade LLM request tracing

Add optional Langfuse observability integration that auto-enables when
LANGFUSE_PUBLIC_KEY and LANGFUSE_SECRET_KEY env vars are set. Traces
every proxy request with full context: provider chain, model, usage
metrics, cost, TTFB, headers (auth redacted), special settings, and
error details. Built on OpenTelemetry via @langfuse/otel with async
non-blocking fire-and-forget semantics and zero overhead when disabled.

Co-authored-by: factory-droid[bot] <138933559+factory-droid[bot]@users.noreply.github.com>
ding113 1 săptămână în urmă
părinte
comite
084be30245

+ 10 - 0
.env.example

@@ -128,6 +128,16 @@ FETCH_HEADERS_TIMEOUT=600000
 FETCH_BODY_TIMEOUT=600000
 MAX_RETRY_ATTEMPTS_DEFAULT=2                # 单供应商最大尝试次数(含首次调用),范围 1-10,留空使用默认值 2
 
+# Langfuse Observability (optional, auto-enabled when keys are set)
+# 功能说明:企业级 LLM 可观测性集成,自动追踪所有代理请求的完整生命周期
+# - 配置 PUBLIC_KEY 和 SECRET_KEY 后自动启用
+# - 支持 Langfuse Cloud 和自托管实例
+LANGFUSE_PUBLIC_KEY=                        # Langfuse project public key (pk-lf-...)
+LANGFUSE_SECRET_KEY=                        # Langfuse project secret key (sk-lf-...)
+LANGFUSE_BASE_URL=https://cloud.langfuse.com  # Langfuse server URL (self-hosted or cloud)
+LANGFUSE_SAMPLE_RATE=1.0                    # Trace sampling rate (0.0-1.0, default: 1.0 = 100%)
+LANGFUSE_DEBUG=false                        # Enable Langfuse debug logging
+
 # 智能探测配置
 # 功能说明:当熔断器处于 OPEN 状态时,定期探测供应商以实现更快恢复
 # - ENABLE_SMART_PROBING:是否启用智能探测(默认:false)

+ 4 - 0
package.json

@@ -44,7 +44,11 @@
     "@hono/zod-openapi": "^1",
     "@hookform/resolvers": "^5",
     "@iarna/toml": "^2.2.5",
+    "@langfuse/client": "^4.6.1",
+    "@langfuse/otel": "^4.6.1",
+    "@langfuse/tracing": "^4.6.1",
     "@lobehub/icons": "^2",
+    "@opentelemetry/sdk-node": "^0.212.0",
     "@radix-ui/react-alert-dialog": "^1",
     "@radix-ui/react-avatar": "^1",
     "@radix-ui/react-checkbox": "^1",

+ 25 - 1
src/app/v1/_lib/proxy-handler.ts

@@ -80,7 +80,31 @@ export async function handleProxyRequest(c: Context): Promise<Response> {
 
     const response = await ProxyForwarder.send(session);
     const handled = await ProxyResponseHandler.dispatch(session, response);
-    return await attachSessionIdToErrorResponse(session.sessionId, handled);
+    const finalResponse = await attachSessionIdToErrorResponse(session.sessionId, handled);
+
+    // Fire Langfuse trace asynchronously (non-blocking)
+    if (process.env.LANGFUSE_PUBLIC_KEY && process.env.LANGFUSE_SECRET_KEY && session) {
+      const traceSession = session;
+      const traceStatusCode = handled.status;
+      const isSSE = (handled.headers.get("content-type") || "").includes("text/event-stream");
+      void import("@/lib/langfuse/trace-proxy-request")
+        .then(({ traceProxyRequest }) => {
+          void traceProxyRequest({
+            session: traceSession,
+            response: handled,
+            durationMs: Date.now() - traceSession.startTime,
+            statusCode: traceStatusCode,
+            isStreaming: isSSE,
+          });
+        })
+        .catch((err) => {
+          logger.warn("[ProxyHandler] Langfuse trace failed", {
+            error: err instanceof Error ? err.message : String(err),
+          });
+        });
+    }
+
+    return finalResponse;
   } catch (error) {
     logger.error("Proxy handler error:", error);
     if (session) {

+ 19 - 0
src/instrumentation.ts

@@ -140,6 +140,15 @@ function warmupApiKeyVacuumFilter(): void {
 export async function register() {
   // 仅在服务器端执行
   if (process.env.NEXT_RUNTIME === "nodejs") {
+    // Initialize Langfuse observability (no-op if env vars not set)
+    try {
+      const { initLangfuse } = await import("@/lib/langfuse");
+      await initLangfuse();
+    } catch (error) {
+      logger.warn("[Instrumentation] Langfuse initialization failed (non-critical)", {
+        error: error instanceof Error ? error.message : String(error),
+      });
+    }
     // Skip initialization in CI environment (no DB connection needed)
     if (process.env.CI === "true") {
       logger.warn(
@@ -216,6 +225,16 @@ export async function register() {
           });
         }
 
+        // Flush Langfuse pending spans
+        try {
+          const { shutdownLangfuse } = await import("@/lib/langfuse");
+          await shutdownLangfuse();
+        } catch (error) {
+          logger.warn("[Instrumentation] Failed to shutdown Langfuse", {
+            error: error instanceof Error ? error.message : String(error),
+          });
+        }
+
         // 尽力将 message_request 的异步批量更新刷入数据库(避免终止时丢失尾部日志)
         try {
           const { stopMessageRequestWriteBuffer } = await import(

+ 7 - 0
src/lib/config/env.schema.ts

@@ -127,6 +127,13 @@ export const EnvSchema = z.object({
   FETCH_BODY_TIMEOUT: z.coerce.number().default(600_000), // 请求/响应体传输超时(默认 600 秒)
   FETCH_HEADERS_TIMEOUT: z.coerce.number().default(600_000), // 响应头接收超时(默认 600 秒)
   FETCH_CONNECT_TIMEOUT: z.coerce.number().default(30000), // TCP 连接建立超时(默认 30 秒)
+
+  // Langfuse Observability (optional, auto-enabled when keys are set)
+  LANGFUSE_PUBLIC_KEY: z.string().optional(),
+  LANGFUSE_SECRET_KEY: z.string().optional(),
+  LANGFUSE_BASE_URL: z.string().default("https://cloud.langfuse.com"),
+  LANGFUSE_SAMPLE_RATE: z.coerce.number().min(0).max(1).default(1.0),
+  LANGFUSE_DEBUG: z.string().default("false").transform(booleanTransform),
 });
 
 /**

+ 92 - 0
src/lib/langfuse/index.ts

@@ -0,0 +1,92 @@
+import type { LangfuseSpanProcessor } from "@langfuse/otel";
+
+import type { NodeSDK } from "@opentelemetry/sdk-node";
+import { logger } from "@/lib/logger";
+
+let sdk: NodeSDK | null = null;
+let spanProcessor: LangfuseSpanProcessor | null = null;
+let initialized = false;
+
+export function isLangfuseEnabled(): boolean {
+  return !!(process.env.LANGFUSE_PUBLIC_KEY && process.env.LANGFUSE_SECRET_KEY);
+}
+
+/**
+ * Initialize Langfuse OpenTelemetry SDK.
+ * Must be called early in the process (instrumentation.ts register()).
+ * No-op if LANGFUSE_PUBLIC_KEY / LANGFUSE_SECRET_KEY are not set.
+ */
+export async function initLangfuse(): Promise<void> {
+  if (initialized || !isLangfuseEnabled()) {
+    return;
+  }
+
+  try {
+    const { NodeSDK: OtelNodeSDK } = await import("@opentelemetry/sdk-node");
+    const { LangfuseSpanProcessor: LfSpanProcessor } = await import("@langfuse/otel");
+
+    const sampleRate = Number.parseFloat(process.env.LANGFUSE_SAMPLE_RATE || "1.0");
+
+    spanProcessor = new LfSpanProcessor({
+      publicKey: process.env.LANGFUSE_PUBLIC_KEY,
+      secretKey: process.env.LANGFUSE_SECRET_KEY,
+      baseUrl: process.env.LANGFUSE_BASE_URL || "https://cloud.langfuse.com",
+      // Only export spans from langfuse-sdk scope (avoid noise from other OTel instrumentations)
+      shouldExportSpan: ({ otelSpan }) => otelSpan.instrumentationScope.name === "langfuse-sdk",
+    });
+
+    const samplerConfig =
+      sampleRate < 1.0
+        ? await (async () => {
+            const { TraceIdRatioBasedSampler } = await import("@opentelemetry/sdk-trace-base");
+            return { sampler: new TraceIdRatioBasedSampler(sampleRate) };
+          })()
+        : {};
+
+    sdk = new OtelNodeSDK({
+      spanProcessors: [spanProcessor],
+      ...samplerConfig,
+    });
+
+    sdk.start();
+    initialized = true;
+
+    logger.info("[Langfuse] Observability initialized", {
+      baseUrl: process.env.LANGFUSE_BASE_URL || "https://cloud.langfuse.com",
+      sampleRate,
+      debug: process.env.LANGFUSE_DEBUG === "true",
+    });
+
+    if (process.env.LANGFUSE_DEBUG === "true") {
+      const { configureGlobalLogger, LogLevel } = await import("@langfuse/core");
+      configureGlobalLogger({ level: LogLevel.DEBUG });
+    }
+  } catch (error) {
+    logger.error("[Langfuse] Failed to initialize", {
+      error: error instanceof Error ? error.message : String(error),
+    });
+  }
+}
+
+/**
+ * Flush pending spans and shut down the SDK.
+ * Called during graceful shutdown (SIGTERM/SIGINT).
+ */
+export async function shutdownLangfuse(): Promise<void> {
+  if (!initialized || !spanProcessor) {
+    return;
+  }
+
+  try {
+    await spanProcessor.forceFlush();
+    if (sdk) {
+      await sdk.shutdown();
+    }
+    initialized = false;
+    logger.info("[Langfuse] Shutdown complete");
+  } catch (error) {
+    logger.warn("[Langfuse] Shutdown error", {
+      error: error instanceof Error ? error.message : String(error),
+    });
+  }
+}

+ 218 - 0
src/lib/langfuse/trace-proxy-request.ts

@@ -0,0 +1,218 @@
+import type { ProxySession } from "@/app/v1/_lib/proxy/session";
+import { isLangfuseEnabled } from "@/lib/langfuse/index";
+import { logger } from "@/lib/logger";
+
+// Auth-sensitive header names to redact
+const REDACTED_HEADERS = new Set([
+  "x-api-key",
+  "authorization",
+  "x-goog-api-key",
+  "anthropic-api-key",
+  "cookie",
+  "set-cookie",
+]);
+
+function sanitizeHeaders(headers: Headers): Record<string, string> {
+  const result: Record<string, string> = {};
+  headers.forEach((value, key) => {
+    result[key] = REDACTED_HEADERS.has(key.toLowerCase()) ? "[REDACTED]" : value;
+  });
+  return result;
+}
+
+function buildRequestBodySummary(session: ProxySession): Record<string, unknown> {
+  const msg = session.request.message as Record<string, unknown>;
+  return {
+    model: session.request.model,
+    messageCount: session.getMessagesLength(),
+    hasSystemPrompt: Array.isArray(msg.system) && msg.system.length > 0,
+    toolsCount: Array.isArray(msg.tools) ? msg.tools.length : 0,
+    stream: msg.stream === true,
+    maxTokens: typeof msg.max_tokens === "number" ? msg.max_tokens : undefined,
+    temperature: typeof msg.temperature === "number" ? msg.temperature : undefined,
+  };
+}
+
+function getStatusCategory(statusCode: number): string {
+  if (statusCode >= 200 && statusCode < 300) return "2xx";
+  if (statusCode >= 400 && statusCode < 500) return "4xx";
+  if (statusCode >= 500) return "5xx";
+  return `${Math.floor(statusCode / 100)}xx`;
+}
+
+export interface TraceContext {
+  session: ProxySession;
+  response: Response;
+  durationMs: number;
+  statusCode: number;
+  responseText?: string;
+  isStreaming: boolean;
+  sseEventCount?: number;
+  errorMessage?: string;
+  usageMetrics?: {
+    input_tokens?: number;
+    output_tokens?: number;
+    cache_creation_input_tokens?: number;
+    cache_read_input_tokens?: number;
+  } | null;
+  costUsd?: string;
+}
+
+/**
+ * Send a trace to Langfuse for a completed proxy request.
+ * Fully async and non-blocking. Errors are caught and logged.
+ */
+export async function traceProxyRequest(ctx: TraceContext): Promise<void> {
+  if (!isLangfuseEnabled()) {
+    return;
+  }
+
+  try {
+    const { startObservation, propagateAttributes } = await import("@langfuse/tracing");
+
+    const { session, response, durationMs, statusCode, isStreaming } = ctx;
+    const provider = session.provider;
+    const messageContext = session.messageContext;
+
+    // Build tags
+    const tags: string[] = [];
+    if (provider?.providerType) tags.push(provider.providerType);
+    if (session.originalFormat) tags.push(session.originalFormat);
+    tags.push(getStatusCategory(statusCode));
+
+    // Build trace-level metadata (propagateAttributes requires all values to be strings)
+    const traceMetadata: Record<string, string> = {
+      keyName: messageContext?.key?.name ?? "",
+      endpoint: session.getEndpoint() ?? "",
+      method: session.method,
+      clientFormat: session.originalFormat,
+      userAgent: session.userAgent ?? "",
+      requestSequence: String(session.getRequestSequence()),
+    };
+
+    // Build generation metadata
+    const generationMetadata: Record<string, unknown> = {
+      providerId: provider?.id,
+      providerName: provider?.name,
+      providerType: provider?.providerType,
+      providerChain: session.getProviderChain(),
+      specialSettings: session.getSpecialSettings(),
+      modelRedirected: session.isModelRedirected(),
+      originalModel: session.isModelRedirected() ? session.getOriginalModel() : undefined,
+      isStreaming,
+      statusCode,
+      durationMs,
+      ttfbMs: session.ttfbMs,
+      cacheTtlApplied: session.getCacheTtlResolved(),
+      context1mApplied: session.getContext1mApplied(),
+      errorMessage: ctx.errorMessage,
+      requestHeaders: sanitizeHeaders(session.headers),
+      responseHeaders: sanitizeHeaders(response.headers),
+      requestBodySummary: buildRequestBodySummary(session),
+    };
+
+    // Add response body summary
+    if (isStreaming) {
+      generationMetadata.sseEventCount = ctx.sseEventCount;
+    } else if (ctx.responseText) {
+      generationMetadata.responseBodySummary =
+        ctx.responseText.length > 2000
+          ? `${ctx.responseText.substring(0, 2000)}...[truncated]`
+          : ctx.responseText;
+    }
+
+    // Build usage details for Langfuse generation
+    const usageDetails: Record<string, number> | undefined = ctx.usageMetrics
+      ? {
+          ...(ctx.usageMetrics.input_tokens != null
+            ? { input: ctx.usageMetrics.input_tokens }
+            : {}),
+          ...(ctx.usageMetrics.output_tokens != null
+            ? { output: ctx.usageMetrics.output_tokens }
+            : {}),
+          ...(ctx.usageMetrics.cache_read_input_tokens != null
+            ? { cacheRead: ctx.usageMetrics.cache_read_input_tokens }
+            : {}),
+          ...(ctx.usageMetrics.cache_creation_input_tokens != null
+            ? { cacheCreation: ctx.usageMetrics.cache_creation_input_tokens }
+            : {}),
+        }
+      : undefined;
+
+    // Build cost details
+    const costDetails: Record<string, number> | undefined =
+      ctx.costUsd && Number.parseFloat(ctx.costUsd) > 0
+        ? { totalUsd: Number.parseFloat(ctx.costUsd) }
+        : undefined;
+
+    // Create the root trace span
+    const rootSpan = startObservation("proxy-request", {
+      input: traceMetadata,
+      output: {
+        statusCode,
+        durationMs,
+        model: session.getCurrentModel(),
+        hasUsage: !!ctx.usageMetrics,
+      },
+    });
+
+    // Propagate trace attributes
+    await propagateAttributes(
+      {
+        userId: messageContext?.user?.id ? String(messageContext.user.id) : undefined,
+        sessionId: session.sessionId ?? undefined,
+        tags,
+        metadata: traceMetadata,
+        traceName: `${session.method} ${session.getEndpoint() ?? "/"}`,
+      },
+      async () => {
+        // Create the LLM generation observation
+        const generation = rootSpan.startObservation(
+          "llm-call",
+          {
+            model: session.getCurrentModel() ?? undefined,
+            input: buildRequestBodySummary(session),
+            output: isStreaming
+              ? { streaming: true, sseEventCount: ctx.sseEventCount }
+              : ctx.responseText
+                ? tryParseJsonSafe(ctx.responseText)
+                : { statusCode },
+            ...(usageDetails && Object.keys(usageDetails).length > 0 ? { usageDetails } : {}),
+            ...(costDetails ? { costDetails } : {}),
+            metadata: generationMetadata,
+          },
+          { asType: "generation" }
+        );
+
+        // Set TTFB as completionStartTime
+        if (session.ttfbMs != null) {
+          generation.update({
+            completionStartTime: new Date(session.startTime + session.ttfbMs),
+          });
+        }
+
+        generation.end();
+      }
+    );
+
+    rootSpan.end();
+  } catch (error) {
+    logger.warn("[Langfuse] Failed to trace proxy request", {
+      error: error instanceof Error ? error.message : String(error),
+    });
+  }
+}
+
+function tryParseJsonSafe(text: string): unknown {
+  try {
+    const parsed = JSON.parse(text);
+    // Truncate large outputs to avoid excessive data
+    const str = JSON.stringify(parsed);
+    if (str.length > 4000) {
+      return { _truncated: true, _length: str.length, _preview: str.substring(0, 2000) };
+    }
+    return parsed;
+  } catch {
+    return text.length > 2000 ? `${text.substring(0, 2000)}...[truncated]` : text;
+  }
+}

+ 339 - 0
tests/unit/langfuse/langfuse-trace.test.ts

@@ -0,0 +1,339 @@
+import { describe, expect, test, vi, beforeEach, afterEach } from "vitest";
+
+// Mock the langfuse modules at the top level
+const mockStartObservation = vi.fn();
+const mockPropagateAttributes = vi.fn();
+const mockSpanEnd = vi.fn();
+const mockGenerationEnd = vi.fn();
+const mockGenerationUpdate = vi.fn();
+
+const mockGeneration: any = {
+  update: (...args: unknown[]) => {
+    mockGenerationUpdate(...args);
+    return mockGeneration;
+  },
+  end: mockGenerationEnd,
+};
+
+const mockRootSpan = {
+  startObservation: vi.fn().mockReturnValue(mockGeneration),
+  end: mockSpanEnd,
+};
+
+vi.mock("@langfuse/tracing", () => ({
+  startObservation: (...args: unknown[]) => {
+    mockStartObservation(...args);
+    return mockRootSpan;
+  },
+  propagateAttributes: async (attrs: unknown, fn: () => Promise<void>) => {
+    mockPropagateAttributes(attrs);
+    await fn();
+  },
+}));
+
+vi.mock("@/lib/logger", () => ({
+  logger: {
+    info: vi.fn(),
+    warn: vi.fn(),
+    error: vi.fn(),
+    debug: vi.fn(),
+  },
+}));
+
+let langfuseEnabled = true;
+vi.mock("@/lib/langfuse/index", () => ({
+  isLangfuseEnabled: () => langfuseEnabled,
+}));
+
+function createMockSession(overrides: Record<string, unknown> = {}) {
+  return {
+    startTime: Date.now() - 500,
+    method: "POST",
+    headers: new Headers({
+      "content-type": "application/json",
+      "x-api-key": "test-mock-key-not-real",
+      "user-agent": "claude-code/1.0",
+    }),
+    request: {
+      message: {
+        model: "claude-sonnet-4-20250514",
+        messages: [{ role: "user", content: "Hello" }],
+        stream: true,
+        max_tokens: 4096,
+        tools: [{ name: "tool1" }],
+      },
+      model: "claude-sonnet-4-20250514",
+    },
+    originalFormat: "claude",
+    userAgent: "claude-code/1.0",
+    sessionId: "sess_abc12345_def67890",
+    provider: {
+      id: 1,
+      name: "anthropic-main",
+      providerType: "claude",
+    },
+    messageContext: {
+      id: 42,
+      user: { id: 7, name: "testuser" },
+      key: { name: "default-key" },
+    },
+    ttfbMs: 200,
+    getEndpoint: () => "/v1/messages",
+    getRequestSequence: () => 3,
+    getMessagesLength: () => 1,
+    getCurrentModel: () => "claude-sonnet-4-20250514",
+    getOriginalModel: () => "claude-sonnet-4-20250514",
+    isModelRedirected: () => false,
+    getProviderChain: () => [
+      {
+        id: 1,
+        name: "anthropic-main",
+        providerType: "claude",
+        reason: "initial_selection",
+        timestamp: Date.now(),
+      },
+    ],
+    getSpecialSettings: () => null,
+    getCacheTtlResolved: () => null,
+    getContext1mApplied: () => false,
+    ...overrides,
+  } as any;
+}
+
+describe("traceProxyRequest", () => {
+  beforeEach(() => {
+    vi.clearAllMocks();
+    langfuseEnabled = true;
+    // Re-setup return values after clearAllMocks
+    mockRootSpan.startObservation.mockReturnValue(mockGeneration);
+  });
+
+  test("should not trace when Langfuse is disabled", async () => {
+    langfuseEnabled = false;
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    await traceProxyRequest({
+      session: createMockSession(),
+      response: new Response("ok", { status: 200 }),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: false,
+    });
+
+    expect(mockStartObservation).not.toHaveBeenCalled();
+  });
+
+  test("should trace when Langfuse is enabled", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    await traceProxyRequest({
+      session: createMockSession(),
+      response: new Response("ok", { status: 200 }),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: false,
+    });
+
+    expect(mockStartObservation).toHaveBeenCalledWith(
+      "proxy-request",
+      expect.objectContaining({
+        input: expect.objectContaining({
+          endpoint: "/v1/messages",
+          method: "POST",
+          clientFormat: "claude",
+        }),
+      })
+    );
+
+    expect(mockRootSpan.startObservation).toHaveBeenCalledWith(
+      "llm-call",
+      expect.objectContaining({
+        model: "claude-sonnet-4-20250514",
+      }),
+      { asType: "generation" }
+    );
+
+    expect(mockSpanEnd).toHaveBeenCalled();
+    expect(mockGenerationEnd).toHaveBeenCalled();
+  });
+
+  test("should redact sensitive headers", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    await traceProxyRequest({
+      session: createMockSession(),
+      response: new Response("ok", { status: 200 }),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: false,
+    });
+
+    const generationCall = mockRootSpan.startObservation.mock.calls[0];
+    const metadata = generationCall[1].metadata;
+    expect(metadata.requestHeaders["x-api-key"]).toBe("[REDACTED]");
+    expect(metadata.requestHeaders["content-type"]).toBe("application/json");
+  });
+
+  test("should propagate userId and sessionId", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    await traceProxyRequest({
+      session: createMockSession(),
+      response: new Response("ok", { status: 200 }),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: false,
+    });
+
+    expect(mockPropagateAttributes).toHaveBeenCalledWith(
+      expect.objectContaining({
+        userId: "7",
+        sessionId: "sess_abc12345_def67890",
+        tags: expect.arrayContaining(["claude", "2xx"]),
+      })
+    );
+  });
+
+  test("should include usage details when provided", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    await traceProxyRequest({
+      session: createMockSession(),
+      response: new Response("ok", { status: 200 }),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: false,
+      usageMetrics: {
+        input_tokens: 100,
+        output_tokens: 50,
+        cache_read_input_tokens: 20,
+      },
+      costUsd: "0.0015",
+    });
+
+    const generationCall = mockRootSpan.startObservation.mock.calls[0];
+    expect(generationCall[1].usageDetails).toEqual({
+      input: 100,
+      output: 50,
+      cacheRead: 20,
+    });
+    expect(generationCall[1].costDetails).toEqual({
+      totalUsd: 0.0015,
+    });
+  });
+
+  test("should handle model redirect metadata", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    await traceProxyRequest({
+      session: createMockSession({
+        isModelRedirected: () => true,
+        getOriginalModel: () => "claude-sonnet-4-20250514",
+        getCurrentModel: () => "glm-4",
+        request: {
+          message: { model: "glm-4", messages: [] },
+          model: "glm-4",
+        },
+      }),
+      response: new Response("ok", { status: 200 }),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: false,
+    });
+
+    const generationCall = mockRootSpan.startObservation.mock.calls[0];
+    expect(generationCall[1].metadata.modelRedirected).toBe(true);
+    expect(generationCall[1].metadata.originalModel).toBe("claude-sonnet-4-20250514");
+  });
+
+  test("should set completionStartTime from ttfbMs", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    const startTime = Date.now() - 500;
+    await traceProxyRequest({
+      session: createMockSession({ startTime, ttfbMs: 200 }),
+      response: new Response("ok", { status: 200 }),
+      durationMs: 500,
+      statusCode: 200,
+      isStreaming: false,
+    });
+
+    expect(mockGenerationUpdate).toHaveBeenCalledWith({
+      completionStartTime: new Date(startTime + 200),
+    });
+  });
+
+  test("should handle errors gracefully without throwing", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    // Make startObservation throw
+    mockStartObservation.mockImplementationOnce(() => {
+      throw new Error("SDK error");
+    });
+
+    await expect(
+      traceProxyRequest({
+        session: createMockSession(),
+        response: new Response("ok", { status: 200 }),
+        durationMs: 500,
+        statusCode: 200,
+        isStreaming: false,
+      })
+    ).resolves.toBeUndefined();
+  });
+
+  test("should include correct tags for error responses", async () => {
+    const { traceProxyRequest } = await import("@/lib/langfuse/trace-proxy-request");
+
+    await traceProxyRequest({
+      session: createMockSession(),
+      response: new Response("error", { status: 502 }),
+      durationMs: 500,
+      statusCode: 502,
+      isStreaming: false,
+      errorMessage: "upstream error",
+    });
+
+    expect(mockPropagateAttributes).toHaveBeenCalledWith(
+      expect.objectContaining({
+        tags: expect.arrayContaining(["5xx"]),
+      })
+    );
+  });
+});
+
+describe("isLangfuseEnabled", () => {
+  const originalPublicKey = process.env.LANGFUSE_PUBLIC_KEY;
+  const originalSecretKey = process.env.LANGFUSE_SECRET_KEY;
+
+  afterEach(() => {
+    // Restore env
+    if (originalPublicKey !== undefined) {
+      process.env.LANGFUSE_PUBLIC_KEY = originalPublicKey;
+    } else {
+      delete process.env.LANGFUSE_PUBLIC_KEY;
+    }
+    if (originalSecretKey !== undefined) {
+      process.env.LANGFUSE_SECRET_KEY = originalSecretKey;
+    } else {
+      delete process.env.LANGFUSE_SECRET_KEY;
+    }
+  });
+
+  test("should return false when env vars are not set", () => {
+    delete process.env.LANGFUSE_PUBLIC_KEY;
+    delete process.env.LANGFUSE_SECRET_KEY;
+
+    // Direct function test (not using the mock)
+    const isEnabled = !!(process.env.LANGFUSE_PUBLIC_KEY && process.env.LANGFUSE_SECRET_KEY);
+    expect(isEnabled).toBe(false);
+  });
+
+  test("should return true when both keys are set", () => {
+    process.env.LANGFUSE_PUBLIC_KEY = "pk-lf-test-mock";
+    process.env.LANGFUSE_SECRET_KEY = "test-mock-not-real";
+
+    const isEnabled = !!(process.env.LANGFUSE_PUBLIC_KEY && process.env.LANGFUSE_SECRET_KEY);
+    expect(isEnabled).toBe(true);
+  });
+});