|
|
@@ -11,6 +11,7 @@ import { SessionTracker } from "@/lib/session-tracker";
|
|
|
import type { CostBreakdown } from "@/lib/utils/cost-calculation";
|
|
|
import { calculateRequestCost, calculateRequestCostBreakdown } from "@/lib/utils/cost-calculation";
|
|
|
import { hasValidPriceData } from "@/lib/utils/price-data";
|
|
|
+import { resolvePricingForModelRecords } from "@/lib/utils/pricing-resolution";
|
|
|
import { isSSEText, parseSSEData } from "@/lib/utils/sse";
|
|
|
import {
|
|
|
detectUpstreamErrorFromSseOrJsonText,
|
|
|
@@ -23,6 +24,7 @@ import {
|
|
|
} from "@/repository/message";
|
|
|
import { findLatestPriceByModel } from "@/repository/model-price";
|
|
|
import { getSystemSettings } from "@/repository/system-config";
|
|
|
+import type { Provider } from "@/types/provider";
|
|
|
import type { SessionUsageUpdate } from "@/types/session";
|
|
|
import { GeminiAdapter } from "../gemini/adapter";
|
|
|
import type { GeminiResponse } from "../gemini/types";
|
|
|
@@ -108,6 +110,123 @@ function cleanResponseHeaders(headers: Headers): Headers {
|
|
|
return cleaned;
|
|
|
}
|
|
|
|
|
|
/**
 * Records a `pricing_resolution` special setting on the session, at most once
 * per distinct resolution result.
 *
 * Nothing is recorded when `resolvedPricing` is falsy, or when the session
 * already holds a `pricing_resolution` entry with the same resolved model
 * name, resolved pricing provider key and source.
 *
 * @param session - Proxy session whose special settings are read and appended to.
 * @param resolvedPricing - Value produced by `session.getResolvedPricingByBillingSource(...)`.
 */
function ensurePricingResolutionSpecialSetting(
  session: ProxySession,
  resolvedPricing: Awaited<ReturnType<ProxySession["getResolvedPricingByBillingSource"]>>
): void {
  if (!resolvedPricing) {
    return;
  }

  const { resolvedModelName, resolvedPricingProviderKey, source } = resolvedPricing;

  // An identical resolution already on the session means there is nothing new to report.
  const alreadyRecorded = session
    .getSpecialSettings()
    ?.some(
      (entry) =>
        entry.type === "pricing_resolution" &&
        entry.resolvedModelName === resolvedModelName &&
        entry.resolvedPricingProviderKey === resolvedPricingProviderKey &&
        entry.source === source
    );
  if (alreadyRecorded) {
    return;
  }

  session.addSpecialSetting({
    type: "pricing_resolution",
    scope: "billing",
    hit: true,
    // Prefer the session's current model; fall back to the model the price was resolved for.
    modelName: session.getCurrentModel() ?? resolvedModelName,
    resolvedModelName,
    resolvedPricingProviderKey,
    source,
  });
}
|
|
|
+
|
|
|
/**
 * Returns the `service_tier` the client asked for in the request body, but
 * only for sessions whose provider type is `"codex"`.
 *
 * @param session - Proxy session carrying the provider and the request message.
 * @returns The requested tier when it is a string; `null` when the provider is
 *   not a codex provider, the request message is not an object, or
 *   `service_tier` is absent or not a string.
 */
function getRequestedCodexServiceTier(session: ProxySession): string | null {
  if (session.provider?.providerType !== "codex") {
    return null;
  }

  // Check the message at runtime instead of asserting its shape: a missing or
  // non-object body must yield "no tier requested", not a TypeError in the
  // billing path.
  const message: unknown = session.request.message;
  if (!message || typeof message !== "object") {
    return null;
  }

  const requestedTier = (message as Record<string, unknown>).service_tier;
  return typeof requestedTier === "string" ? requestedTier : null;
}
|
|
|
+
|
|
|
/**
 * Reads the service tier from a single decoded payload object.
 *
 * Looks at `payload.response.service_tier` first and `payload.service_tier`
 * second, returning the first one that is a non-blank string (trimmed).
 * Returns `null` for non-object payloads, arrays, or when neither field holds
 * a usable string.
 */
function readServiceTierFromPayload(payload: unknown): string | null {
  if (!payload || typeof payload !== "object" || Array.isArray(payload)) {
    return null;
  }

  const record = payload as Record<string, unknown>;
  const nestedTier =
    record.response && typeof record.response === "object"
      ? (record.response as Record<string, unknown>).service_tier
      : undefined;

  // The nested value is checked first so that it takes precedence over the
  // top-level one whenever both are present and usable.
  for (const candidate of [nestedTier, record.service_tier]) {
    if (typeof candidate === "string") {
      const trimmed = candidate.trim();
      if (trimmed) {
        return trimmed;
      }
    }
  }
  return null;
}

/**
 * Extracts the service tier reported in an upstream response body.
 *
 * The text is first tried as a single JSON document. If that yields no tier
 * (or the text is not valid JSON) and the text is recognised as SSE by
 * `isSSEText`, every event returned by `parseSSEData` is inspected and the
 * last tier seen wins.
 *
 * @param responseText - Raw response body, either a JSON document or SSE text.
 * @returns The trimmed tier string, or `null` when none is present.
 */
export function parseServiceTierFromResponseText(responseText: string): string | null {
  try {
    const tierFromJson = readServiceTierFromPayload(JSON.parse(responseText));
    if (tierFromJson !== null) {
      return tierFromJson;
    }
  } catch {
    // Not a single JSON document; fall through to SSE parsing below.
  }

  if (!isSSEText(responseText)) {
    return null;
  }

  let lastSeenServiceTier: string | null = null;
  for (const event of parseSSEData(responseText)) {
    // Later events override earlier ones; events without a tier leave the value untouched.
    lastSeenServiceTier = readServiceTierFromPayload(event.data) ?? lastSeenServiceTier;
  }
  return lastSeenServiceTier;
}
|
|
|
+
|
|
|
/**
 * Decides whether the request should be treated as served on the `"priority"` tier.
 *
 * The tier reported by the upstream response is authoritative when present;
 * only when it is missing does the tier requested by a codex client (see
 * `getRequestedCodexServiceTier`) decide the outcome.
 *
 * @param session - Proxy session, consulted only when no actual tier is known.
 * @param actualServiceTier - Tier parsed from the response, or `null` if none was found.
 * @returns `true` when the effective tier is exactly `"priority"`.
 */
function isPriorityServiceTierApplied(
  session: ProxySession,
  actualServiceTier: string | null
): boolean {
  const effectiveTier = actualServiceTier ?? getRequestedCodexServiceTier(session);
  return effectiveTier === "priority";
}
|
|
|
+
|
|
|
/**
 * Records a single `codex_service_tier_result` special setting on the session
 * for codex providers, describing the requested tier, the tier reported by
 * the response, and whether priority handling is in effect.
 *
 * The call is a no-op when the provider is not a codex provider or when the
 * session already carries a `codex_service_tier_result` entry.
 *
 * @param session - Proxy session whose special settings are read and appended to.
 * @param actualServiceTier - Tier parsed from the response, or `null` if none was found.
 */
function ensureCodexServiceTierResultSpecialSetting(
  session: ProxySession,
  actualServiceTier: string | null
): void {
  const isCodexProvider = session.provider?.providerType === "codex";
  if (!isCodexProvider) {
    return;
  }

  const requestedServiceTier = getRequestedCodexServiceTier(session);
  const effectivePriority = isPriorityServiceTierApplied(session, actualServiceTier);

  // Only the first result is kept; later calls for the same session are ignored.
  const alreadyRecorded = session
    .getSpecialSettings()
    ?.some((entry) => entry.type === "codex_service_tier_result");
  if (alreadyRecorded) {
    return;
  }

  const tierObserved = requestedServiceTier != null || actualServiceTier != null;

  session.addSpecialSetting({
    type: "codex_service_tier_result",
    scope: "response",
    hit: effectivePriority || tierObserved,
    requestedServiceTier,
    actualServiceTier,
    effectivePriority,
  });
}
|
|
|
+
|
|
|
type FinalizeDeferredStreamingResult = {
|
|
|
/**
|
|
|
* “内部结算用”的状态码。
|
|
|
@@ -386,7 +505,7 @@ async function finalizeDeferredStreamingFinalizationIfNeeded(
|
|
|
const { recordEndpointSuccess } = await import("@/lib/endpoint-circuit-breaker");
|
|
|
await recordEndpointSuccess(meta.endpointId);
|
|
|
} catch (endpointError) {
|
|
|
- logger.warn("[ResponseHandler] Failed to record endpoint success (stream)", {
|
|
|
+ logger.warn("[ResponseHandler] Failed to record endpoint success (stream finalized)", {
|
|
|
endpointId: meta.endpointId,
|
|
|
providerId: meta.providerId,
|
|
|
error: endpointError,
|
|
|
@@ -441,7 +560,7 @@ async function finalizeDeferredStreamingFinalizationIfNeeded(
|
|
|
providerId: meta.providerId,
|
|
|
providerName: meta.providerName,
|
|
|
}).catch((err) => {
|
|
|
- logger.error("[ResponseHandler] Failed to update session provider info (stream)", {
|
|
|
+ logger.error("[ResponseHandler] Failed to update session provider info (stream finalized)", {
|
|
|
error: err,
|
|
|
});
|
|
|
});
|
|
|
@@ -730,6 +849,9 @@ export class ProxyResponseHandler {
|
|
|
const usageResult = parseUsageFromResponseText(responseText, provider.providerType);
|
|
|
usageRecord = usageResult.usageRecord;
|
|
|
usageMetrics = usageResult.usageMetrics;
|
|
|
+ const actualServiceTier = parseServiceTierFromResponseText(responseText);
|
|
|
+ ensureCodexServiceTierResultSpecialSetting(session, actualServiceTier);
|
|
|
+ const priorityServiceTierApplied = isPriorityServiceTierApplied(session, actualServiceTier);
|
|
|
|
|
|
if (usageMetrics) {
|
|
|
usageMetrics = normalizeUsageWithSwap(
|
|
|
@@ -775,12 +897,14 @@ export class ProxyResponseHandler {
|
|
|
session.getOriginalModel(),
|
|
|
session.getCurrentModel(),
|
|
|
usageMetrics,
|
|
|
+ provider,
|
|
|
provider.costMultiplier,
|
|
|
- session.getContext1mApplied()
|
|
|
+ session.getContext1mApplied(),
|
|
|
+ priorityServiceTierApplied
|
|
|
);
|
|
|
|
|
|
// 追踪消费到 Redis(用于限流)
|
|
|
- await trackCostToRedis(session, usageMetrics);
|
|
|
+ await trackCostToRedis(session, usageMetrics, priorityServiceTierApplied);
|
|
|
}
|
|
|
|
|
|
// Calculate cost for session tracking (with multiplier) and Langfuse (raw)
|
|
|
@@ -790,13 +914,15 @@ export class ProxyResponseHandler {
|
|
|
if (usageMetrics) {
|
|
|
try {
|
|
|
if (session.request.model) {
|
|
|
- const priceData = await session.getCachedPriceDataByBillingSource();
|
|
|
- if (priceData) {
|
|
|
+ const resolvedPricing = await session.getResolvedPricingByBillingSource(provider);
|
|
|
+ if (resolvedPricing) {
|
|
|
+ ensurePricingResolutionSpecialSetting(session, resolvedPricing);
|
|
|
const cost = calculateRequestCost(
|
|
|
usageMetrics,
|
|
|
- priceData,
|
|
|
+ resolvedPricing.priceData,
|
|
|
provider.costMultiplier,
|
|
|
- session.getContext1mApplied()
|
|
|
+ session.getContext1mApplied(),
|
|
|
+ priorityServiceTierApplied
|
|
|
);
|
|
|
if (cost.gt(0)) {
|
|
|
costUsdStr = cost.toString();
|
|
|
@@ -805,9 +931,10 @@ export class ProxyResponseHandler {
|
|
|
if (provider.costMultiplier !== 1) {
|
|
|
const rawCost = calculateRequestCost(
|
|
|
usageMetrics,
|
|
|
- priceData,
|
|
|
+ resolvedPricing.priceData,
|
|
|
1.0,
|
|
|
- session.getContext1mApplied()
|
|
|
+ session.getContext1mApplied(),
|
|
|
+ priorityServiceTierApplied
|
|
|
);
|
|
|
if (rawCost.gt(0)) {
|
|
|
rawCostUsdStr = rawCost.toString();
|
|
|
@@ -819,8 +946,9 @@ export class ProxyResponseHandler {
|
|
|
try {
|
|
|
costBreakdown = calculateRequestCostBreakdown(
|
|
|
usageMetrics,
|
|
|
- priceData,
|
|
|
- session.getContext1mApplied()
|
|
|
+ resolvedPricing.priceData,
|
|
|
+ session.getContext1mApplied(),
|
|
|
+ priorityServiceTierApplied
|
|
|
);
|
|
|
} catch {
|
|
|
/* non-critical */
|
|
|
@@ -895,7 +1023,8 @@ export class ProxyResponseHandler {
|
|
|
model: session.getCurrentModel() ?? undefined, // 更新重定向后的模型
|
|
|
providerId: session.provider?.id, // 更新最终供应商ID(重试切换后)
|
|
|
context1mApplied: session.getContext1mApplied(),
|
|
|
- swapCacheTtlApplied: provider.swapCacheTtlBilling ?? false,
|
|
|
+ swapCacheTtlApplied: session.provider?.swapCacheTtlBilling ?? false,
|
|
|
+ specialSettings: session.getSpecialSettings() ?? undefined,
|
|
|
});
|
|
|
|
|
|
// 记录请求结束
|
|
|
@@ -1179,17 +1308,23 @@ export class ProxyResponseHandler {
|
|
|
};
|
|
|
|
|
|
// 优先填充 head;超过 head 上限后切到 tail(但不代表一定发生截断,只有 tail 溢出才算截断)
|
|
|
- if (!inTailMode) {
|
|
|
- if (headBufferedBytes + bytes <= MAX_STATS_HEAD_BYTES) {
|
|
|
+ if (!inTailMode && headBufferedBytes < MAX_STATS_HEAD_BYTES) {
|
|
|
+ const remainingHeadBytes = MAX_STATS_HEAD_BYTES - headBufferedBytes;
|
|
|
+ if (remainingHeadBytes > 0 && bytes > remainingHeadBytes) {
|
|
|
+ const headPart = text.substring(0, remainingHeadBytes);
|
|
|
+ const tailPart = text.substring(remainingHeadBytes);
|
|
|
+
|
|
|
+ pushChunk(headPart, remainingHeadBytes);
|
|
|
+
|
|
|
+ inTailMode = true;
|
|
|
+ pushChunk(tailPart, bytes - remainingHeadBytes);
|
|
|
+ } else {
|
|
|
headChunks.push(text);
|
|
|
headBufferedBytes += bytes;
|
|
|
- return;
|
|
|
}
|
|
|
-
|
|
|
- inTailMode = true;
|
|
|
+ } else {
|
|
|
+ pushChunk(text, bytes);
|
|
|
}
|
|
|
-
|
|
|
- pushToTail();
|
|
|
};
|
|
|
const decoder = new TextDecoder();
|
|
|
let isFirstChunk = true;
|
|
|
@@ -1304,6 +1439,7 @@ export class ProxyResponseHandler {
|
|
|
const headText = decoder.decode(headPart, { stream: true });
|
|
|
pushChunk(headText, remainingHeadBytes);
|
|
|
|
|
|
+ inTailMode = true;
|
|
|
const tailText = decoder.decode(tailPart, { stream: true });
|
|
|
pushChunk(tailText, chunkSize - remainingHeadBytes);
|
|
|
} else {
|
|
|
@@ -1486,6 +1622,7 @@ export class ProxyResponseHandler {
|
|
|
});
|
|
|
}
|
|
|
try {
|
|
|
+ // 取消 reader lock
|
|
|
reader?.releaseLock();
|
|
|
} catch (e) {
|
|
|
logger.warn("[ResponseHandler] Gemini passthrough: Failed to release reader lock", {
|
|
|
@@ -1574,7 +1711,7 @@ export class ProxyResponseHandler {
|
|
|
const statusCode = response.status;
|
|
|
|
|
|
// 使用 AsyncTaskManager 管理后台处理任务
|
|
|
- const taskId = `stream-${messageContext.id}`;
|
|
|
+ const taskId = `stream-${messageContext?.id || `unknown-${Date.now()}`}`;
|
|
|
const abortController = new AbortController();
|
|
|
|
|
|
// ⭐ 提升 idleTimeoutId 到外部作用域,以便客户端断开时能清除
|
|
|
@@ -1688,7 +1825,7 @@ export class ProxyResponseHandler {
|
|
|
allContent,
|
|
|
session.requestSequence
|
|
|
).catch((err) => {
|
|
|
- logger.error("[ResponseHandler] Failed to store stream response:", err);
|
|
|
+ logger.error("[ResponseHandler] Failed to store response:", err);
|
|
|
});
|
|
|
}
|
|
|
|
|
|
@@ -1701,6 +1838,10 @@ export class ProxyResponseHandler {
|
|
|
const usageResult = parseUsageFromResponseText(allContent, provider.providerType);
|
|
|
usageForCost = usageResult.usageMetrics;
|
|
|
|
|
|
+ const actualServiceTier = parseServiceTierFromResponseText(allContent);
|
|
|
+ ensureCodexServiceTierResultSpecialSetting(session, actualServiceTier);
|
|
|
+ const priorityServiceTierApplied = isPriorityServiceTierApplied(session, actualServiceTier);
|
|
|
+
|
|
|
if (usageForCost) {
|
|
|
usageForCost = normalizeUsageWithSwap(
|
|
|
usageForCost,
|
|
|
@@ -1740,12 +1881,14 @@ export class ProxyResponseHandler {
|
|
|
session.getOriginalModel(),
|
|
|
session.getCurrentModel(),
|
|
|
usageForCost,
|
|
|
+ provider,
|
|
|
provider.costMultiplier,
|
|
|
- session.getContext1mApplied()
|
|
|
+ session.getContext1mApplied(),
|
|
|
+ priorityServiceTierApplied
|
|
|
);
|
|
|
|
|
|
// 追踪消费到 Redis(用于限流)
|
|
|
- await trackCostToRedis(session, usageForCost);
|
|
|
+ await trackCostToRedis(session, usageForCost, priorityServiceTierApplied);
|
|
|
|
|
|
// Calculate cost for session tracking (with multiplier) and Langfuse (raw)
|
|
|
let costUsdStr: string | undefined;
|
|
|
@@ -1754,13 +1897,15 @@ export class ProxyResponseHandler {
|
|
|
if (usageForCost) {
|
|
|
try {
|
|
|
if (session.request.model) {
|
|
|
- const priceData = await session.getCachedPriceDataByBillingSource();
|
|
|
- if (priceData) {
|
|
|
+ const resolvedPricing = await session.getResolvedPricingByBillingSource(provider);
|
|
|
+ if (resolvedPricing) {
|
|
|
+ ensurePricingResolutionSpecialSetting(session, resolvedPricing);
|
|
|
const cost = calculateRequestCost(
|
|
|
usageForCost,
|
|
|
- priceData,
|
|
|
+ resolvedPricing.priceData,
|
|
|
provider.costMultiplier,
|
|
|
- session.getContext1mApplied()
|
|
|
+ session.getContext1mApplied(),
|
|
|
+ priorityServiceTierApplied
|
|
|
);
|
|
|
if (cost.gt(0)) {
|
|
|
costUsdStr = cost.toString();
|
|
|
@@ -1769,9 +1914,10 @@ export class ProxyResponseHandler {
|
|
|
if (provider.costMultiplier !== 1) {
|
|
|
const rawCost = calculateRequestCost(
|
|
|
usageForCost,
|
|
|
- priceData,
|
|
|
+ resolvedPricing.priceData,
|
|
|
1.0,
|
|
|
- session.getContext1mApplied()
|
|
|
+ session.getContext1mApplied(),
|
|
|
+ priorityServiceTierApplied
|
|
|
);
|
|
|
if (rawCost.gt(0)) {
|
|
|
rawCostUsdStr = rawCost.toString();
|
|
|
@@ -1783,8 +1929,9 @@ export class ProxyResponseHandler {
|
|
|
try {
|
|
|
costBreakdown = calculateRequestCostBreakdown(
|
|
|
usageForCost,
|
|
|
- priceData,
|
|
|
- session.getContext1mApplied()
|
|
|
+ resolvedPricing.priceData,
|
|
|
+ session.getContext1mApplied(),
|
|
|
+ priorityServiceTierApplied
|
|
|
);
|
|
|
} catch {
|
|
|
/* non-critical */
|
|
|
@@ -1838,6 +1985,7 @@ export class ProxyResponseHandler {
|
|
|
providerId: providerIdForPersistence ?? session.provider?.id, // 更新最终供应商ID(重试切换后)
|
|
|
context1mApplied: session.getContext1mApplied(),
|
|
|
swapCacheTtlApplied: provider.swapCacheTtlBilling ?? false,
|
|
|
+ specialSettings: session.getSpecialSettings() ?? undefined,
|
|
|
});
|
|
|
|
|
|
emitLangfuseTrace(session, {
|
|
|
@@ -1862,6 +2010,7 @@ export class ProxyResponseHandler {
|
|
|
logger.info("ResponseHandler: Stream processing cancelled", {
|
|
|
taskId,
|
|
|
providerId: provider.id,
|
|
|
+ providerName: provider.name,
|
|
|
chunksCollected: chunks.length,
|
|
|
});
|
|
|
break; // 提前终止
|
|
|
@@ -2172,7 +2321,7 @@ export class ProxyResponseHandler {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
-function extractUsageMetrics(value: unknown): UsageMetrics | null {
|
|
|
+export function extractUsageMetrics(value: unknown): UsageMetrics | null {
|
|
|
if (!value || typeof value !== "object") {
|
|
|
return null;
|
|
|
}
|
|
|
@@ -2202,7 +2351,7 @@ function extractUsageMetrics(value: unknown): UsageMetrics | null {
|
|
|
}
|
|
|
|
|
|
// OpenAI chat completion format: prompt_tokens → input_tokens
|
|
|
- // Priority: Claude (input_tokens) > Gemini (promptTokenCount) > OpenAI (prompt_tokens)
|
|
|
+ // Priority: Claude (input_tokens) > Gemini (promptTokenCount) > OpenAI (prompt_tokens)
|
|
|
if (result.input_tokens === undefined && typeof usage.prompt_tokens === "number") {
|
|
|
result.input_tokens = usage.prompt_tokens;
|
|
|
hasAny = true;
|
|
|
@@ -2386,7 +2535,7 @@ function extractUsageMetrics(value: unknown): UsageMetrics | null {
|
|
|
if (inputTokensDetails && typeof inputTokensDetails.cached_tokens === "number") {
|
|
|
result.cache_read_input_tokens = inputTokensDetails.cached_tokens;
|
|
|
hasAny = true;
|
|
|
- logger.debug("[UsageMetrics] Extracted cached tokens from OpenAI Response API format", {
|
|
|
+ logger.debug("[ResponseHandler] Parsed cached tokens from OpenAI Response API format", {
|
|
|
cachedTokens: inputTokensDetails.cached_tokens,
|
|
|
});
|
|
|
}
|
|
|
@@ -2695,7 +2844,7 @@ function normalizeUsageWithSwap(
|
|
|
|
|
|
let resolvedCacheTtl = swapped.cache_ttl ?? session.getCacheTtlResolved?.() ?? null;
|
|
|
|
|
|
- // When the upstream response had no cache_ttl, we fell through to the session-level
|
|
|
+ // When the upstream response had no cache_ttl, we fell through to the session-level
|
|
|
// getCacheTtlResolved() fallback which reflects the *original* (un-swapped) value.
|
|
|
// We must invert it here to stay consistent with the already-swapped bucket tokens.
|
|
|
if (swapCacheTtlBilling && !usageMetrics.cache_ttl) {
|
|
|
@@ -2726,8 +2875,10 @@ async function updateRequestCostFromUsage(
|
|
|
originalModel: string | null,
|
|
|
redirectedModel: string | null,
|
|
|
usage: UsageMetrics | null,
|
|
|
+ provider: Provider | null,
|
|
|
costMultiplier: number = 1.0,
|
|
|
- context1mApplied: boolean = false
|
|
|
+ context1mApplied: boolean = false,
|
|
|
+ priorityServiceTierApplied: boolean = false
|
|
|
): Promise<void> {
|
|
|
if (!usage) {
|
|
|
logger.warn("[CostCalculation] No usage data, skipping cost update", {
|
|
|
@@ -2742,20 +2893,16 @@ async function updateRequestCostFromUsage(
|
|
|
}
|
|
|
|
|
|
try {
|
|
|
- // 获取系统设置中的计费模型来源配置
|
|
|
const systemSettings = await getSystemSettings();
|
|
|
const billingModelSource = systemSettings.billingModelSource;
|
|
|
|
|
|
- // 根据配置决定计费模型优先级
|
|
|
let primaryModel: string | null;
|
|
|
let fallbackModel: string | null;
|
|
|
|
|
|
if (billingModelSource === "original") {
|
|
|
- // 优先使用重定向前的原始模型
|
|
|
primaryModel = originalModel;
|
|
|
fallbackModel = redirectedModel;
|
|
|
} else {
|
|
|
- // 优先使用重定向后的实际模型
|
|
|
primaryModel = redirectedModel;
|
|
|
fallbackModel = originalModel;
|
|
|
}
|
|
|
@@ -2767,50 +2914,21 @@ async function updateRequestCostFromUsage(
|
|
|
fallbackModel,
|
|
|
});
|
|
|
|
|
|
- // Fallback 逻辑:优先主要模型,找不到则用备选模型
|
|
|
- let priceData = null;
|
|
|
- let usedModelForPricing = null;
|
|
|
-
|
|
|
- const resolveValidPriceData = async (modelName: string) => {
|
|
|
- const record = await findLatestPriceByModel(modelName);
|
|
|
- const data = record?.priceData;
|
|
|
- if (!data || !hasValidPriceData(data)) {
|
|
|
- return null;
|
|
|
- }
|
|
|
- return record;
|
|
|
- };
|
|
|
-
|
|
|
- // Step 1: 尝试主要模型
|
|
|
- if (primaryModel) {
|
|
|
- const resolved = await resolveValidPriceData(primaryModel);
|
|
|
- if (resolved) {
|
|
|
- priceData = resolved;
|
|
|
- usedModelForPricing = primaryModel;
|
|
|
- logger.debug("[CostCalculation] Using primary model for pricing", {
|
|
|
- messageId,
|
|
|
- model: primaryModel,
|
|
|
- billingModelSource,
|
|
|
- });
|
|
|
- }
|
|
|
- }
|
|
|
-
|
|
|
- // Step 2: Fallback 到备选模型
|
|
|
- if (!priceData && fallbackModel && fallbackModel !== primaryModel) {
|
|
|
- const resolved = await resolveValidPriceData(fallbackModel);
|
|
|
- if (resolved) {
|
|
|
- priceData = resolved;
|
|
|
- usedModelForPricing = fallbackModel;
|
|
|
- logger.warn("[CostCalculation] Primary model price not found, using fallback model", {
|
|
|
- messageId,
|
|
|
- primaryModel,
|
|
|
- fallbackModel,
|
|
|
- billingModelSource,
|
|
|
- });
|
|
|
- }
|
|
|
- }
|
|
|
+ const primaryRecord = primaryModel ? await findLatestPriceByModel(primaryModel) : null;
|
|
|
+ const fallbackRecord =
|
|
|
+ fallbackModel && fallbackModel !== primaryModel
|
|
|
+ ? await findLatestPriceByModel(fallbackModel)
|
|
|
+ : null;
|
|
|
+
|
|
|
+ const resolvedPricing = resolvePricingForModelRecords({
|
|
|
+ provider,
|
|
|
+ primaryModelName: primaryModel,
|
|
|
+ fallbackModelName: fallbackModel,
|
|
|
+ primaryRecord,
|
|
|
+ fallbackRecord,
|
|
|
+ });
|
|
|
|
|
|
- // Step 3: 完全失败(无价格或价格表暂不可用):不计费放行,并异步触发一次同步
|
|
|
- if (!priceData?.priceData) {
|
|
|
+ if (!resolvedPricing?.priceData || !hasValidPriceData(resolvedPricing.priceData)) {
|
|
|
logger.warn("[CostCalculation] No price data found, skipping billing", {
|
|
|
messageId,
|
|
|
originalModel,
|
|
|
@@ -2822,13 +2940,19 @@ async function updateRequestCostFromUsage(
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- // 计算费用
|
|
|
- const cost = calculateRequestCost(usage, priceData.priceData, costMultiplier, context1mApplied);
|
|
|
+ const cost = calculateRequestCost(
|
|
|
+ usage,
|
|
|
+ resolvedPricing.priceData,
|
|
|
+ costMultiplier,
|
|
|
+ context1mApplied,
|
|
|
+ priorityServiceTierApplied
|
|
|
+ );
|
|
|
|
|
|
logger.info("[CostCalculation] Cost calculated successfully", {
|
|
|
messageId,
|
|
|
- usedModelForPricing,
|
|
|
- billingModelSource,
|
|
|
+ usedModelForPricing: resolvedPricing.resolvedModelName,
|
|
|
+ resolvedPricingProviderKey: resolvedPricing.resolvedPricingProviderKey,
|
|
|
+ pricingResolutionSource: resolvedPricing.source,
|
|
|
costUsd: cost.toString(),
|
|
|
costMultiplier,
|
|
|
usage,
|
|
|
@@ -2839,11 +2963,12 @@ async function updateRequestCostFromUsage(
|
|
|
} else {
|
|
|
logger.warn("[CostCalculation] Calculated cost is zero or negative", {
|
|
|
messageId,
|
|
|
- usedModelForPricing,
|
|
|
+ usedModelForPricing: resolvedPricing.resolvedModelName,
|
|
|
+ resolvedPricingProviderKey: resolvedPricing.resolvedPricingProviderKey,
|
|
|
costUsd: cost.toString(),
|
|
|
priceData: {
|
|
|
- inputCost: priceData.priceData.input_cost_per_token,
|
|
|
- outputCost: priceData.priceData.output_cost_per_token,
|
|
|
+ inputCost: resolvedPricing.priceData.input_cost_per_token,
|
|
|
+ outputCost: resolvedPricing.priceData.output_cost_per_token,
|
|
|
},
|
|
|
});
|
|
|
}
|
|
|
@@ -2877,27 +3002,21 @@ export async function finalizeRequestStats(
|
|
|
}
|
|
|
|
|
|
const providerIdForPersistence = providerIdOverride ?? session.provider?.id;
|
|
|
-
|
|
|
- // 1. 结束请求状态追踪
|
|
|
- ProxyStatusTracker.getInstance().endRequest(messageContext.user.id, messageContext.id);
|
|
|
-
|
|
|
- // 2. 更新请求时长
|
|
|
- await updateMessageRequestDuration(messageContext.id, duration);
|
|
|
-
|
|
|
- // 3. 解析 usage metrics
|
|
|
const { usageMetrics } = parseUsageFromResponseText(responseText, provider.providerType);
|
|
|
-
|
|
|
+ const actualServiceTier = parseServiceTierFromResponseText(responseText);
|
|
|
+ ensureCodexServiceTierResultSpecialSetting(session, actualServiceTier);
|
|
|
+ const priorityServiceTierApplied = isPriorityServiceTierApplied(session, actualServiceTier);
|
|
|
if (!usageMetrics) {
|
|
|
- // 即使没有 usageMetrics,也需要更新状态码和 provider chain
|
|
|
await updateMessageRequestDetails(messageContext.id, {
|
|
|
statusCode: statusCode,
|
|
|
...(errorMessage ? { errorMessage } : {}),
|
|
|
ttfbMs: session.ttfbMs ?? duration,
|
|
|
providerChain: session.getProviderChain(),
|
|
|
model: session.getCurrentModel() ?? undefined,
|
|
|
- providerId: providerIdForPersistence, // 更新最终供应商ID(重试切换后)
|
|
|
+ providerId: providerIdForPersistence,
|
|
|
context1mApplied: session.getContext1mApplied(),
|
|
|
- swapCacheTtlApplied: provider.swapCacheTtlBilling ?? false,
|
|
|
+ swapCacheTtlApplied: session.provider?.swapCacheTtlBilling ?? false,
|
|
|
+ specialSettings: session.getSpecialSettings() ?? undefined,
|
|
|
});
|
|
|
return null;
|
|
|
}
|
|
|
@@ -2916,25 +3035,29 @@ export async function finalizeRequestStats(
|
|
|
session.getOriginalModel(),
|
|
|
session.getCurrentModel(),
|
|
|
normalizedUsage,
|
|
|
+ provider,
|
|
|
provider.costMultiplier,
|
|
|
- session.getContext1mApplied()
|
|
|
+ session.getContext1mApplied(),
|
|
|
+ priorityServiceTierApplied
|
|
|
);
|
|
|
|
|
|
// 5. 追踪消费到 Redis(用于限流)
|
|
|
- await trackCostToRedis(session, normalizedUsage);
|
|
|
+ await trackCostToRedis(session, normalizedUsage, priorityServiceTierApplied);
|
|
|
|
|
|
// 6. 更新 session usage
|
|
|
if (session.sessionId) {
|
|
|
let costUsdStr: string | undefined;
|
|
|
try {
|
|
|
if (session.request.model) {
|
|
|
- const priceData = await session.getCachedPriceDataByBillingSource();
|
|
|
- if (priceData) {
|
|
|
+ const resolvedPricing = await session.getResolvedPricingByBillingSource(provider);
|
|
|
+ if (resolvedPricing) {
|
|
|
+ ensurePricingResolutionSpecialSetting(session, resolvedPricing);
|
|
|
const cost = calculateRequestCost(
|
|
|
normalizedUsage,
|
|
|
- priceData,
|
|
|
+ resolvedPricing.priceData,
|
|
|
provider.costMultiplier,
|
|
|
- session.getContext1mApplied()
|
|
|
+ session.getContext1mApplied(),
|
|
|
+ priorityServiceTierApplied
|
|
|
);
|
|
|
if (cost.gt(0)) {
|
|
|
costUsdStr = cost.toString();
|
|
|
@@ -2978,6 +3101,7 @@ export async function finalizeRequestStats(
|
|
|
providerId: providerIdForPersistence, // 更新最终供应商ID(重试切换后)
|
|
|
context1mApplied: session.getContext1mApplied(),
|
|
|
swapCacheTtlApplied: provider.swapCacheTtlBilling ?? false,
|
|
|
+ specialSettings: session.getSpecialSettings() ?? undefined,
|
|
|
});
|
|
|
|
|
|
return normalizedUsage;
|
|
|
@@ -2986,7 +3110,11 @@ export async function finalizeRequestStats(
|
|
|
/**
|
|
|
* 追踪消费到 Redis(用于限流)
|
|
|
*/
|
|
|
-async function trackCostToRedis(session: ProxySession, usage: UsageMetrics | null): Promise<void> {
|
|
|
+async function trackCostToRedis(
|
|
|
+ session: ProxySession,
|
|
|
+ usage: UsageMetrics | null,
|
|
|
+ priorityServiceTierApplied: boolean = false
|
|
|
+): Promise<void> {
|
|
|
if (!usage || !session.sessionId) return;
|
|
|
|
|
|
try {
|
|
|
@@ -3000,15 +3128,17 @@ async function trackCostToRedis(session: ProxySession, usage: UsageMetrics | nul
|
|
|
const modelName = session.request.model;
|
|
|
if (!modelName) return;
|
|
|
|
|
|
- // 计算成本(应用倍率)- 使用 session 缓存避免重复查询
|
|
|
- const priceData = await session.getCachedPriceDataByBillingSource();
|
|
|
- if (!priceData) return;
|
|
|
+ const resolvedPricing = await session.getResolvedPricingByBillingSource(provider);
|
|
|
+ if (!resolvedPricing) return;
|
|
|
+
|
|
|
+ ensurePricingResolutionSpecialSetting(session, resolvedPricing);
|
|
|
|
|
|
const cost = calculateRequestCost(
|
|
|
usage,
|
|
|
- priceData,
|
|
|
+ resolvedPricing.priceData,
|
|
|
provider.costMultiplier,
|
|
|
- session.getContext1mApplied()
|
|
|
+ session.getContext1mApplied(),
|
|
|
+ priorityServiceTierApplied
|
|
|
);
|
|
|
if (cost.lte(0)) return;
|
|
|
|
|
|
@@ -3137,6 +3267,7 @@ async function persistRequestFailure(options: {
|
|
|
providerId: session.provider?.id, // 更新最终供应商ID(重试切换后)
|
|
|
context1mApplied: session.getContext1mApplied(),
|
|
|
swapCacheTtlApplied: session.provider?.swapCacheTtlBilling ?? false,
|
|
|
+ specialSettings: session.getSpecialSettings() ?? undefined,
|
|
|
});
|
|
|
|
|
|
const isAsyncWrite = getEnvConfig().MESSAGE_REQUEST_WRITE_MODE !== "sync";
|