Просмотр исходного кода

Prevent terminated-stream retry loops and preserve reliable Kimi handling

Neonsy 1 неделя назад
Родитель
Сommit
f7cf4fd500

+ 5 - 0
.changeset/fix-chutes-kimi-no-assistant-messages.md

@@ -0,0 +1,5 @@
+---
+"kilo-code": patch
+---
+
+Improve Chutes Kimi reliability by preventing terminated-stream retry loops and handling tool/reasoning chunks more safely.

+ 184 - 5
src/api/providers/__tests__/chutes.spec.ts

@@ -153,6 +153,39 @@ describe("ChutesHandler", () => {
 		])
 	})
 
+	// kilocode_change start
+	it("should handle non-DeepSeek reasoning field", async () => {
+		mockCreate.mockImplementationOnce(async () => ({
+			[Symbol.asyncIterator]: async function* () {
+				yield {
+					choices: [
+						{
+							delta: { reasoning: "Thinking through it..." },
+							index: 0,
+						},
+					],
+					usage: null,
+				}
+			},
+		}))
+
+		const systemPrompt = "You are a helpful assistant."
+		const messages: Anthropic.Messages.MessageParam[] = [{ role: "user", content: "Hi" }]
+		mockFetchModel.mockResolvedValueOnce({
+			id: "some-other-model",
+			info: { maxTokens: 1024, temperature: 0.7 },
+		})
+
+		const stream = handler.createMessage(systemPrompt, messages)
+		const chunks = []
+		for await (const chunk of stream) {
+			chunks.push(chunk)
+		}
+
+		expect(chunks).toEqual([{ type: "reasoning", text: "Thinking through it..." }])
+	})
+	// kilocode_change end
+
 	it("should return default model when no model is specified", async () => {
 		const model = await handler.fetchModel()
 		expect(model.id).toBe(chutesDefaultModelId)
@@ -275,6 +308,131 @@ describe("ChutesHandler", () => {
 		})
 	})
 
+	// kilocode_change start
+	it("createMessage should yield tool_call_end on finish_reason tool_calls", async () => {
+		mockCreate.mockImplementationOnce(() => {
+			return {
+				[Symbol.asyncIterator]: () => ({
+					next: vi
+						.fn()
+						.mockResolvedValueOnce({
+							done: false,
+							value: {
+								choices: [
+									{
+										delta: {
+											tool_calls: [
+												{
+													index: 0,
+													id: "call_finish",
+													function: { name: "test_tool", arguments: '{"arg":"value"}' },
+												},
+											],
+										},
+										finish_reason: null,
+									},
+								],
+							},
+						})
+						.mockResolvedValueOnce({
+							done: false,
+							value: {
+								choices: [
+									{
+										delta: {},
+										finish_reason: "tool_calls",
+									},
+								],
+							},
+						})
+						.mockResolvedValueOnce({ done: true }),
+				}),
+			}
+		})
+
+		const stream = handler.createMessage("system prompt", [])
+		const chunks = []
+		for await (const chunk of stream) {
+			chunks.push(chunk)
+		}
+
+		expect(chunks).toEqual([
+			{
+				type: "tool_call_partial",
+				index: 0,
+				id: "call_finish",
+				name: "test_tool",
+				arguments: '{"arg":"value"}',
+			},
+			{
+				type: "tool_call_end",
+				id: "call_finish",
+			},
+		])
+	})
+
+	it("createMessage should synthesize tool call ids when provider omits them", async () => {
+		mockCreate.mockImplementationOnce(() => {
+			return {
+				[Symbol.asyncIterator]: () => ({
+					next: vi
+						.fn()
+						.mockResolvedValueOnce({
+							done: false,
+							value: {
+								choices: [
+									{
+										delta: {
+											tool_calls: [
+												{
+													index: 0,
+													function: { name: "test_tool", arguments: '{"arg":"value"}' },
+												},
+											],
+										},
+										finish_reason: null,
+									},
+								],
+							},
+						})
+						.mockResolvedValueOnce({
+							done: false,
+							value: {
+								choices: [
+									{
+										delta: {},
+										finish_reason: "tool_calls",
+									},
+								],
+							},
+						})
+						.mockResolvedValueOnce({ done: true }),
+				}),
+			}
+		})
+
+		const stream = handler.createMessage("system prompt", [])
+		const chunks = []
+		for await (const chunk of stream) {
+			chunks.push(chunk)
+		}
+
+		expect(chunks).toEqual([
+			{
+				type: "tool_call_partial",
+				index: 0,
+				id: "chutes_tool_call_0",
+				name: "test_tool",
+				arguments: '{"arg":"value"}',
+			},
+			{
+				type: "tool_call_end",
+				id: "chutes_tool_call_0",
+			},
+		])
+	})
+	// kilocode_change end
+
 	it("createMessage should pass tools and tool_choice to API", async () => {
 		const tools = [
 			{
@@ -307,6 +465,9 @@ describe("ChutesHandler", () => {
 				tools,
 				tool_choice,
 			}),
+			expect.objectContaining({
+				timeout: expect.any(Number),
+			}),
 		)
 	})
 
@@ -326,11 +487,29 @@ describe("ChutesHandler", () => {
 			apiModelId: testModelId,
 			chutesApiKey: "test-chutes-api-key",
 		})
-		// Note: getModel() returns fallback default without calling fetchModel
-		// Since we haven't called fetchModel, it returns the default chutesDefaultModelId
-		// which is DeepSeek-R1-0528, therefore temperature will be DEEP_SEEK_DEFAULT_TEMPERATURE
+		;(handlerWithModel as any).models = {
+			[testModelId]: {
+				...chutesDefaultModelInfo,
+				temperature: 0.7,
+			},
+		}
 		const model = handlerWithModel.getModel()
-		// The default model is DeepSeek-R1, so it returns DEEP_SEEK_DEFAULT_TEMPERATURE
-		expect(model.info.temperature).toBe(DEEP_SEEK_DEFAULT_TEMPERATURE)
+		expect(model.id).toBe(testModelId)
+		expect(model.info.temperature).toBe(0.5)
+	})
+
+	// kilocode_change start
+	it("should preserve explicit Chutes model id when it is unavailable in cached model list", () => {
+		const unsupportedModelId = "moonshotai/Kimi-K2.5-TEE"
+		const handlerWithModel = new ChutesHandler({
+			apiModelId: unsupportedModelId,
+			chutesApiKey: "test-chutes-api-key",
+		})
+
+		const model = handlerWithModel.getModel()
+
+		expect(model.id).toBe(unsupportedModelId)
+		expect(model.info.temperature).toBe(0.5)
 	})
+	// kilocode_change end
 })

+ 106 - 9
src/api/providers/chutes.ts

@@ -11,6 +11,7 @@ import { ApiStream } from "../transform/stream"
 import type { SingleCompletionHandler, ApiHandlerCreateMessageMetadata } from "../index"
 
 import { RouterProvider } from "./router-provider"
+import { getApiRequestTimeout } from "./utils/timeout-config"
 
 export class ChutesHandler extends RouterProvider implements SingleCompletionHandler {
 	constructor(options: ApiHandlerOptions) {
@@ -25,6 +26,14 @@ export class ChutesHandler extends RouterProvider implements SingleCompletionHan
 		})
 	}
 
+	// kilocode_change start
+	private getRequestOptions() {
+		return {
+			timeout: getApiRequestTimeout(),
+		}
+	}
+	// kilocode_change end
+
 	private getCompletionParams(
 		systemPrompt: string,
 		messages: Anthropic.Messages.MessageParam[],
@@ -59,6 +68,32 @@ export class ChutesHandler extends RouterProvider implements SingleCompletionHan
 		return params
 	}
 
+	// kilocode_change start
+	private getToolCallId(
+		toolCall: {
+			id?: string
+			index?: number
+		},
+		toolCallIdsByIndex: Map<number, string>,
+	): string {
+		const toolCallIndex = toolCall.index ?? 0
+
+		if (toolCall.id) {
+			toolCallIdsByIndex.set(toolCallIndex, toolCall.id)
+			return toolCall.id
+		}
+
+		const existingId = toolCallIdsByIndex.get(toolCallIndex)
+		if (existingId) {
+			return existingId
+		}
+
+		const syntheticId = `chutes_tool_call_${toolCallIndex}`
+		toolCallIdsByIndex.set(toolCallIndex, syntheticId)
+		return syntheticId
+	}
+	// kilocode_change end
+
 	override async *createMessage(
 		systemPrompt: string,
 		messages: Anthropic.Messages.MessageParam[],
@@ -70,7 +105,7 @@ export class ChutesHandler extends RouterProvider implements SingleCompletionHan
 			const stream = await this.client.chat.completions.create({
 				...this.getCompletionParams(systemPrompt, messages, metadata),
 				messages: convertToR1Format([{ role: "user", content: systemPrompt }, ...messages]),
-			})
+			}, this.getRequestOptions())
 
 			const matcher = new XmlMatcher(
 				"think",
@@ -80,9 +115,16 @@ export class ChutesHandler extends RouterProvider implements SingleCompletionHan
 						text: chunk.data,
 					}) as const,
 			)
+			// kilocode_change start
+			const activeToolCallIds = new Set<string>()
+			const toolCallIdsByIndex = new Map<number, string>()
+			// kilocode_change end
 
 			for await (const chunk of stream) {
 				const delta = chunk.choices[0]?.delta
+				// kilocode_change start
+				const finishReason = chunk.choices[0]?.finish_reason
+				// kilocode_change end
 
 				if (delta?.content) {
 					for (const processedChunk of matcher.update(delta.content)) {
@@ -93,15 +135,27 @@ export class ChutesHandler extends RouterProvider implements SingleCompletionHan
 				// Emit raw tool call chunks - NativeToolCallParser handles state management
 				if (delta && "tool_calls" in delta && Array.isArray(delta.tool_calls)) {
 					for (const toolCall of delta.tool_calls) {
+						// kilocode_change start
+						const toolCallId = this.getToolCallId(toolCall, toolCallIdsByIndex)
+						activeToolCallIds.add(toolCallId)
+						// kilocode_change end
 						yield {
 							type: "tool_call_partial",
 							index: toolCall.index,
-							id: toolCall.id,
+							id: toolCallId,
 							name: toolCall.function?.name,
 							arguments: toolCall.function?.arguments,
 						}
 					}
 				}
+				// kilocode_change start
+				if (finishReason === "tool_calls" && activeToolCallIds.size > 0) {
+					for (const id of activeToolCallIds) {
+						yield { type: "tool_call_end", id }
+					}
+					activeToolCallIds.clear()
+				}
+				// kilocode_change end
 
 				if (chunk.usage) {
 					yield {
@@ -120,31 +174,61 @@ export class ChutesHandler extends RouterProvider implements SingleCompletionHan
 			// For non-DeepSeek-R1 models, use standard OpenAI streaming
 			const stream = await this.client.chat.completions.create(
 				this.getCompletionParams(systemPrompt, messages, metadata),
+				this.getRequestOptions(),
 			)
+			// kilocode_change start
+			const activeToolCallIds = new Set<string>()
+			const toolCallIdsByIndex = new Map<number, string>()
+			// kilocode_change end
 
 			for await (const chunk of stream) {
 				const delta = chunk.choices[0]?.delta
+				// kilocode_change start
+				const finishReason = chunk.choices[0]?.finish_reason
+				// kilocode_change end
 
 				if (delta?.content) {
 					yield { type: "text", text: delta.content }
 				}
 
-				if (delta && "reasoning_content" in delta && delta.reasoning_content) {
-					yield { type: "reasoning", text: (delta.reasoning_content as string | undefined) || "" }
+				// kilocode_change start
+				if (delta) {
+					for (const key of ["reasoning_content", "reasoning"] as const) {
+						if (key in delta) {
+							const reasoningContent = ((delta as any)[key] as string | undefined) || ""
+							if (reasoningContent.trim()) {
+								yield { type: "reasoning", text: reasoningContent }
+							}
+							break
+						}
+					}
 				}
+				// kilocode_change end
 
 				// Emit raw tool call chunks - NativeToolCallParser handles state management
 				if (delta && "tool_calls" in delta && Array.isArray(delta.tool_calls)) {
 					for (const toolCall of delta.tool_calls) {
+						// kilocode_change start
+						const toolCallId = this.getToolCallId(toolCall, toolCallIdsByIndex)
+						activeToolCallIds.add(toolCallId)
+						// kilocode_change end
 						yield {
 							type: "tool_call_partial",
 							index: toolCall.index,
-							id: toolCall.id,
+							id: toolCallId,
 							name: toolCall.function?.name,
 							arguments: toolCall.function?.arguments,
 						}
 					}
 				}
+				// kilocode_change start
+				if (finishReason === "tool_calls" && activeToolCallIds.size > 0) {
+					for (const id of activeToolCallIds) {
+						yield { type: "tool_call_end", id }
+					}
+					activeToolCallIds.clear()
+				}
+				// kilocode_change end
 
 				if (chunk.usage) {
 					yield {
@@ -184,7 +268,7 @@ export class ChutesHandler extends RouterProvider implements SingleCompletionHan
 				requestParams.temperature = this.options.modelTemperature ?? defaultTemperature
 			}
 
-			const response = await this.client.chat.completions.create(requestParams)
+			const response = await this.client.chat.completions.create(requestParams, this.getRequestOptions())
 			return response.choices[0]?.message.content || ""
 		} catch (error) {
 			if (error instanceof Error) {
@@ -196,12 +280,25 @@ export class ChutesHandler extends RouterProvider implements SingleCompletionHan
 
 	override getModel() {
 		const model = super.getModel()
-		const isDeepSeekR1 = model.id.includes("DeepSeek-R1")
+		const configuredModelId = this.options.apiModelId
+		// kilocode_change start
+		// Keep explicit Chutes model IDs instead of silently switching to the provider default.
+		// This prevents hidden model substitution when model lists are stale/unavailable.
+		const shouldPreserveExplicitModelId =
+			!!configuredModelId &&
+			configuredModelId !== this.defaultModelId &&
+			model.id === this.defaultModelId &&
+			!this.models[configuredModelId]
+
+		const effectiveModelId = shouldPreserveExplicitModelId ? configuredModelId : model.id
+		const baseInfo = shouldPreserveExplicitModelId ? this.defaultModelInfo : model.info
+		// kilocode_change end
+		const isDeepSeekR1 = effectiveModelId.includes("DeepSeek-R1")
 
 		return {
-			...model,
+			id: effectiveModelId,
 			info: {
-				...model.info,
+				...baseInfo,
 				temperature: isDeepSeekR1 ? DEEP_SEEK_DEFAULT_TEMPERATURE : 0.5,
 			},
 		}

+ 38 - 0
src/core/task/Task.ts

@@ -158,6 +158,9 @@ const MAX_EXPONENTIAL_BACKOFF_SECONDS = 600 // 10 minutes
 const DEFAULT_USAGE_COLLECTION_TIMEOUT_MS = 5000 // 5 seconds
 const FORCED_CONTEXT_REDUCTION_PERCENT = 75 // Keep 75% of context (remove 25%) on context window errors
 const MAX_CONTEXT_WINDOW_RETRIES = 3 // Maximum retries for context window errors
+// kilocode_change start
+const MAX_CHUTES_TERMINATED_RETRY_ATTEMPTS = 2 // Allow up to 2 retries (3 total attempts) before failing fast
+// kilocode_change end
 
 export interface TaskOptions extends CreateTaskOptions {
 	context: vscode.ExtensionContext // kilocode_change
@@ -3553,6 +3556,17 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
 
 						// Clean up partial state
 						await abortStream(cancelReason, streamingFailedMessage)
+						// kilocode_change start
+						// Bound retries for repeated Chutes "terminated" stream failures
+						// to prevent indefinite thinking/retry loops.
+						const retryAttempt = currentItem.retryAttempt ?? 0
+						if (this.hasExceededChutesTerminatedRetryLimit(error, retryAttempt)) {
+							console.error(
+								`[Task#${this.taskId}.${this.instanceId}] Chutes stream terminated repeatedly. Stopping retries after attempt ${retryAttempt}.`,
+							)
+							throw error
+						}
+						// kilocode_change end
 
 						if (this.abort) {
 							// User cancelled - abort the entire task
@@ -4279,6 +4293,22 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
 		}
 	}
 
+	// kilocode_change start
+	private isChutesTerminatedError(error: unknown): boolean {
+		if (this.apiConfiguration?.apiProvider !== "chutes") {
+			return false
+		}
+
+		const message =
+			error instanceof Error ? error.message : typeof error === "string" ? error : JSON.stringify(error)
+		return /\bterminated\b/i.test(message || "")
+	}
+
+	private hasExceededChutesTerminatedRetryLimit(error: unknown, retryAttempt: number): boolean {
+		return this.isChutesTerminatedError(error) && retryAttempt >= MAX_CHUTES_TERMINATED_RETRY_ATTEMPTS
+	}
+	// kilocode_change end
+
 	public async *attemptApiRequest(
 		retryAttempt: number = 0,
 		options: { skipProviderRateLimit?: boolean } = {},
@@ -4654,6 +4684,14 @@ export class Task extends EventEmitter<TaskEvents> implements TaskLike {
 				return
 			}
 			// kilocode_change end
+			// kilocode_change start
+			// Chutes can occasionally terminate streams abruptly; avoid recursive
+			// first-chunk auto-retries here and delegate retry policy to the
+			// outer request loop, which applies a bounded retry cap.
+			if (this.isChutesTerminatedError(error)) {
+				throw error
+			}
+			// kilocode_change end
 			// note that this api_req_failed ask is unique in that we only present this option if the api hasn't streamed any content yet (ie it fails on the first chunk due), as it would allow them to hit a retry button. However if the api failed mid-stream, it could be in any arbitrary state where some tools may have executed, so that error is handled differently and requires cancelling the task entirely.
 			if (autoApprovalEnabled) {
 				// Apply shared exponential backoff and countdown UX

+ 76 - 0
src/core/task/__tests__/Task.spec.ts

@@ -900,6 +900,82 @@ describe("Cline", () => {
 				await task.catch(() => {})
 			})
 
+			// kilocode_change start
+			it("attemptApiRequest should not recursively auto-retry first-chunk Chutes terminated errors", async () => {
+				const chutesConfig = {
+					...mockApiConfig,
+					apiProvider: "chutes" as const,
+					apiModelId: "moonshotai/Kimi-K2.5-TEE",
+				}
+
+				const task = new Task({
+					provider: mockProvider,
+					apiConfiguration: chutesConfig,
+					task: "test task",
+					startTask: false,
+					context: mockExtensionContext,
+				})
+
+				const terminatedError = new Error("terminated")
+				const mockFailedStream = {
+					// eslint-disable-next-line require-yield
+					async *[Symbol.asyncIterator]() {
+						throw terminatedError
+					},
+					async next() {
+						throw terminatedError
+					},
+					async return() {
+						return { done: true, value: undefined }
+					},
+					async throw(e: any) {
+						throw e
+					},
+					async [Symbol.asyncDispose]() {
+						// Cleanup
+					},
+				} as AsyncGenerator<ApiStreamChunk>
+
+				const createMessageSpy = vi.spyOn(task.api, "createMessage").mockReturnValue(mockFailedStream)
+				const backoffSpy = vi.spyOn(task as any, "backoffAndAnnounce").mockResolvedValue(undefined)
+
+				mockProvider.getState = vi.fn().mockResolvedValue({
+					apiConfiguration: chutesConfig,
+					autoApprovalEnabled: true,
+					requestDelaySeconds: 1,
+					mode: "code",
+				})
+
+				const iterator = task.attemptApiRequest(0, { skipProviderRateLimit: true })
+				await expect(iterator.next()).rejects.toThrow("terminated")
+
+				expect(createMessageSpy).toHaveBeenCalledTimes(1)
+				expect(backoffSpy).not.toHaveBeenCalled()
+			})
+
+			it("should apply Chutes terminated retry cap at the configured threshold", async () => {
+				const chutesConfig = {
+					...mockApiConfig,
+					apiProvider: "chutes" as const,
+					apiModelId: "moonshotai/Kimi-K2.5-TEE",
+				}
+
+				const task = new Task({
+					provider: mockProvider,
+					apiConfiguration: chutesConfig,
+					task: "test task",
+					startTask: false,
+					context: mockExtensionContext,
+				})
+
+				const terminatedError = new Error("terminated")
+
+				expect((task as any).hasExceededChutesTerminatedRetryLimit(terminatedError, 0)).toBe(false)
+				expect((task as any).hasExceededChutesTerminatedRetryLimit(terminatedError, 1)).toBe(false)
+				expect((task as any).hasExceededChutesTerminatedRetryLimit(terminatedError, 2)).toBe(true)
+			})
+			// kilocode_change end
+
 			describe("processUserContentMentions", () => {
 				it("should process mentions in task and feedback tags", async () => {
 					const [cline, task] = Task.create({