|
@@ -11,6 +11,7 @@ import { SessionTracker } from "@/lib/session-tracker";
|
|
|
import { calculateRequestCost } from "@/lib/utils/cost-calculation";
|
|
import { calculateRequestCost } from "@/lib/utils/cost-calculation";
|
|
|
import { hasValidPriceData } from "@/lib/utils/price-data";
|
|
import { hasValidPriceData } from "@/lib/utils/price-data";
|
|
|
import { parseSSEData } from "@/lib/utils/sse";
|
|
import { parseSSEData } from "@/lib/utils/sse";
|
|
|
|
|
+import { detectUpstreamErrorFromSseOrJsonText } from "@/lib/utils/upstream-error-detection";
|
|
|
import {
|
|
import {
|
|
|
updateMessageRequestCost,
|
|
updateMessageRequestCost,
|
|
|
updateMessageRequestDetails,
|
|
updateMessageRequestDetails,
|
|
@@ -23,6 +24,7 @@ import { GeminiAdapter } from "../gemini/adapter";
|
|
|
import type { GeminiResponse } from "../gemini/types";
|
|
import type { GeminiResponse } from "../gemini/types";
|
|
|
import { isClientAbortError } from "./errors";
|
|
import { isClientAbortError } from "./errors";
|
|
|
import type { ProxySession } from "./session";
|
|
import type { ProxySession } from "./session";
|
|
|
|
|
+import { consumeDeferredStreamingFinalization } from "./stream-finalization";
|
|
|
|
|
|
|
|
export type UsageMetrics = {
|
|
export type UsageMetrics = {
|
|
|
input_tokens?: number;
|
|
input_tokens?: number;
|
|
@@ -59,6 +61,310 @@ function cleanResponseHeaders(headers: Headers): Headers {
|
|
|
return cleaned;
|
|
return cleaned;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
 * Outcome of settling a (possibly deferred) streaming response.
 *
 * The object is produced once by the finalization routine and only read by
 * its callers, so every field is read-only.
 */
type FinalizeDeferredStreamingResult = {
  /**
   * Status code used for internal settlement.
   *
   * Note: this never changes the HTTP status code the client actually received
   * (once SSE passthrough has started it can no longer be changed). Its only
   * purpose is to let internal statistics / circuit breaking / session binding
   * treat a "fake 200" as a failure.
   */
  readonly effectiveStatusCode: number;
  /**
   * Internally recorded error reason (written to the DB / monitoring to help
   * track down "fake 200" problems). `null` when no error was determined.
   */
  readonly errorMessage: string | null;
  /**
   * Provider id used for attribution when persisting to the DB (the deferred
   * meta's providerId is preferred).
   *
   * For SSE, `session.provider` may be updated/overwritten by later logic,
   * whereas the deferred meta represents the provider that actually produced
   * this stream. This field keeps the persisted providerId consistent with the
   * provider chain and circuit-breaker attribution.
   */
  readonly providerIdForPersistence: number | null;
};
|
|
|
|
|
+
|
|
|
|
|
/**
 * If this SSE response was marked for "deferred finalization", settle its final
 * success/failure verdict after the stream has ended.
 *
 * When this applies
 * - When the Forwarder receives a Response and recognizes it as SSE, it attaches
 *   DeferredStreamingFinalization metadata to the session.
 * - After the ResponseHandler has read the full SSE content in the background,
 *   it calls this function:
 *   - If the content looks like an upstream error JSON (a "fake 200"):
 *     - the failure is recorded in the circuit breaker;
 *     - the session smart binding is NOT updated (so the session does not get
 *       stuck to a bad provider);
 *     - the internal status code becomes 502 (this only affects statistics and
 *       later retry selection, not the response already sent to the client).
 *   - If the stream ended normally and no error was detected, the request is
 *     settled as a success and binding / circuit breaker / endpoint success
 *     are updated.
 *
 * Every circuit-breaker and session-binding update in here is best-effort: a
 * failure of those side effects is logged and does not change the returned
 * verdict.
 *
 * @param session - Proxy session; its deferred streaming meta is consumed here.
 * @param allContent - The accumulated stream text, passed to the fake-200 detector.
 * @param upstreamStatusCode - HTTP status code of the upstream response.
 * @param streamEndedNormally - Must be a "natural end", i.e. the reader reached
 *   done=true; timeouts/interruptions are passed as `false`.
 * @param clientAborted - Whether the client actively aborted (used for the
 *   internal status mapping, so an abort is not recorded as "200 completed").
 * @param abortReason - Reason code for a non-natural end (used for internal
 *   records / circuit-breaker attribution; never affects the client response).
 * @returns The internal status code, error reason and provider id to persist.
 */
async function finalizeDeferredStreamingFinalizationIfNeeded(
  session: ProxySession,
  allContent: string,
  upstreamStatusCode: number,
  streamEndedNormally: boolean,
  clientAborted: boolean,
  abortReason?: string
): Promise<FinalizeDeferredStreamingResult> {
  const meta = consumeDeferredStreamingFinalization(session);
  const provider = session.provider;

  const providerIdForPersistence = meta?.providerId ?? provider?.id ?? null;

  // Only run "fake 200" detection when upstream HTTP is 200 AND the stream ended naturally:
  // - non-200: HTTP already signals failure (no extra heuristic needed)
  // - non-natural end: content may be partial/truncated, and the heuristic would
  //   be much more likely to misfire
  //
  // `{isError:false}` here only means "detection skipped"; an aborted/timed-out
  // stream is still settled as a failure below.
  const shouldDetectFake200 = streamEndedNormally && upstreamStatusCode === 200;
  const detected = shouldDetectFake200
    ? detectUpstreamErrorFromSseOrJsonText(allContent)
    : ({ isError: false } as const);

  // Status code for internal settlement (never changes the client's actual HTTP status).
  // - fake 200: mapped to 502 so stats / circuit breaker / session binding see a failure.
  // - not ended naturally: also mapped to a failure (so an aborted/partial stream
  //   is not recorded as "200 completed").
  let effectiveStatusCode: number;
  let errorMessage: string | null;
  if (detected.isError) {
    effectiveStatusCode = 502;
    errorMessage = detected.code;
  } else if (!streamEndedNormally) {
    effectiveStatusCode = clientAborted ? 499 : 502;
    errorMessage = clientAborted ? "CLIENT_ABORTED" : (abortReason ?? "STREAM_ABORTED");
  } else {
    effectiveStatusCode = upstreamStatusCode;
    errorMessage = null;
  }

  // Deferred finalization not enabled / provider missing:
  // - only return the internal status code + error reason; the caller writes the stats;
  // - do not touch circuit breaker / binding here (missing meta means the Forwarder
  //   did not enable deferred finalization; missing provider means no attribution).
  if (!meta || !provider) {
    return { effectiveStatusCode, errorMessage, providerIdForPersistence };
  }

  // The meta is recorded by the Forwarder at the moment it obtained the upstream
  // Response, so it represents the provider that really produced this stream.
  // Even if session.provider was later changed by other logic (edge case), the
  // meta is authoritative for:
  // - provider/endpoint circuit breaking and statistics
  // - session smart binding
  // This avoids attributing success/failure to the wrong provider.
  let providerForChain = provider;
  if (provider.id !== meta.providerId) {
    logger.warn("[ResponseHandler] Deferred streaming meta provider mismatch", {
      sessionId: session.sessionId ?? null,
      metaProviderId: meta.providerId,
      currentProviderId: provider.id,
      canonicalProviderId: meta.providerId,
    });

    // Try to recover the right Provider object via meta.providerId so the
    // provider chain audit data stays consistent.
    try {
      const providers = await session.getProvidersSnapshot();
      const resolved = providers.find((p) => p.id === meta.providerId);
      if (resolved) {
        providerForChain = resolved;
      } else {
        logger.warn("[ResponseHandler] Deferred streaming meta provider not found in snapshot", {
          sessionId: session.sessionId ?? null,
          metaProviderId: meta.providerId,
          currentProviderId: provider.id,
        });
      }
    } catch (resolveError) {
      logger.warn("[ResponseHandler] Failed to resolve meta provider from snapshot", {
        sessionId: session.sessionId ?? null,
        metaProviderId: meta.providerId,
        currentProviderId: provider.id,
        error: resolveError,
      });
    }
  }

  // Not ended naturally: do not update the session binding (avoid sticking the
  // session to an unstable provider), but make sure it is not recorded as "200 completed".
  //
  // To keep failover / circuit breaking working:
  // - client-initiated abort: NOT counted in the circuit breaker (usually not the provider's fault)
  // - any other abort: counted as a provider/endpoint failure (same as the timeout path)
  if (!streamEndedNormally) {
    if (!clientAborted) {
      try {
        // Dynamic import: avoids a potential circular dependency between the
        // proxy module and the circuit-breaker module.
        const { recordFailure } = await import("@/lib/circuit-breaker");
        await recordFailure(meta.providerId, new Error(errorMessage ?? "STREAM_ABORTED"));
      } catch (cbError) {
        logger.warn("[ResponseHandler] Failed to record streaming failure in circuit breaker", {
          providerId: meta.providerId,
          sessionId: session.sessionId ?? null,
          error: cbError,
        });
      }

      if (meta.endpointId != null) {
        try {
          const { recordEndpointFailure } = await import("@/lib/endpoint-circuit-breaker");
          await recordEndpointFailure(meta.endpointId, new Error(errorMessage ?? "STREAM_ABORTED"));
        } catch (endpointError) {
          logger.warn("[ResponseHandler] Failed to record endpoint failure (stream aborted)", {
            endpointId: meta.endpointId,
            providerId: meta.providerId,
            sessionId: session.sessionId ?? null,
            error: endpointError,
          });
        }
      }
    }

    session.addProviderToChain(providerForChain, {
      endpointId: meta.endpointId,
      endpointUrl: meta.endpointUrl,
      reason: "system_error",
      attemptNumber: meta.attemptNumber,
      statusCode: effectiveStatusCode,
      errorMessage: errorMessage ?? undefined,
    });

    return { effectiveStatusCode, errorMessage, providerIdForPersistence };
  }

  if (detected.isError) {
    logger.warn("[ResponseHandler] SSE completed but body indicates error (fake 200)", {
      providerId: meta.providerId,
      providerName: meta.providerName,
      upstreamStatusCode: meta.upstreamStatusCode,
      effectiveStatusCode,
      code: detected.code,
      detail: detected.detail ?? null,
    });

    // Count it in the circuit breaker so later requests can fail over / trip correctly.
    try {
      // Dynamic import: avoids a potential circular dependency between the
      // proxy module and the circuit-breaker module.
      const { recordFailure } = await import("@/lib/circuit-breaker");
      await recordFailure(meta.providerId, new Error(detected.code));
    } catch (cbError) {
      logger.warn("[ResponseHandler] Failed to record fake-200 error in circuit breaker", {
        providerId: meta.providerId,
        sessionId: session.sessionId ?? null,
        error: cbError,
      });
    }

    // Endpoint-level circuit breaker: symmetric with the success path, so a
    // "fake 200" affects endpoint health and not only the provider.
    if (meta.endpointId != null) {
      try {
        const { recordEndpointFailure } = await import("@/lib/endpoint-circuit-breaker");
        await recordEndpointFailure(meta.endpointId, new Error(detected.code));
      } catch (endpointError) {
        logger.warn("[ResponseHandler] Failed to record endpoint failure (fake 200)", {
          endpointId: meta.endpointId,
          providerId: meta.providerId,
          error: endpointError,
        });
      }
    }

    // Record in the decision chain (used for log display and DB persistence).
    // Note: effectiveStatusCode (502) is used instead of the upstream status (200)
    // so the internal chain clearly shows a failure (it would otherwise read as success).
    session.addProviderToChain(providerForChain, {
      endpointId: meta.endpointId,
      endpointUrl: meta.endpointUrl,
      reason: "retry_failed",
      attemptNumber: meta.attemptNumber,
      statusCode: effectiveStatusCode,
      errorMessage: detected.code,
    });

    return { effectiveStatusCode, errorMessage, providerIdForPersistence };
  }

  // ========== Real success (SSE ended completely and no error was detected) ==========
  if (meta.endpointId != null) {
    try {
      const { recordEndpointSuccess } = await import("@/lib/endpoint-circuit-breaker");
      await recordEndpointSuccess(meta.endpointId);
    } catch (endpointError) {
      logger.warn("[ResponseHandler] Failed to record endpoint success (stream)", {
        endpointId: meta.endpointId,
        providerId: meta.providerId,
        error: endpointError,
      });
    }
  }

  try {
    const { recordSuccess } = await import("@/lib/circuit-breaker");
    await recordSuccess(meta.providerId);
  } catch (cbError) {
    logger.warn("[ResponseHandler] Failed to record streaming success in circuit breaker", {
      providerId: meta.providerId,
      error: cbError,
    });
  }

  // After success, bind the session to the provider (smart binding strategy).
  const sessionId = session.sessionId;
  if (sessionId) {
    // Best-effort, like the circuit-breaker updates above: the deferred meta has
    // already been consumed, so letting a binding failure escape would drop the
    // success entry from the provider chain and make the caller's fallback
    // persist an otherwise successful stream as a failure.
    try {
      const result = await SessionManager.updateSessionBindingSmart(
        sessionId,
        meta.providerId,
        meta.providerPriority,
        meta.isFirstAttempt,
        meta.isFailoverSuccess
      );

      if (result.updated) {
        logger.info("[ResponseHandler] Session binding updated (stream finalized)", {
          sessionId,
          providerId: meta.providerId,
          providerName: meta.providerName,
          priority: meta.providerPriority,
          reason: result.reason,
          details: result.details,
          attemptNumber: meta.attemptNumber,
          totalProvidersAttempted: meta.totalProvidersAttempted,
        });
      } else {
        logger.debug("[ResponseHandler] Session binding not updated (stream finalized)", {
          sessionId,
          providerId: meta.providerId,
          providerName: meta.providerName,
          priority: meta.providerPriority,
          reason: result.reason,
          details: result.details,
        });
      }
    } catch (bindingError) {
      logger.warn("[ResponseHandler] Failed to update session binding (stream finalized)", {
        sessionId,
        providerId: meta.providerId,
        error: bindingError,
      });
    }

    // Update the second data source as well (keeps monitoring data consistent).
    // Fire-and-forget: failures are only logged.
    void SessionManager.updateSessionProvider(sessionId, {
      providerId: meta.providerId,
      providerName: meta.providerName,
    }).catch((err) => {
      logger.error("[ResponseHandler] Failed to update session provider info (stream)", {
        error: err,
      });
    });
  }

  session.addProviderToChain(providerForChain, {
    endpointId: meta.endpointId,
    endpointUrl: meta.endpointUrl,
    reason: meta.isFirstAttempt ? "request_success" : "retry_success",
    attemptNumber: meta.attemptNumber,
    statusCode: meta.upstreamStatusCode,
  });

  logger.info("[ResponseHandler] Streaming request finalized as success", {
    providerId: meta.providerId,
    providerName: meta.providerName,
    attemptNumber: meta.attemptNumber,
    totalProvidersAttempted: meta.totalProvidersAttempted,
    statusCode: meta.upstreamStatusCode,
  });

  return { effectiveStatusCode, errorMessage, providerIdForPersistence };
}
|
|
|
|
|
+
|
|
|
export class ProxyResponseHandler {
|
|
export class ProxyResponseHandler {
|
|
|
static async dispatch(session: ProxySession, response: Response): Promise<Response> {
|
|
static async dispatch(session: ProxySession, response: Response): Promise<Response> {
|
|
|
let fixedResponse = response;
|
|
let fixedResponse = response;
|
|
@@ -215,8 +521,8 @@ export class ProxyResponseHandler {
|
|
|
statusCode: statusCode,
|
|
statusCode: statusCode,
|
|
|
ttfbMs: session.ttfbMs ?? duration,
|
|
ttfbMs: session.ttfbMs ?? duration,
|
|
|
providerChain: session.getProviderChain(),
|
|
providerChain: session.getProviderChain(),
|
|
|
- model: session.getCurrentModel() ?? undefined, // ⭐ 更新重定向后的模型
|
|
|
|
|
- providerId: session.provider?.id, // ⭐ 更新最终供应商ID(重试切换后)
|
|
|
|
|
|
|
+ model: session.getCurrentModel() ?? undefined, // 更新重定向后的模型
|
|
|
|
|
+ providerId: session.provider?.id, // 更新最终供应商ID(重试切换后)
|
|
|
context1mApplied: session.getContext1mApplied(),
|
|
context1mApplied: session.getContext1mApplied(),
|
|
|
});
|
|
});
|
|
|
const tracker = ProxyStatusTracker.getInstance();
|
|
const tracker = ProxyStatusTracker.getInstance();
|
|
@@ -371,8 +677,8 @@ export class ProxyResponseHandler {
|
|
|
cacheCreation1hInputTokens: usageMetrics?.cache_creation_1h_input_tokens,
|
|
cacheCreation1hInputTokens: usageMetrics?.cache_creation_1h_input_tokens,
|
|
|
cacheTtlApplied: usageMetrics?.cache_ttl ?? null,
|
|
cacheTtlApplied: usageMetrics?.cache_ttl ?? null,
|
|
|
providerChain: session.getProviderChain(),
|
|
providerChain: session.getProviderChain(),
|
|
|
- model: session.getCurrentModel() ?? undefined, // ⭐ 更新重定向后的模型
|
|
|
|
|
- providerId: session.provider?.id, // ⭐ 更新最终供应商ID(重试切换后)
|
|
|
|
|
|
|
+ model: session.getCurrentModel() ?? undefined, // 更新重定向后的模型
|
|
|
|
|
+ providerId: session.provider?.id, // 更新最终供应商ID(重试切换后)
|
|
|
context1mApplied: session.getContext1mApplied(),
|
|
context1mApplied: session.getContext1mApplied(),
|
|
|
});
|
|
});
|
|
|
|
|
|
|
@@ -573,15 +879,22 @@ export class ProxyResponseHandler {
|
|
|
const reader = responseForStats.body?.getReader();
|
|
const reader = responseForStats.body?.getReader();
|
|
|
if (!reader) return;
|
|
if (!reader) return;
|
|
|
|
|
|
|
|
|
|
+ // 注意:即使 STORE_SESSION_RESPONSE_BODY=false(不写入 Redis),这里也会在内存中累积完整流内容:
|
|
|
|
|
+ // - 用于解析 usage/cost 与内部结算(例如“假 200”检测)
|
|
|
|
|
+ // 因此该开关仅影响“是否持久化”,不用于控制流式内存占用。
|
|
|
const chunks: string[] = [];
|
|
const chunks: string[] = [];
|
|
|
const decoder = new TextDecoder();
|
|
const decoder = new TextDecoder();
|
|
|
let isFirstChunk = true;
|
|
let isFirstChunk = true;
|
|
|
|
|
+ let streamEndedNormally = false;
|
|
|
|
|
|
|
|
while (true) {
|
|
while (true) {
|
|
|
if (session.clientAbortSignal?.aborted) break;
|
|
if (session.clientAbortSignal?.aborted) break;
|
|
|
|
|
|
|
|
const { done, value } = await reader.read();
|
|
const { done, value } = await reader.read();
|
|
|
- if (done) break;
|
|
|
|
|
|
|
+ if (done) {
|
|
|
|
|
+ streamEndedNormally = true;
|
|
|
|
|
+ break;
|
|
|
|
|
+ }
|
|
|
if (value) {
|
|
if (value) {
|
|
|
if (isFirstChunk) {
|
|
if (isFirstChunk) {
|
|
|
isFirstChunk = false;
|
|
isFirstChunk = false;
|
|
@@ -594,6 +907,7 @@ export class ProxyResponseHandler {
|
|
|
const flushed = decoder.decode();
|
|
const flushed = decoder.decode();
|
|
|
if (flushed) chunks.push(flushed);
|
|
if (flushed) chunks.push(flushed);
|
|
|
const allContent = chunks.join("");
|
|
const allContent = chunks.join("");
|
|
|
|
|
+ const clientAborted = session.clientAbortSignal?.aborted ?? false;
|
|
|
|
|
|
|
|
// 存储响应体到 Redis(5分钟过期)
|
|
// 存储响应体到 Redis(5分钟过期)
|
|
|
if (session.sessionId) {
|
|
if (session.sessionId) {
|
|
@@ -608,7 +922,21 @@ export class ProxyResponseHandler {
|
|
|
|
|
|
|
|
// 使用共享的统计处理方法
|
|
// 使用共享的统计处理方法
|
|
|
const duration = Date.now() - session.startTime;
|
|
const duration = Date.now() - session.startTime;
|
|
|
- await finalizeRequestStats(session, allContent, statusCode, duration);
|
|
|
|
|
|
|
+ const finalized = await finalizeDeferredStreamingFinalizationIfNeeded(
|
|
|
|
|
+ session,
|
|
|
|
|
+ allContent,
|
|
|
|
|
+ statusCode,
|
|
|
|
|
+ streamEndedNormally,
|
|
|
|
|
+ clientAborted
|
|
|
|
|
+ );
|
|
|
|
|
+ await finalizeRequestStats(
|
|
|
|
|
+ session,
|
|
|
|
|
+ allContent,
|
|
|
|
|
+ finalized.effectiveStatusCode,
|
|
|
|
|
+ duration,
|
|
|
|
|
+ finalized.errorMessage ?? undefined,
|
|
|
|
|
+ finalized.providerIdForPersistence ?? undefined
|
|
|
|
|
+ );
|
|
|
} catch (error) {
|
|
} catch (error) {
|
|
|
if (!isClientAbortError(error as Error)) {
|
|
if (!isClientAbortError(error as Error)) {
|
|
|
logger.error("[ResponseHandler] Gemini passthrough stats task failed:", error);
|
|
logger.error("[ResponseHandler] Gemini passthrough stats task failed:", error);
|
|
@@ -702,6 +1030,9 @@ export class ProxyResponseHandler {
|
|
|
const processingPromise = (async () => {
|
|
const processingPromise = (async () => {
|
|
|
const reader = internalStream.getReader();
|
|
const reader = internalStream.getReader();
|
|
|
const decoder = new TextDecoder();
|
|
const decoder = new TextDecoder();
|
|
|
|
|
+ // 注意:即使 STORE_SESSION_RESPONSE_BODY=false(不写入 Redis),这里也会在内存中累积完整流内容:
|
|
|
|
|
+ // - 用于解析 usage/cost 与内部结算(例如“假 200”检测)
|
|
|
|
|
+ // 因此该开关仅影响“是否持久化”,不用于控制流式内存占用。
|
|
|
const chunks: string[] = [];
|
|
const chunks: string[] = [];
|
|
|
let usageForCost: UsageMetrics | null = null;
|
|
let usageForCost: UsageMetrics | null = null;
|
|
|
let isFirstChunk = true; // ⭐ 标记是否为第一块数据
|
|
let isFirstChunk = true; // ⭐ 标记是否为第一块数据
|
|
@@ -779,7 +1110,24 @@ export class ProxyResponseHandler {
|
|
|
return chunks.join("");
|
|
return chunks.join("");
|
|
|
};
|
|
};
|
|
|
|
|
|
|
|
- const finalizeStream = async (allContent: string): Promise<void> => {
|
|
|
|
|
|
|
+ const finalizeStream = async (
|
|
|
|
|
+ allContent: string,
|
|
|
|
|
+ streamEndedNormally: boolean,
|
|
|
|
|
+ clientAborted: boolean,
|
|
|
|
|
+ abortReason?: string
|
|
|
|
|
+ ): Promise<void> => {
|
|
|
|
|
+ const finalized = await finalizeDeferredStreamingFinalizationIfNeeded(
|
|
|
|
|
+ session,
|
|
|
|
|
+ allContent,
|
|
|
|
|
+ statusCode,
|
|
|
|
|
+ streamEndedNormally,
|
|
|
|
|
+ clientAborted,
|
|
|
|
|
+ abortReason
|
|
|
|
|
+ );
|
|
|
|
|
+ const effectiveStatusCode = finalized.effectiveStatusCode;
|
|
|
|
|
+ const streamErrorMessage = finalized.errorMessage;
|
|
|
|
|
+ const providerIdForPersistence = finalized.providerIdForPersistence;
|
|
|
|
|
+
|
|
|
// 存储响应体到 Redis(5分钟过期)
|
|
// 存储响应体到 Redis(5分钟过期)
|
|
|
if (session.sessionId) {
|
|
if (session.sessionId) {
|
|
|
void SessionManager.storeSessionResponse(
|
|
void SessionManager.storeSessionResponse(
|
|
@@ -839,10 +1187,10 @@ export class ProxyResponseHandler {
|
|
|
await trackCostToRedis(session, usageForCost);
|
|
await trackCostToRedis(session, usageForCost);
|
|
|
|
|
|
|
|
// 更新 session 使用量到 Redis(用于实时监控)
|
|
// 更新 session 使用量到 Redis(用于实时监控)
|
|
|
- if (session.sessionId && usageForCost) {
|
|
|
|
|
|
|
+ if (session.sessionId) {
|
|
|
let costUsdStr: string | undefined;
|
|
let costUsdStr: string | undefined;
|
|
|
try {
|
|
try {
|
|
|
- if (session.request.model) {
|
|
|
|
|
|
|
+ if (usageForCost && session.request.model) {
|
|
|
const priceData = await session.getCachedPriceDataByBillingSource();
|
|
const priceData = await session.getCachedPriceDataByBillingSource();
|
|
|
if (priceData) {
|
|
if (priceData) {
|
|
|
const cost = calculateRequestCost(
|
|
const cost = calculateRequestCost(
|
|
@@ -862,22 +1210,30 @@ export class ProxyResponseHandler {
|
|
|
});
|
|
});
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
- void SessionManager.updateSessionUsage(session.sessionId, {
|
|
|
|
|
- inputTokens: usageForCost.input_tokens,
|
|
|
|
|
- outputTokens: usageForCost.output_tokens,
|
|
|
|
|
- cacheCreationInputTokens: usageForCost.cache_creation_input_tokens,
|
|
|
|
|
- cacheReadInputTokens: usageForCost.cache_read_input_tokens,
|
|
|
|
|
- costUsd: costUsdStr,
|
|
|
|
|
- status: statusCode >= 200 && statusCode < 300 ? "completed" : "error",
|
|
|
|
|
- statusCode: statusCode,
|
|
|
|
|
- }).catch((error: unknown) => {
|
|
|
|
|
- logger.error("[ResponseHandler] Failed to update session usage:", error);
|
|
|
|
|
- });
|
|
|
|
|
|
|
+ const payload: SessionUsageUpdate = {
|
|
|
|
|
+ status: effectiveStatusCode >= 200 && effectiveStatusCode < 300 ? "completed" : "error",
|
|
|
|
|
+ statusCode: effectiveStatusCode,
|
|
|
|
|
+ ...(streamErrorMessage ? { errorMessage: streamErrorMessage } : {}),
|
|
|
|
|
+ };
|
|
|
|
|
+
|
|
|
|
|
+ if (usageForCost) {
|
|
|
|
|
+ payload.inputTokens = usageForCost.input_tokens;
|
|
|
|
|
+ payload.outputTokens = usageForCost.output_tokens;
|
|
|
|
|
+ payload.cacheCreationInputTokens = usageForCost.cache_creation_input_tokens;
|
|
|
|
|
+ payload.cacheReadInputTokens = usageForCost.cache_read_input_tokens;
|
|
|
|
|
+ payload.costUsd = costUsdStr;
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ void SessionManager.updateSessionUsage(session.sessionId, payload).catch(
|
|
|
|
|
+ (error: unknown) => {
|
|
|
|
|
+ logger.error("[ResponseHandler] Failed to update session usage:", error);
|
|
|
|
|
+ }
|
|
|
|
|
+ );
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
// 保存扩展信息(status code, tokens, provider chain)
|
|
// 保存扩展信息(status code, tokens, provider chain)
|
|
|
await updateMessageRequestDetails(messageContext.id, {
|
|
await updateMessageRequestDetails(messageContext.id, {
|
|
|
- statusCode: statusCode,
|
|
|
|
|
|
|
+ statusCode: effectiveStatusCode,
|
|
|
inputTokens: usageForCost?.input_tokens,
|
|
inputTokens: usageForCost?.input_tokens,
|
|
|
outputTokens: usageForCost?.output_tokens,
|
|
outputTokens: usageForCost?.output_tokens,
|
|
|
ttfbMs: session.ttfbMs,
|
|
ttfbMs: session.ttfbMs,
|
|
@@ -887,13 +1243,15 @@ export class ProxyResponseHandler {
|
|
|
cacheCreation1hInputTokens: usageForCost?.cache_creation_1h_input_tokens,
|
|
cacheCreation1hInputTokens: usageForCost?.cache_creation_1h_input_tokens,
|
|
|
cacheTtlApplied: usageForCost?.cache_ttl ?? null,
|
|
cacheTtlApplied: usageForCost?.cache_ttl ?? null,
|
|
|
providerChain: session.getProviderChain(),
|
|
providerChain: session.getProviderChain(),
|
|
|
- model: session.getCurrentModel() ?? undefined, // ⭐ 更新重定向后的模型
|
|
|
|
|
- providerId: session.provider?.id, // ⭐ 更新最终供应商ID(重试切换后)
|
|
|
|
|
|
|
+ ...(streamErrorMessage ? { errorMessage: streamErrorMessage } : {}),
|
|
|
|
|
+ model: session.getCurrentModel() ?? undefined, // 更新重定向后的模型
|
|
|
|
|
+ providerId: providerIdForPersistence ?? session.provider?.id, // 更新最终供应商ID(重试切换后)
|
|
|
context1mApplied: session.getContext1mApplied(),
|
|
context1mApplied: session.getContext1mApplied(),
|
|
|
});
|
|
});
|
|
|
};
|
|
};
|
|
|
|
|
|
|
|
try {
|
|
try {
|
|
|
|
|
+ let streamEndedNormally = false;
|
|
|
while (true) {
|
|
while (true) {
|
|
|
// 检查取消信号
|
|
// 检查取消信号
|
|
|
if (session.clientAbortSignal?.aborted || abortController.signal.aborted) {
|
|
if (session.clientAbortSignal?.aborted || abortController.signal.aborted) {
|
|
@@ -907,6 +1265,7 @@ export class ProxyResponseHandler {
|
|
|
|
|
|
|
|
const { value, done } = await reader.read();
|
|
const { value, done } = await reader.read();
|
|
|
if (done) {
|
|
if (done) {
|
|
|
|
|
+ streamEndedNormally = true;
|
|
|
break;
|
|
break;
|
|
|
}
|
|
}
|
|
|
if (value) {
|
|
if (value) {
|
|
@@ -945,7 +1304,30 @@ export class ProxyResponseHandler {
|
|
|
// ⭐ 流式读取完成:清除静默期计时器
|
|
// ⭐ 流式读取完成:清除静默期计时器
|
|
|
clearIdleTimer();
|
|
clearIdleTimer();
|
|
|
const allContent = flushAndJoin();
|
|
const allContent = flushAndJoin();
|
|
|
- await finalizeStream(allContent);
|
|
|
|
|
|
|
+ const clientAborted = session.clientAbortSignal?.aborted ?? false;
|
|
|
|
|
+ try {
|
|
|
|
|
+ await finalizeStream(allContent, streamEndedNormally, clientAborted);
|
|
|
|
|
+ } catch (finalizeError) {
|
|
|
|
|
+ logger.error("ResponseHandler: Failed to finalize stream", {
|
|
|
|
|
+ taskId,
|
|
|
|
|
+ providerId: provider.id,
|
|
|
|
|
+ providerName: provider.name,
|
|
|
|
|
+ messageId: messageContext.id,
|
|
|
|
|
+ streamEndedNormally,
|
|
|
|
|
+ clientAborted,
|
|
|
|
|
+ finalizeError,
|
|
|
|
|
+ });
|
|
|
|
|
+
|
|
|
|
|
+ // 回退:避免 finalizeStream 失败导致 request record 未被更新
|
|
|
|
|
+ await persistRequestFailure({
|
|
|
|
|
+ session,
|
|
|
|
|
+ messageContext,
|
|
|
|
|
+ statusCode: statusCode && statusCode >= 400 ? statusCode : 500,
|
|
|
|
|
+ error: finalizeError,
|
|
|
|
|
+ taskId,
|
|
|
|
|
+ phase: "stream",
|
|
|
|
|
+ });
|
|
|
|
|
+ }
|
|
|
} catch (error) {
|
|
} catch (error) {
|
|
|
// 检测 AbortError 的来源:响应超时 vs 静默期超时 vs 客户端/上游中断
|
|
// 检测 AbortError 的来源:响应超时 vs 静默期超时 vs 客户端/上游中断
|
|
|
const err = error as Error;
|
|
const err = error as Error;
|
|
@@ -972,32 +1354,30 @@ export class ProxyResponseHandler {
|
|
|
errorName: err.name,
|
|
errorName: err.name,
|
|
|
});
|
|
});
|
|
|
|
|
|
|
|
- // ⚠️ 计入熔断器(动态导入避免循环依赖)
|
|
|
|
|
|
|
+ // 注意:无法重试,因为客户端已收到 HTTP 200
|
|
|
|
|
+ // 错误已记录,不抛出异常(避免影响后台任务)
|
|
|
|
|
+
|
|
|
|
|
+ // 结算并消费 deferred meta,确保 provider chain/熔断归因完整
|
|
|
try {
|
|
try {
|
|
|
- const { recordFailure } = await import("@/lib/circuit-breaker");
|
|
|
|
|
- await recordFailure(provider.id, err);
|
|
|
|
|
- logger.debug("ResponseHandler: Response timeout recorded in circuit breaker", {
|
|
|
|
|
- providerId: provider.id,
|
|
|
|
|
|
|
+ const allContent = flushAndJoin();
|
|
|
|
|
+ await finalizeStream(allContent, false, false, "STREAM_RESPONSE_TIMEOUT");
|
|
|
|
|
+ } catch (finalizeError) {
|
|
|
|
|
+ logger.error("ResponseHandler: Failed to finalize response-timeout stream", {
|
|
|
|
|
+ taskId,
|
|
|
|
|
+ messageId: messageContext.id,
|
|
|
|
|
+ finalizeError,
|
|
|
});
|
|
});
|
|
|
- } catch (cbError) {
|
|
|
|
|
- logger.warn("ResponseHandler: Failed to record timeout in circuit breaker", {
|
|
|
|
|
- providerId: provider.id,
|
|
|
|
|
- error: cbError,
|
|
|
|
|
|
|
+
|
|
|
|
|
+ // 回退:至少保证 DB 记录能落下,避免 orphan record
|
|
|
|
|
+ await persistRequestFailure({
|
|
|
|
|
+ session,
|
|
|
|
|
+ messageContext,
|
|
|
|
|
+ statusCode: statusCode && statusCode >= 400 ? statusCode : 502,
|
|
|
|
|
+ error: err,
|
|
|
|
|
+ taskId,
|
|
|
|
|
+ phase: "stream",
|
|
|
});
|
|
});
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
- // 注意:无法重试,因为客户端已收到 HTTP 200
|
|
|
|
|
- // 错误已记录,熔断器已更新,不抛出异常(避免影响后台任务)
|
|
|
|
|
-
|
|
|
|
|
- // 更新数据库记录(避免 orphan record)
|
|
|
|
|
- await persistRequestFailure({
|
|
|
|
|
- session,
|
|
|
|
|
- messageContext,
|
|
|
|
|
- statusCode: statusCode && statusCode >= 400 ? statusCode : 502,
|
|
|
|
|
- error: err,
|
|
|
|
|
- taskId,
|
|
|
|
|
- phase: "stream",
|
|
|
|
|
- });
|
|
|
|
|
} else if (isIdleTimeout) {
|
|
} else if (isIdleTimeout) {
|
|
|
// ⚠️ 静默期超时:计入熔断器并记录错误日志
|
|
// ⚠️ 静默期超时:计入熔断器并记录错误日志
|
|
|
logger.error("ResponseHandler: Streaming idle timeout", {
|
|
logger.error("ResponseHandler: Streaming idle timeout", {
|
|
@@ -1008,32 +1388,30 @@ export class ProxyResponseHandler {
|
|
|
chunksCollected: chunks.length,
|
|
chunksCollected: chunks.length,
|
|
|
});
|
|
});
|
|
|
|
|
|
|
|
- // ⚠️ 计入熔断器(动态导入避免循环依赖)
|
|
|
|
|
|
|
+ // 注意:无法重试,因为客户端已收到 HTTP 200
|
|
|
|
|
+ // 错误已记录,不抛出异常(避免影响后台任务)
|
|
|
|
|
+
|
|
|
|
|
+ // 结算并消费 deferred meta,确保 provider chain/熔断归因完整
|
|
|
try {
|
|
try {
|
|
|
- const { recordFailure } = await import("@/lib/circuit-breaker");
|
|
|
|
|
- await recordFailure(provider.id, err);
|
|
|
|
|
- logger.debug("ResponseHandler: Streaming idle timeout recorded in circuit breaker", {
|
|
|
|
|
- providerId: provider.id,
|
|
|
|
|
|
|
+ const allContent = flushAndJoin();
|
|
|
|
|
+ await finalizeStream(allContent, false, false, "STREAM_IDLE_TIMEOUT");
|
|
|
|
|
+ } catch (finalizeError) {
|
|
|
|
|
+ logger.error("ResponseHandler: Failed to finalize idle-timeout stream", {
|
|
|
|
|
+ taskId,
|
|
|
|
|
+ messageId: messageContext.id,
|
|
|
|
|
+ finalizeError,
|
|
|
});
|
|
});
|
|
|
- } catch (cbError) {
|
|
|
|
|
- logger.warn("ResponseHandler: Failed to record timeout in circuit breaker", {
|
|
|
|
|
- providerId: provider.id,
|
|
|
|
|
- error: cbError,
|
|
|
|
|
|
|
+
|
|
|
|
|
+ // 回退:至少保证 DB 记录能落下,避免 orphan record
|
|
|
|
|
+ await persistRequestFailure({
|
|
|
|
|
+ session,
|
|
|
|
|
+ messageContext,
|
|
|
|
|
+ statusCode: statusCode && statusCode >= 400 ? statusCode : 502,
|
|
|
|
|
+ error: err,
|
|
|
|
|
+ taskId,
|
|
|
|
|
+ phase: "stream",
|
|
|
});
|
|
});
|
|
|
}
|
|
}
|
|
|
-
|
|
|
|
|
- // 注意:无法重试,因为客户端已收到 HTTP 200
|
|
|
|
|
- // 错误已记录,熔断器已更新,不抛出异常(避免影响后台任务)
|
|
|
|
|
-
|
|
|
|
|
- // 更新数据库记录(避免 orphan record - 这是导致 185 个孤儿记录的根本原因!)
|
|
|
|
|
- await persistRequestFailure({
|
|
|
|
|
- session,
|
|
|
|
|
- messageContext,
|
|
|
|
|
- statusCode: statusCode && statusCode >= 400 ? statusCode : 502,
|
|
|
|
|
- error: err,
|
|
|
|
|
- taskId,
|
|
|
|
|
- phase: "stream",
|
|
|
|
|
- });
|
|
|
|
|
} else if (!clientAborted) {
|
|
} else if (!clientAborted) {
|
|
|
// 上游在流式过程中意外中断:视为供应商/网络错误
|
|
// 上游在流式过程中意外中断:视为供应商/网络错误
|
|
|
logger.error("ResponseHandler: Upstream stream aborted unexpectedly", {
|
|
logger.error("ResponseHandler: Upstream stream aborted unexpectedly", {
|
|
@@ -1046,14 +1424,27 @@ export class ProxyResponseHandler {
|
|
|
errorMessage: err.message || "(empty message)",
|
|
errorMessage: err.message || "(empty message)",
|
|
|
});
|
|
});
|
|
|
|
|
|
|
|
- await persistRequestFailure({
|
|
|
|
|
- session,
|
|
|
|
|
- messageContext,
|
|
|
|
|
- statusCode: 502,
|
|
|
|
|
- error: err,
|
|
|
|
|
- taskId,
|
|
|
|
|
- phase: "stream",
|
|
|
|
|
- });
|
|
|
|
|
|
|
+ // 结算并消费 deferred meta,确保 provider chain/熔断归因完整
|
|
|
|
|
+ try {
|
|
|
|
|
+ const allContent = flushAndJoin();
|
|
|
|
|
+ await finalizeStream(allContent, false, false, "STREAM_UPSTREAM_ABORTED");
|
|
|
|
|
+ } catch (finalizeError) {
|
|
|
|
|
+ logger.error("ResponseHandler: Failed to finalize upstream-aborted stream", {
|
|
|
|
|
+ taskId,
|
|
|
|
|
+ messageId: messageContext.id,
|
|
|
|
|
+ finalizeError,
|
|
|
|
|
+ });
|
|
|
|
|
+
|
|
|
|
|
+ // 回退:至少保证 DB 记录能落下,避免 orphan record
|
|
|
|
|
+ await persistRequestFailure({
|
|
|
|
|
+ session,
|
|
|
|
|
+ messageContext,
|
|
|
|
|
+ statusCode: 502,
|
|
|
|
|
+ error: err,
|
|
|
|
|
+ taskId,
|
|
|
|
|
+ phase: "stream",
|
|
|
|
|
+ });
|
|
|
|
|
+ }
|
|
|
} else {
|
|
} else {
|
|
|
// 客户端主动中断:正常日志,不抛出错误
|
|
// 客户端主动中断:正常日志,不抛出错误
|
|
|
logger.warn("ResponseHandler: Stream reading aborted by client", {
|
|
logger.warn("ResponseHandler: Stream reading aborted by client", {
|
|
@@ -1070,7 +1461,7 @@ export class ProxyResponseHandler {
|
|
|
});
|
|
});
|
|
|
try {
|
|
try {
|
|
|
const allContent = flushAndJoin();
|
|
const allContent = flushAndJoin();
|
|
|
- await finalizeStream(allContent);
|
|
|
|
|
|
|
+ await finalizeStream(allContent, false, true);
|
|
|
} catch (finalizeError) {
|
|
} catch (finalizeError) {
|
|
|
logger.error("ResponseHandler: Failed to finalize aborted stream response", {
|
|
logger.error("ResponseHandler: Failed to finalize aborted stream response", {
|
|
|
taskId,
|
|
taskId,
|
|
@@ -1082,15 +1473,27 @@ export class ProxyResponseHandler {
|
|
|
} else {
|
|
} else {
|
|
|
logger.error("Failed to save SSE content:", error);
|
|
logger.error("Failed to save SSE content:", error);
|
|
|
|
|
|
|
|
- // 更新数据库记录(避免 orphan record)
|
|
|
|
|
- await persistRequestFailure({
|
|
|
|
|
- session,
|
|
|
|
|
- messageContext,
|
|
|
|
|
- statusCode: statusCode && statusCode >= 400 ? statusCode : 500,
|
|
|
|
|
- error,
|
|
|
|
|
- taskId,
|
|
|
|
|
- phase: "stream",
|
|
|
|
|
- });
|
|
|
|
|
|
|
+ // 结算并消费 deferred meta,确保 provider chain/熔断归因完整
|
|
|
|
|
+ try {
|
|
|
|
|
+ const allContent = flushAndJoin();
|
|
|
|
|
+ await finalizeStream(allContent, false, clientAborted, "STREAM_PROCESSING_ERROR");
|
|
|
|
|
+ } catch (finalizeError) {
|
|
|
|
|
+ logger.error("ResponseHandler: Failed to finalize stream after processing error", {
|
|
|
|
|
+ taskId,
|
|
|
|
|
+ messageId: messageContext.id,
|
|
|
|
|
+ finalizeError,
|
|
|
|
|
+ });
|
|
|
|
|
+
|
|
|
|
|
+ // 回退:至少保证 DB 记录能落下,避免 orphan record
|
|
|
|
|
+ await persistRequestFailure({
|
|
|
|
|
+ session,
|
|
|
|
|
+ messageContext,
|
|
|
|
|
+ statusCode: statusCode && statusCode >= 400 ? statusCode : 500,
|
|
|
|
|
+ error,
|
|
|
|
|
+ taskId,
|
|
|
|
|
+ phase: "stream",
|
|
|
|
|
+ });
|
|
|
|
|
+ }
|
|
|
}
|
|
}
|
|
|
} finally {
|
|
} finally {
|
|
|
// 确保资源释放
|
|
// 确保资源释放
|
|
@@ -1795,18 +2198,25 @@ async function updateRequestCostFromUsage(
|
|
|
/**
|
|
/**
|
|
|
* 统一的请求统计处理方法
|
|
* 统一的请求统计处理方法
|
|
|
* 用于消除 Gemini 透传、普通非流式、普通流式之间的重复统计逻辑
|
|
* 用于消除 Gemini 透传、普通非流式、普通流式之间的重复统计逻辑
|
|
|
|
|
+ *
|
|
|
|
|
+ * @param statusCode - 内部结算状态码(可能与客户端实际收到的 HTTP 状态不同,例如“假 200”会被映射为 502)
|
|
|
|
|
+ * @param errorMessage - 可选的内部错误原因(用于把假 200/解析失败等信息写入 DB 与监控)
|
|
|
*/
|
|
*/
|
|
|
export async function finalizeRequestStats(
|
|
export async function finalizeRequestStats(
|
|
|
session: ProxySession,
|
|
session: ProxySession,
|
|
|
responseText: string,
|
|
responseText: string,
|
|
|
statusCode: number,
|
|
statusCode: number,
|
|
|
- duration: number
|
|
|
|
|
|
|
+ duration: number,
|
|
|
|
|
+ errorMessage?: string,
|
|
|
|
|
+ providerIdOverride?: number
|
|
|
): Promise<void> {
|
|
): Promise<void> {
|
|
|
const { messageContext, provider } = session;
|
|
const { messageContext, provider } = session;
|
|
|
if (!provider || !messageContext) {
|
|
if (!provider || !messageContext) {
|
|
|
return;
|
|
return;
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
|
|
+ const providerIdForPersistence = providerIdOverride ?? session.provider?.id;
|
|
|
|
|
+
|
|
|
// 1. 结束请求状态追踪
|
|
// 1. 结束请求状态追踪
|
|
|
ProxyStatusTracker.getInstance().endRequest(messageContext.user.id, messageContext.id);
|
|
ProxyStatusTracker.getInstance().endRequest(messageContext.user.id, messageContext.id);
|
|
|
|
|
|
|
@@ -1820,10 +2230,11 @@ export async function finalizeRequestStats(
|
|
|
// 即使没有 usageMetrics,也需要更新状态码和 provider chain
|
|
// 即使没有 usageMetrics,也需要更新状态码和 provider chain
|
|
|
await updateMessageRequestDetails(messageContext.id, {
|
|
await updateMessageRequestDetails(messageContext.id, {
|
|
|
statusCode: statusCode,
|
|
statusCode: statusCode,
|
|
|
|
|
+ ...(errorMessage ? { errorMessage } : {}),
|
|
|
ttfbMs: session.ttfbMs ?? duration,
|
|
ttfbMs: session.ttfbMs ?? duration,
|
|
|
providerChain: session.getProviderChain(),
|
|
providerChain: session.getProviderChain(),
|
|
|
model: session.getCurrentModel() ?? undefined,
|
|
model: session.getCurrentModel() ?? undefined,
|
|
|
- providerId: session.provider?.id, // ⭐ 更新最终供应商ID(重试切换后)
|
|
|
|
|
|
|
+ providerId: providerIdForPersistence, // 更新最终供应商ID(重试切换后)
|
|
|
context1mApplied: session.getContext1mApplied(),
|
|
context1mApplied: session.getContext1mApplied(),
|
|
|
});
|
|
});
|
|
|
return;
|
|
return;
|
|
@@ -1892,6 +2303,7 @@ export async function finalizeRequestStats(
|
|
|
costUsd: costUsdStr,
|
|
costUsd: costUsdStr,
|
|
|
status: statusCode >= 200 && statusCode < 300 ? "completed" : "error",
|
|
status: statusCode >= 200 && statusCode < 300 ? "completed" : "error",
|
|
|
statusCode: statusCode,
|
|
statusCode: statusCode,
|
|
|
|
|
+ ...(errorMessage ? { errorMessage } : {}),
|
|
|
}).catch((error: unknown) => {
|
|
}).catch((error: unknown) => {
|
|
|
logger.error("[ResponseHandler] Failed to update session usage:", error);
|
|
logger.error("[ResponseHandler] Failed to update session usage:", error);
|
|
|
});
|
|
});
|
|
@@ -1909,8 +2321,9 @@ export async function finalizeRequestStats(
|
|
|
cacheCreation1hInputTokens: normalizedUsage.cache_creation_1h_input_tokens,
|
|
cacheCreation1hInputTokens: normalizedUsage.cache_creation_1h_input_tokens,
|
|
|
cacheTtlApplied: normalizedUsage.cache_ttl ?? null,
|
|
cacheTtlApplied: normalizedUsage.cache_ttl ?? null,
|
|
|
providerChain: session.getProviderChain(),
|
|
providerChain: session.getProviderChain(),
|
|
|
|
|
+ ...(errorMessage ? { errorMessage } : {}),
|
|
|
model: session.getCurrentModel() ?? undefined,
|
|
model: session.getCurrentModel() ?? undefined,
|
|
|
- providerId: session.provider?.id, // ⭐ 更新最终供应商ID(重试切换后)
|
|
|
|
|
|
|
+ providerId: providerIdForPersistence, // 更新最终供应商ID(重试切换后)
|
|
|
context1mApplied: session.getContext1mApplied(),
|
|
context1mApplied: session.getContext1mApplied(),
|
|
|
});
|
|
});
|
|
|
}
|
|
}
|
|
@@ -2066,7 +2479,7 @@ async function persistRequestFailure(options: {
|
|
|
ttfbMs: phase === "non-stream" ? (session.ttfbMs ?? duration) : session.ttfbMs,
|
|
ttfbMs: phase === "non-stream" ? (session.ttfbMs ?? duration) : session.ttfbMs,
|
|
|
providerChain: session.getProviderChain(),
|
|
providerChain: session.getProviderChain(),
|
|
|
model: session.getCurrentModel() ?? undefined,
|
|
model: session.getCurrentModel() ?? undefined,
|
|
|
- providerId: session.provider?.id, // ⭐ 更新最终供应商ID(重试切换后)
|
|
|
|
|
|
|
+ providerId: session.provider?.id, // 更新最终供应商ID(重试切换后)
|
|
|
context1mApplied: session.getContext1mApplied(),
|
|
context1mApplied: session.getContext1mApplied(),
|
|
|
});
|
|
});
|
|
|
|
|
|