trace-proxy-request.ts 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352
  1. import type { UsageMetrics } from "@/app/v1/_lib/proxy/response-handler";
  2. import type { ProxySession } from "@/app/v1/_lib/proxy/session";
  3. import { isLangfuseEnabled } from "@/lib/langfuse/index";
  4. import { logger } from "@/lib/logger";
  5. import type { CostBreakdown } from "@/lib/utils/cost-calculation";
  /**
   * Build a compact, body-free summary of the client's request for trace metadata.
   *
   * A system prompt counts as present when `system` is a non-empty string or a
   * non-empty array, because the Anthropic Messages API accepts both forms.
   *
   * @param session - Proxy session whose original client message is summarised.
   * @returns Model, message/tool counts and a few request options. `maxTokens` and
   *   `temperature` are `undefined` when absent or not numbers.
   */
  function buildRequestBodySummary(session: ProxySession): Record<string, unknown> {
    const msg = session.request.message as Record<string, unknown>;
    const system = msg.system;
    // `system` may be a plain string or an array of content blocks.
    const hasSystemPrompt =
      (typeof system === "string" || Array.isArray(system)) && system.length > 0;
    return {
      model: session.request.model,
      messageCount: session.getMessagesLength(),
      hasSystemPrompt,
      toolsCount: Array.isArray(msg.tools) ? msg.tools.length : 0,
      stream: msg.stream === true,
      maxTokens: typeof msg.max_tokens === "number" ? msg.max_tokens : undefined,
      temperature: typeof msg.temperature === "number" ? msg.temperature : undefined,
    };
  }
  /**
   * Map an HTTP status code to its class label ("2xx", "3xx", "4xx", …) for trace tags.
   * Any code at or above 500 is reported as "5xx"; every other code is labelled
   * by its hundreds digit.
   */
  function getStatusCategory(statusCode: number): string {
    if (statusCode >= 500) {
      return "5xx";
    }
    const statusClass = Math.floor(statusCode / 100);
    return `${statusClass}xx`;
  }
  /**
   * Copy a `Headers` object into a plain `name → value` record.
   * Names come out as the Headers iterator yields them (lower-cased, with repeated
   * headers already combined). Values are copied verbatim.
   *
   * NOTE(review): nothing here redacts values, so credential headers such as
   * `authorization` are copied as-is into whatever consumes the result.
   */
  function headersToRecord(headers: Headers): Record<string, string> {
    return Object.fromEntries(headers.entries());
  }
  /** Provider-chain `reason` values that count as a successful attempt. */
  const SUCCESS_REASONS = new Set(["request_success", "retry_success", "initial_selection", "session_reuse"]);

  /**
   * Whether a provider-chain `reason` denotes a successful attempt.
   * A missing reason is never a success.
   */
  function isSuccessReason(reason: string | undefined): boolean {
    if (reason === undefined) {
      return false;
    }
    return SUCCESS_REASONS.has(reason);
  }
  /** Provider-chain `reason` values that are reported at ERROR (not WARNING) level. */
  const ERROR_REASONS = new Set(["system_error", "vendor_type_all_timeout", "endpoint_pool_exhausted"]);

  /**
   * Whether a provider-chain `reason` should be reported as an error.
   * A missing reason is never an error reason.
   */
  function isErrorReason(reason: string | undefined): boolean {
    return typeof reason === "string" && ERROR_REASONS.has(reason);
  }
  /** Severity levels this module assigns to Langfuse observations. */
  type ObservationLevel =
    | "DEBUG"
    | "DEFAULT"
    | "WARNING"
    | "ERROR";

  /** Input for tracing one completed proxy request. */
  export interface TraceContext {
    /** Session that handled the request; provides request data, timing and provider chain. */
    session: ProxySession;

    // --- Response ---
    /** HTTP status code of the response; non-2xx marks the root span as ERROR. */
    statusCode: number;
    /** Response headers, recorded raw in the generation metadata. */
    responseHeaders: Headers;
    /** Response body text, if captured; parsed as JSON when possible. */
    responseText?: string;
    /** Whether the response was streamed. */
    isStreaming: boolean;
    /** Count of SSE events, for streamed responses. */
    sseEventCount?: number;
    /** Error description, if any, recorded in the generation metadata. */
    errorMessage?: string;

    // --- Timing ---
    /** Total request duration in milliseconds, offset from `session.startTime`. */
    durationMs: number;

    // --- Usage & cost ---
    /** Token usage counters, when the upstream reported them. */
    usageMetrics?: UsageMetrics | null;
    /** Total cost as a numeric string (parsed with `Number.parseFloat`); presumably USD per the name. */
    costUsd?: string;
    /** Itemised cost; preferred over `costUsd` when present. */
    costBreakdown?: CostBreakdown;
  }
  /**
   * Convert upstream usage counters into Langfuse `usageDetails`.
   * Only counters that are present (non-null) are copied. Returns `undefined` when
   * there is no usage object or it contains none of the known counters.
   */
  function buildUsageDetails(
    usage: UsageMetrics | null | undefined
  ): Record<string, number> | undefined {
    if (!usage) return undefined;
    const details: Record<string, number> = {};
    if (usage.input_tokens != null) details.input = usage.input_tokens;
    if (usage.output_tokens != null) details.output = usage.output_tokens;
    if (usage.cache_read_input_tokens != null) {
      details.cache_read_input_tokens = usage.cache_read_input_tokens;
    }
    if (usage.cache_creation_input_tokens != null) {
      details.cache_creation_input_tokens = usage.cache_creation_input_tokens;
    }
    return Object.keys(details).length > 0 ? details : undefined;
  }

  /**
   * Build Langfuse `costDetails`, preferring the itemised breakdown.
   * Otherwise it falls back to `{ total }` when `costUsd` parses to a positive number.
   */
  function buildCostDetails(
    costUsd: string | undefined,
    costBreakdown: CostBreakdown | undefined
  ): Record<string, number> | undefined {
    if (costBreakdown) return { ...costBreakdown };
    if (!costUsd) return undefined;
    const total = Number.parseFloat(costUsd);
    // NaN and non-positive totals are dropped instead of being reported.
    return total > 0 ? { total } : undefined;
  }

  /** Build trace tags: provider type and name, client format, current model and status class. */
  function buildTraceTags(session: ProxySession, statusCode: number): string[] {
    const provider = session.provider;
    const currentModel = session.getCurrentModel();
    const tags: string[] = [];
    if (provider?.providerType) tags.push(provider.providerType);
    if (provider?.name) tags.push(provider.name);
    if (session.originalFormat) tags.push(session.originalFormat);
    if (currentModel) tags.push(currentModel);
    tags.push(getStatusCategory(statusCode));
    return tags;
  }

  /**
   * Send a trace to Langfuse for a completed proxy request.
   *
   * Emits a root "proxy-request" span carrying the forwarded request body and the
   * response body. Under it are child observations:
   * - a "guard-pipeline" span, when `forwardStartTime` was recorded;
   * - a "provider-attempt" event for each non-successful provider-chain item;
   * - an "llm-call" generation.
   *
   * Does nothing when Langfuse is disabled. Any error is caught and logged as a
   * warning instead of being rethrown.
   *
   * @param ctx - Data about the completed request (session, response, timing, usage, cost).
   */
  export async function traceProxyRequest(ctx: TraceContext): Promise<void> {
    if (!isLangfuseEnabled()) {
      return;
    }
    try {
      const { startObservation, propagateAttributes } = await import("@langfuse/tracing");
      const { session, durationMs, statusCode, isStreaming } = ctx;
      const provider = session.provider;
      const messageContext = session.messageContext;
      const currentModel = session.getCurrentModel();
      const endpoint = session.getEndpoint();

      // Read the provider chain once. It feeds the timing breakdown, the root
      // level, the per-attempt events and the generation metadata.
      const providerChain = session.getProviderChain();
      const failedChainItems = providerChain.filter((item) => !isSuccessReason(item.reason));

      // Wall-clock bounds of the request, reconstructed from session timing.
      const requestStartTime = new Date(session.startTime);
      const requestEndTime = new Date(session.startTime + durationMs);

      // forwardStartTime, when recorded, ends the guard span and starts the generation.
      const forwardStartDate = session.forwardStartTime ? new Date(session.forwardStartTime) : null;
      const guardPipelineMs = session.forwardStartTime
        ? session.forwardStartTime - session.startTime
        : null;
      const timingBreakdown = {
        guardPipelineMs,
        upstreamTotalMs:
          guardPipelineMs != null ? Math.max(0, durationMs - guardPipelineMs) : durationMs,
        // ttfbMs is treated as an offset from session.startTime, so the guard
        // time is subtracted to express TTFB relative to the upstream forward.
        ttfbFromForwardMs:
          guardPipelineMs != null && session.ttfbMs != null
            ? Math.max(0, session.ttfbMs - guardPipelineMs)
            : null,
        tokenGenerationMs: session.ttfbMs != null ? Math.max(0, durationMs - session.ttfbMs) : null,
        failedAttempts: failedChainItems.length,
        providersAttempted: new Set(providerChain.map((item) => item.id)).size,
      };

      // Any non-2xx status is an ERROR. A 2xx that still had failed provider
      // attempts along the way is a WARNING.
      let rootSpanLevel: ObservationLevel = "DEFAULT";
      if (statusCode < 200 || statusCode >= 300) {
        rootSpanLevel = "ERROR";
      } else if (failedChainItems.length > 0) {
        rootSpanLevel = "WARNING";
      }

      // Body actually forwarded upstream (after preprocessing), falling back to
      // the client's message. Not truncated.
      const actualRequestBody = session.forwardedRequestBody
        ? tryParseJsonSafe(session.forwardedRequestBody)
        : session.request.message;

      // Response body, parsed once and reused by the root span, the generation
      // and the trace. Not truncated.
      const actualResponseBody = ctx.responseText
        ? tryParseJsonSafe(ctx.responseText)
        : isStreaming
          ? { streaming: true, sseEventCount: ctx.sseEventCount }
          : { statusCode };

      const rootSpanMetadata: Record<string, unknown> = {
        endpoint,
        method: session.method,
        model: currentModel,
        clientFormat: session.originalFormat,
        providerName: provider?.name,
        statusCode,
        durationMs,
        hasUsage: !!ctx.usageMetrics,
        costUsd: ctx.costUsd,
        timingBreakdown,
      };

      const tags = buildTraceTags(session, statusCode);

      // propagateAttributes requires every metadata value to be a string.
      const traceMetadata: Record<string, string> = {
        keyName: messageContext?.key?.name ?? "",
        endpoint: endpoint ?? "",
        method: session.method,
        clientFormat: session.originalFormat,
        userAgent: session.userAgent ?? "",
        requestSequence: String(session.getRequestSequence()),
      };

      // Full request detail for the generation, including raw (unredacted) headers.
      const generationMetadata: Record<string, unknown> = {
        // Provider
        providerId: provider?.id,
        providerName: provider?.name,
        providerType: provider?.providerType,
        providerChain,
        // Model
        model: currentModel,
        originalModel: session.getOriginalModel(),
        modelRedirected: session.isModelRedirected(),
        // Special settings
        specialSettings: session.getSpecialSettings(),
        // Request context
        endpoint,
        method: session.method,
        clientFormat: session.originalFormat,
        userAgent: session.userAgent,
        requestSequence: session.getRequestSequence(),
        sessionId: session.sessionId,
        keyName: messageContext?.key?.name,
        // Timing
        durationMs,
        ttfbMs: session.ttfbMs,
        timingBreakdown,
        // Flags
        isStreaming,
        cacheTtlApplied: session.getCacheTtlResolved(),
        context1mApplied: session.getContext1mApplied(),
        // Error
        errorMessage: ctx.errorMessage,
        // Request summary (quick overview)
        requestSummary: buildRequestBodySummary(session),
        // SSE
        sseEventCount: ctx.sseEventCount,
        // Headers (raw, no redaction)
        requestHeaders: headersToRecord(session.headers),
        responseHeaders: headersToRecord(ctx.responseHeaders),
      };

      const usageDetails = buildUsageDetails(ctx.usageMetrics);
      const costDetails = buildCostDetails(ctx.costUsd, ctx.costBreakdown);

      const rootSpan = startObservation(
        "proxy-request",
        {
          input: actualRequestBody,
          output: actualResponseBody,
          level: rootSpanLevel,
          metadata: rootSpanMetadata,
        },
        { startTime: requestStartTime }
      );

      try {
        await propagateAttributes(
          {
            userId: messageContext?.user?.name ?? undefined,
            sessionId: session.sessionId ?? undefined,
            tags,
            metadata: traceMetadata,
            traceName: `${session.method} ${endpoint ?? "/"}`,
          },
          async () => {
            // 1. Guard pipeline span (only when forwardStartTime was recorded)
            if (forwardStartDate) {
              const guardSpan = rootSpan.startObservation(
                "guard-pipeline",
                { output: { durationMs: guardPipelineMs, passed: true } },
                { startTime: requestStartTime } as Record<string, unknown>
              );
              guardSpan.end(forwardStartDate);
            }

            // 2. One event per non-successful provider-chain item
            for (const item of failedChainItems) {
              const eventObs = rootSpan.startObservation(
                "provider-attempt",
                {
                  level: isErrorReason(item.reason) ? "ERROR" : "WARNING",
                  input: {
                    providerId: item.id,
                    providerName: item.name,
                    attempt: item.attemptNumber,
                  },
                  output: {
                    reason: item.reason,
                    errorMessage: item.errorMessage,
                    statusCode: item.statusCode,
                  },
                  metadata: { ...item },
                },
                // The SDK runtime accepts startTime here, but the option type does not expose it.
                {
                  asType: "event",
                  startTime: new Date(item.timestamp ?? session.startTime),
                } as { asType: "event" }
              );
              eventObs.end();
            }

            // 3. LLM generation, starting at the upstream forward when known
            const generation = rootSpan.startObservation(
              "llm-call",
              {
                model: currentModel ?? undefined,
                input: actualRequestBody,
                output: actualResponseBody,
                ...(usageDetails ? { usageDetails } : {}),
                ...(costDetails ? { costDetails } : {}),
                metadata: generationMetadata,
              },
              // The SDK runtime supports startTime on child observations, but the types don't expose it.
              {
                asType: "generation",
                startTime: forwardStartDate ?? requestStartTime,
              } as { asType: "generation" }
            );
            // Record TTFB as the generation's completionStartTime.
            if (session.ttfbMs != null) {
              generation.update({
                completionStartTime: new Date(session.startTime + session.ttfbMs),
              });
            }
            generation.end(requestEndTime);
          }
        );

        // propagateAttributes cannot set trace-level input/output, so set them explicitly.
        rootSpan.updateTrace({
          input: actualRequestBody,
          output: actualResponseBody,
        });
      } finally {
        // Close the root span even if a child observation failed, so it is never left open.
        rootSpan.end(requestEndTime);
      }
    } catch (error) {
      logger.warn("[Langfuse] Failed to trace proxy request", {
        error: error instanceof Error ? error.message : String(error),
      });
    }
  }
  /**
   * Parse `text` as JSON when possible. Otherwise return the original string unchanged.
   * Never throws for malformed input.
   */
  function tryParseJsonSafe(text: string): unknown {
    let parsed: unknown = text;
    try {
      parsed = JSON.parse(text);
    } catch {
      // Not valid JSON: keep the raw string.
    }
    return parsed;
  }