|
|
@@ -268,11 +268,7 @@ func batchPatchImage2Base64(ctx context.Context, imageTasks []*relaymodel.Claude
|
|
|
continue
|
|
|
}
|
|
|
|
|
|
- wg.Add(1)
|
|
|
-
|
|
|
- go func() {
|
|
|
- defer wg.Done()
|
|
|
-
|
|
|
+ wg.Go(func() {
|
|
|
_ = sem.Acquire(ctx, 1)
|
|
|
defer sem.Release(1)
|
|
|
|
|
|
@@ -291,7 +287,7 @@ func batchPatchImage2Base64(ctx context.Context, imageTasks []*relaymodel.Claude
|
|
|
task.Source.URL = ""
|
|
|
task.Source.MediaType = mimeType
|
|
|
task.Source.Data = data
|
|
|
- }()
|
|
|
+ })
|
|
|
}
|
|
|
|
|
|
wg.Wait()
|
|
|
@@ -303,8 +299,42 @@ func batchPatchImage2Base64(ctx context.Context, imageTasks []*relaymodel.Claude
|
|
|
return nil
|
|
|
}
|
|
|
|
|
|
-// https://docs.anthropic.com/claude/reference/messages-streaming
|
|
|
-func StreamResponse2OpenAI(
|
|
|
+// StreamState maintains state during streaming response conversion
|
|
|
+type StreamState struct {
|
|
|
+ // claudeIndexToToolCallIndex maps Claude's content block index to OpenAI tool call index
|
|
|
+ // Claude's index includes all content blocks (text, thinking, tool_use), but OpenAI only counts tool calls
|
|
|
+ claudeIndexToToolCallIndex map[int]int
|
|
|
+ // nextToolCallIndex tracks the next tool call index to assign (0-based)
|
|
|
+ nextToolCallIndex int
|
|
|
+}
|
|
|
+
|
|
|
+func NewStreamState() *StreamState {
|
|
|
+ return &StreamState{
|
|
|
+ claudeIndexToToolCallIndex: make(map[int]int),
|
|
|
+ nextToolCallIndex: 0,
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+// getToolCallIndex returns the OpenAI tool call index for a given Claude content block index
|
|
|
+// If this is the first time seeing this Claude index for a tool call, assigns a new tool call index
|
|
|
+func (s *StreamState) getToolCallIndex(claudeIndex int, isNewToolCall bool) int {
|
|
|
+ if idx, exists := s.claudeIndexToToolCallIndex[claudeIndex]; exists {
|
|
|
+ return idx
|
|
|
+ }
|
|
|
+
|
|
|
+ if isNewToolCall {
|
|
|
+ toolCallIndex := s.nextToolCallIndex
|
|
|
+ s.claudeIndexToToolCallIndex[claudeIndex] = toolCallIndex
|
|
|
+ s.nextToolCallIndex++
|
|
|
+ return toolCallIndex
|
|
|
+ }
|
|
|
+
|
|
|
+ // This shouldn't happen in normal flow, but return a safe default
|
|
|
+ return 0
|
|
|
+}
|
|
|
+
|
|
|
+// StreamResponse2OpenAI converts Claude streaming response to OpenAI format
|
|
|
+func (s *StreamState) StreamResponse2OpenAI(
|
|
|
meta *meta.Meta,
|
|
|
respData []byte,
|
|
|
) (*relaymodel.ChatCompletionsStreamResponse, adaptor.Error) {
|
|
|
@@ -340,8 +370,9 @@ func StreamResponse2OpenAI(
|
|
|
if claudeResponse.ContentBlock != nil {
|
|
|
content = claudeResponse.ContentBlock.Text
|
|
|
if claudeResponse.ContentBlock.Type == toolUseType {
|
|
|
+ toolCallIndex := s.getToolCallIndex(claudeResponse.Index, true)
|
|
|
tools = append(tools, relaymodel.ToolCall{
|
|
|
- Index: claudeResponse.Index,
|
|
|
+ Index: toolCallIndex,
|
|
|
ID: claudeResponse.ContentBlock.ID,
|
|
|
Type: "function",
|
|
|
Function: relaymodel.Function{
|
|
|
@@ -354,8 +385,9 @@ func StreamResponse2OpenAI(
|
|
|
if claudeResponse.Delta != nil {
|
|
|
switch claudeResponse.Delta.Type {
|
|
|
case "input_json_delta":
|
|
|
+ toolCallIndex := s.getToolCallIndex(claudeResponse.Index, false)
|
|
|
tools = append(tools, relaymodel.ToolCall{
|
|
|
- Index: claudeResponse.Index,
|
|
|
+ Index: toolCallIndex,
|
|
|
Type: "function",
|
|
|
Function: relaymodel.Function{
|
|
|
Arguments: claudeResponse.Delta.PartialJSON,
|
|
|
@@ -393,6 +425,7 @@ func StreamResponse2OpenAI(
|
|
|
ToolCalls: tools,
|
|
|
Role: "assistant",
|
|
|
},
|
|
|
+ Index: 0,
|
|
|
FinishReason: stopReasonClaude2OpenAI(stopReason),
|
|
|
}
|
|
|
|
|
|
@@ -445,8 +478,9 @@ func Response2OpenAI(
|
|
|
case toolUseType:
|
|
|
args, _ := sonic.MarshalString(v.Input)
|
|
|
tools = append(tools, relaymodel.ToolCall{
|
|
|
- ID: v.ID,
|
|
|
- Type: "function",
|
|
|
+ Index: len(tools),
|
|
|
+ ID: v.ID,
|
|
|
+ Type: "function",
|
|
|
Function: relaymodel.Function{
|
|
|
Name: v.Name,
|
|
|
Arguments: args,
|
|
|
@@ -514,6 +548,8 @@ func OpenAIStreamHandler(
|
|
|
writed bool
|
|
|
)
|
|
|
|
|
|
+ streamState := NewStreamState()
|
|
|
+
|
|
|
for scanner.Scan() {
|
|
|
data := scanner.Bytes()
|
|
|
if !render.IsValidSSEData(data) {
|
|
|
@@ -525,7 +561,7 @@ func OpenAIStreamHandler(
|
|
|
break
|
|
|
}
|
|
|
|
|
|
- response, err := StreamResponse2OpenAI(m, data)
|
|
|
+ response, err := streamState.StreamResponse2OpenAI(m, data)
|
|
|
if err != nil {
|
|
|
if writed {
|
|
|
log.Errorf("response error: %+v", err)
|