Merge pull request #26 from jerry-271828/main

Fixed an issue where the model would loop in certain scenarios
jerry-271828 1 month ago
parent
commit
ef8e032a9e
4 changed files with 456 additions and 58 deletions
  1. app.py (+45 −18)
  2. claude_converter.py (+145 −34)
  3. claude_stream.py (+58 −6)
  4. message_processor.py (+208 −0)

app.py (+45 −18)

@@ -573,13 +573,40 @@ async def claude_messages(
     Claude-compatible messages endpoint.
     """
     # 1. Convert request
-    requested_conversation_id = req.conversation_id or x_conversation_id
+    # Always generate a new conversation_id like amq2api does
+    # Using the same conversation_id can cause Amazon Q to return cached/stale data
     try:
-        aq_request = convert_claude_to_amazonq_request(req, conversation_id=requested_conversation_id)
+        aq_request = convert_claude_to_amazonq_request(req, conversation_id=None)
     except Exception as e:
         traceback.print_exc()
         raise HTTPException(status_code=400, detail=f"Request conversion failed: {str(e)}")
 
+    # Post-process history to fix message ordering (prevents infinite loops)
+    from message_processor import process_claude_history_for_amazonq
+    conversation_state = aq_request.get("conversationState", {})
+    history = conversation_state.get("history", [])
+    if history:
+        processed_history = process_claude_history_for_amazonq(history)
+        aq_request["conversationState"]["history"] = processed_history
+
+    # Remove duplicate tail userInputMessage that matches currentMessage content
+    # This prevents the model from repeatedly responding to the same user message
+    conversation_state = aq_request.get("conversationState", {})
+    current_msg = conversation_state.get("currentMessage", {}).get("userInputMessage", {})
+    current_content = (current_msg.get("content") or "").strip()
+    history = conversation_state.get("history", [])
+
+    if history and current_content:
+        last = history[-1]
+        if "userInputMessage" in last:
+            last_content = (last["userInputMessage"].get("content") or "").strip()
+            if last_content and last_content == current_content:
+                # Remove duplicate tail userInputMessage
+                history = history[:-1]
+                aq_request["conversationState"]["history"] = history
+                import logging
+                logging.getLogger(__name__).info("Removed duplicate tail userInputMessage to prevent repeated response")
+
     conversation_state = aq_request.get("conversationState", {})
     conversation_id = conversation_state.get("conversationId")
     response_headers: Dict[str, str] = {}
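
A minimal standalone sketch of the tail-dedup step above, under the request shape shown in the diff; the helper name drop_duplicate_tail is hypothetical:

from typing import Any, Dict

def drop_duplicate_tail(aq_request: Dict[str, Any]) -> bool:
    """Drop the last history entry if it repeats currentMessage; return True if dropped."""
    state = aq_request.get("conversationState", {})
    current = state.get("currentMessage", {}).get("userInputMessage", {})
    current_content = (current.get("content") or "").strip()
    history = state.get("history", [])
    if history and current_content and "userInputMessage" in history[-1]:
        last_content = (history[-1]["userInputMessage"].get("content") or "").strip()
        if last_content == current_content:
            state["history"] = history[:-1]
            return True
    return False

# Example: the duplicated tail is dropped, leaving only currentMessage to answer.
req = {"conversationState": {
    "currentMessage": {"userInputMessage": {"content": "run tests"}},
    "history": [{"userInputMessage": {"content": "run tests"}}],
}}
assert drop_duplicate_tail(req) and req["conversationState"]["history"] == []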
@@ -593,7 +620,7 @@ async def claude_messages(
         if not access:
             refreshed = await refresh_access_token_in_db(account["id"])
             access = refreshed.get("accessToken")
-        
+
         # We call with stream=True to get the event iterator
         _, _, tracker, event_iter = await send_chat_request(
             access_token=access,
@@ -603,7 +630,7 @@ async def claude_messages(
             client=GLOBAL_CLIENT,
             raw_payload=aq_request
         )
-        
+
         if not event_iter:
              raise HTTPException(status_code=502, detail="No event stream returned")
 
@@ -617,7 +644,7 @@ async def claude_messages(
                 for item in req.system:
                     if isinstance(item, dict) and item.get("type") == "text":
                         text_to_count += item.get("text", "")
-        
+
         for msg in req.messages:
             if isinstance(msg.content, str):
                 text_to_count += msg.content
@@ -627,7 +654,7 @@ async def claude_messages(
                         text_to_count += item.get("text", "")
 
         input_tokens = count_tokens(text_to_count, apply_multiplier=True)
-        handler = ClaudeStreamHandler(model=req.model, input_tokens=input_tokens)
+        handler = ClaudeStreamHandler(model=req.model, input_tokens=input_tokens, conversation_id=conversation_id)
 
         # Try to get the first event to ensure the connection is valid
         # This allows us to return proper HTTP error codes before starting the stream
@@ -647,7 +674,7 @@ async def claude_messages(
                     event_type, payload = first_event
                     async for sse in handler.handle_event(event_type, payload):
                         yield sse
-                
+
                 # Process remaining events
                 async for event_type, payload in event_iter:
                     async for sse in handler.handle_event(event_type, payload):
@@ -673,19 +700,19 @@ async def claude_messages(
             # This is a bit complex because we need to reconstruct the full response object
             # For now, let's just support streaming as it's the main use case for Claude Code
             # But to be nice, let's try to support non-streaming by consuming the generator
-            
+
             content_blocks = []
             usage = {"input_tokens": 0, "output_tokens": 0}
             stop_reason = None
-            
+
             # We need to parse the SSE strings back to objects... inefficient but works
             # Or we could refactor handler to yield objects.
             # For now, let's just raise error for non-streaming or implement basic text
             # Claude Code uses streaming.
-            
+
             # Let's implement a basic accumulator from the SSE stream
             final_content = []
-            
+
             async for sse_chunk in event_generator():
                 data_str = None
                 # Each chunk from the generator can have multiple lines ('event:', 'data:').
@@ -694,20 +721,20 @@ async def claude_messages(
                     if line.startswith("data:"):
                         data_str = line[6:].strip()
                         break
-                
+
                 if not data_str or data_str == "[DONE]":
                     continue
-                
+
                 try:
                     data = json.loads(data_str)
                     dtype = data.get("type")
-                    
+
                     if dtype == "content_block_start":
                         idx = data.get("index", 0)
                         while len(final_content) <= idx:
                             final_content.append(None)
                         final_content[idx] = data.get("content_block")
-                    
+
                     elif dtype == "content_block_delta":
                         idx = data.get("index", 0)
                         delta = data.get("delta", {})
@@ -721,7 +748,7 @@ async def claude_messages(
                                 if "partial_json" not in final_content[idx]:
                                     final_content[idx]["partial_json"] = ""
                                 final_content[idx]["partial_json"] += delta.get("partial_json", "")
-                    
+
                     elif dtype == "content_block_stop":
                         idx = data.get("index", 0)
                         if final_content[idx] and final_content[idx].get("type") == "tool_use":
@@ -732,11 +759,11 @@ async def claude_messages(
                                     # Keep partial if invalid
                                     final_content[idx]["input"] = {"error": "invalid json", "partial": final_content[idx]["partial_json"]}
                                 del final_content[idx]["partial_json"]
-                    
+
                     elif dtype == "message_delta":
                         usage = data.get("usage", usage)
                         stop_reason = data.get("delta", {}).get("stop_reason")
-                
+
                 except json.JSONDecodeError:
                     # Ignore lines that are not valid JSON
                     pass

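The non-streaming branch above reconstructs the response by re-parsing the handler's own SSE strings. A compact sketch of just the data-line extraction it performs per chunk, assuming standard event:/data: framing (extract_data_objects is a hypothetical name):

import json
from typing import Any, Dict, Iterator

def extract_data_objects(sse_chunk: str) -> Iterator[Dict[str, Any]]:
    """Yield the parsed JSON payloads carried by the data: lines of one SSE chunk."""
    for line in sse_chunk.split("\n"):
        if line.startswith("data:"):
            data_str = line[len("data:"):].strip()
            if data_str and data_str != "[DONE]":
                try:
                    yield json.loads(data_str)
                except json.JSONDecodeError:
                    pass  # skip non-JSON lines, as the endpoint does

chunk = 'event: content_block_delta\ndata: {"type": "content_block_delta", "index": 0}\n\n'
assert next(extract_data_objects(chunk))["type"] == "content_block_delta"
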
claude_converter.py (+145 −34)

@@ -193,79 +193,125 @@ def convert_tool(tool: ClaudeTool) -> Dict[str, Any]:
         }
     }
 
+def _merge_tool_result_into_dict(tool_results_by_id: Dict[str, Dict[str, Any]], tool_result: Dict[str, Any]) -> None:
+    """
+    Merge a tool_result into the deduplicated dict.
+    If toolUseId already exists, merge the content arrays.
+
+    Args:
+        tool_results_by_id: Dict mapping toolUseId to tool_result
+        tool_result: The tool_result to merge
+    """
+    tool_use_id = tool_result.get("toolUseId")
+    if not tool_use_id:
+        return
+
+    if tool_use_id in tool_results_by_id:
+        # Merge content arrays
+        existing = tool_results_by_id[tool_use_id]
+        existing_content = existing.get("content", [])
+        new_content = tool_result.get("content", [])
+
+        # Deduplicate content by text value
+        existing_texts = {item.get("text", "") for item in existing_content if isinstance(item, dict)}
+        for item in new_content:
+            if isinstance(item, dict):
+                text = item.get("text", "")
+                if text and text not in existing_texts:
+                    existing_content.append(item)
+                    existing_texts.add(text)
+
+        existing["content"] = existing_content
+
+        # If any result has error status, keep error
+        if tool_result.get("status") == "error":
+            existing["status"] = "error"
+
+        logger.debug(f"Merged duplicate toolUseId {tool_use_id}")
+    else:
+        # New toolUseId, add to dict
+        tool_results_by_id[tool_use_id] = tool_result.copy()
+
+
 def merge_user_messages(messages: List[Dict[str, Any]], hint: str = THINKING_HINT) -> Dict[str, Any]:
     """Merge consecutive user messages, keeping only the last 2 messages' images.
-    
+
     IMPORTANT: This function properly merges toolResults from all messages to prevent
     losing tool execution history, which would cause infinite loops.
-    
-    When merging messages that contain thinking hints, removes duplicate hints and 
+
+    Key fix: Deduplicate toolResults by toolUseId to prevent duplicate tool_result
+    entries that cause the model to repeatedly respond to the same user message.
+
+    When merging messages that contain thinking hints, removes duplicate hints and
     ensures only one hint appears at the end of the merged content.
-    
+
     Args:
         messages: List of user messages to merge
         hint: The thinking hint string to deduplicate
     """
     if not messages:
         return {}
-    
+
     all_contents = []
     base_context = None
     base_origin = None
     base_model = None
     all_images = []
-    all_tool_results = []  # Collect toolResults from all messages
-    
+    # Use dict to deduplicate toolResults by toolUseId
+    tool_results_by_id: Dict[str, Dict[str, Any]] = {}
+
     for msg in messages:
         content = msg.get("content", "")
         msg_ctx = msg.get("userInputMessageContext", {})
-        
+
         # Initialize base context from first message
         if base_context is None:
             base_context = msg_ctx.copy() if msg_ctx else {}
             # Remove toolResults from base to merge them separately
             if "toolResults" in base_context:
-                all_tool_results.extend(base_context.pop("toolResults"))
+                for tr in base_context.pop("toolResults"):
+                    _merge_tool_result_into_dict(tool_results_by_id, tr)
         else:
             # Collect toolResults from subsequent messages
             if "toolResults" in msg_ctx:
-                all_tool_results.extend(msg_ctx["toolResults"])
-        
+                for tr in msg_ctx["toolResults"]:
+                    _merge_tool_result_into_dict(tool_results_by_id, tr)
+
         if base_origin is None:
             base_origin = msg.get("origin", "KIRO_CLI")
         if base_model is None:
             base_model = msg.get("modelId")
-        
+
         # Remove thinking hint from individual message content to avoid duplication
         # The hint will be added once at the end of the merged content
         if content:
             content_cleaned = content.replace(hint, "").strip()
             if content_cleaned:
                 all_contents.append(content_cleaned)
-        
+
         # Collect images from each message
         msg_images = msg.get("images")
         if msg_images:
             all_images.append(msg_images)
-    
+
     # Merge content and ensure thinking hint appears only once at the end
     merged_content = "\n\n".join(all_contents)
     # Check if any of the original messages had the hint (indicating thinking was enabled)
     had_thinking_hint = any(hint in msg.get("content", "") for msg in messages)
     if had_thinking_hint:
         merged_content = _append_thinking_hint(merged_content, hint)
-    
+
     result = {
         "content": merged_content,
         "userInputMessageContext": base_context or {},
         "origin": base_origin or "KIRO_CLI",
         "modelId": base_model
     }
-    
-    # Add merged toolResults if any
-    if all_tool_results:
-        result["userInputMessageContext"]["toolResults"] = all_tool_results
-    
+
+    # Add deduplicated toolResults if any
+    if tool_results_by_id:
+        result["userInputMessageContext"]["toolResults"] = list(tool_results_by_id.values())
+
     # Only keep images from the last 2 messages that have images
     if all_images:
         kept_images = []
@@ -273,21 +319,56 @@ def merge_user_messages(messages: List[Dict[str, Any]], hint: str = THINKING_HIN
             kept_images.extend(img_list)
         if kept_images:
             result["images"] = kept_images
-    
+
     return result
 
+def _reorder_tool_results_by_tool_uses(tool_results: List[Dict[str, Any]], tool_use_order: List[str]) -> List[Dict[str, Any]]:
+    """Reorder tool_results to match the order of tool_uses from the preceding assistant message.
+
+    This is critical for preventing model confusion when parallel tool calls return results
+    in a different order than they were called.
+
+    Args:
+        tool_results: List of tool_result dicts with toolUseId
+        tool_use_order: List of toolUseIds in the order they appeared in the assistant message
+
+    Returns:
+        Reordered list of tool_results
+    """
+    if not tool_use_order or not tool_results:
+        return tool_results
+
+    result_by_id = {r["toolUseId"]: r for r in tool_results}
+    ordered_results = []
+
+    # Add results in the order of tool_uses
+    for tool_use_id in tool_use_order:
+        if tool_use_id in result_by_id:
+            ordered_results.append(result_by_id.pop(tool_use_id))
+
+    # Add any remaining results not in the original order (shouldn't happen normally)
+    ordered_results.extend(result_by_id.values())
+
+    return ordered_results
+
+
 def process_history(messages: List[ClaudeMessage], thinking_enabled: bool = False, hint: str = THINKING_HINT) -> List[Dict[str, Any]]:
     """Process history messages to match Amazon Q format (alternating user/assistant).
-    
+
     Dual-mode detection:
     - If messages already alternate correctly (no consecutive user/assistant), skip merging
     - If messages have consecutive same-role messages, apply merge logic
+
+    Key fix: Track tool_use order from assistant messages and reorder tool_results in user
+    messages to match. This prevents model confusion when parallel tool calls return results
+    in a different order than they were called.
     """
     history = []
-    seen_tool_use_ids = set()
-    
+    seen_tool_use_ids = set()  # Track tool_use IDs in assistant messages
+    last_tool_use_order = []  # Track order of tool_uses from the last assistant message
+
     raw_history = []
-    
+
     # First pass: convert individual messages
     for msg in messages:
         if msg.role == "user":
@@ -296,7 +377,7 @@ def process_history(messages: List[ClaudeMessage], thinking_enabled: bool = Fals
             tool_results = None
             images = extract_images_from_content(content)
             should_append_hint = thinking_enabled
-            
+
             if isinstance(content, list):
                 text_parts = []
                 for block in content:
@@ -307,10 +388,12 @@ def process_history(messages: List[ClaudeMessage], thinking_enabled: bool = Fals
                         elif btype == "thinking":
                             text_parts.append(_wrap_thinking_content(block.get("thinking", "")))
                         elif btype == "tool_result":
+                            tool_use_id = block.get("tool_use_id")
+
                             if tool_results is None:
                                 tool_results = []
                             result = _process_tool_result_block(block)
-                            # Merge if exists
+                            # Merge if exists within this message
                             existing = next((r for r in tool_results if r["toolUseId"] == result["toolUseId"]), None)
                             if existing:
                                 existing["content"].extend(result["content"])
@@ -321,10 +404,15 @@ def process_history(messages: List[ClaudeMessage], thinking_enabled: bool = Fals
                 text_content = "\n".join(text_parts)
             else:
                 text_content = extract_text_from_content(content)
-            
+
             if should_append_hint:
                 text_content = _append_thinking_hint(text_content, hint)
-            
+
+            # Reorder tool_results to match the order of tool_uses from the preceding assistant message
+            if tool_results and last_tool_use_order:
+                tool_results = _reorder_tool_results_by_tool_uses(tool_results, last_tool_use_order)
+                logger.info(f"Reordered {len(tool_results)} tool_results to match tool_uses order")
+
             user_ctx = {
                 "envState": {
                     "operatingSystem": "macos",
@@ -341,20 +429,22 @@ def process_history(messages: List[ClaudeMessage], thinking_enabled: bool = Fals
             }
             if images:
                 u_msg["images"] = images
-                
+
             raw_history.append({"userInputMessage": u_msg})
-            
+
         elif msg.role == "assistant":
             content = msg.content
             text_content = extract_text_from_content(content)
-            
+
             entry = {
                 "assistantResponseMessage": {
                     "messageId": str(uuid.uuid4()),
                     "content": text_content
                 }
             }
-            
+
+            # Track tool_use order for reordering tool_results in the next user message
+            last_tool_use_order = []
             if isinstance(content, list):
                 tool_uses = []
                 for block in content:
@@ -362,6 +452,7 @@ def process_history(messages: List[ClaudeMessage], thinking_enabled: bool = Fals
                         tid = block.get("id")
                         if tid and tid not in seen_tool_use_ids:
                             seen_tool_use_ids.add(tid)
+                            last_tool_use_order.append(tid)  # Track order
                             tool_uses.append({
                                 "toolUseId": tid,
                                 "name": block.get("name"),
@@ -369,7 +460,7 @@ def process_history(messages: List[ClaudeMessage], thinking_enabled: bool = Fals
                             })
                 if tool_uses:
                     entry["assistantResponseMessage"]["toolUses"] = tool_uses
-            
+
             raw_history.append(entry)
 
     # Dual-mode detection: check if messages already alternate correctly
@@ -541,6 +632,26 @@ def convert_claude_to_amazonq_request(req: ClaudeRequest, conversation_id: Optio
         else:
             prompt_content = extract_text_from_content(content)
 
+    # Get tool_use order from the last assistant message for reordering current message's tool_results
+    last_tool_use_order = []
+    if len(req.messages) >= 2:
+        # Find the last assistant message before the current user message
+        for i in range(len(req.messages) - 2, -1, -1):
+            if req.messages[i].role == "assistant":
+                assistant_content = req.messages[i].content
+                if isinstance(assistant_content, list):
+                    for block in assistant_content:
+                        if isinstance(block, dict) and block.get("type") == "tool_use":
+                            tid = block.get("id")
+                            if tid:
+                                last_tool_use_order.append(tid)
+                break
+
+    # Reorder tool_results to match the order of tool_uses from the preceding assistant message
+    if tool_results and last_tool_use_order:
+        tool_results = _reorder_tool_results_by_tool_uses(tool_results, last_tool_use_order)
+        logger.info(f"Reordered {len(tool_results)} current message tool_results to match tool_uses order")
+
     # 3. Context
     user_ctx = {
         "envState": {

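Taken together, the two new helpers canonicalize a user turn: tool results are first deduplicated by toolUseId, then reordered to match the tool_use order of the preceding assistant message. A usage sketch calling the module's own (private) helpers, assuming claude_converter is importable:

from claude_converter import _merge_tool_result_into_dict, _reorder_tool_results_by_tool_uses

results_by_id = {}
# Two fragments for the same toolUseId collapse into one entry with merged content.
_merge_tool_result_into_dict(results_by_id, {"toolUseId": "t2", "content": [{"text": "part 1"}], "status": "success"})
_merge_tool_result_into_dict(results_by_id, {"toolUseId": "t2", "content": [{"text": "part 2"}], "status": "error"})
_merge_tool_result_into_dict(results_by_id, {"toolUseId": "t1", "content": [{"text": "ok"}], "status": "success"})

# Reorder to match the assistant's tool_use order (t1 was called before t2).
ordered = _reorder_tool_results_by_tool_uses(list(results_by_id.values()), ["t1", "t2"])
assert [r["toolUseId"] for r in ordered] == ["t1", "t2"]
assert ordered[1]["status"] == "error"  # an error status survives the merge
assert [c["text"] for c in ordered[1]["content"]] == ["part 1", "part 2"]
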
claude_stream.py (+58 −6)

@@ -1,6 +1,7 @@
 import json
 import logging
 import importlib.util
+import uuid
 from pathlib import Path
 from typing import AsyncGenerator, Optional, Dict, Any, List, Set
 import tiktoken
@@ -71,7 +72,7 @@ except Exception as e:
     def build_tool_use_input_delta(*args, **kwargs): return ""
 
 class ClaudeStreamHandler:
-    def __init__(self, model: str, input_tokens: int = 0):
+    def __init__(self, model: str, input_tokens: int = 0, conversation_id: Optional[str] = None):
         self.model = model
         self.input_tokens = input_tokens
         self.response_buffer: List[str] = []
@@ -80,7 +81,7 @@ class ClaudeStreamHandler:
         self.content_block_start_sent: bool = False
         self.content_block_stop_sent: bool = False
         self.message_start_sent: bool = False
-        self.conversation_id: Optional[str] = None
+        self.conversation_id: Optional[str] = conversation_id
 
         # Tool use state
         self.current_tool_use: Optional[Dict[str, Any]] = None
@@ -95,13 +96,21 @@ class ClaudeStreamHandler:
         self.think_buffer: str = ""
         self.pending_start_tag_chars: int = 0
 
+        # Response termination flag
+        self.response_ended: bool = False
+
     async def handle_event(self, event_type: str, payload: Dict[str, Any]) -> AsyncGenerator[str, None]:
         """Process a single Amazon Q event and yield Claude SSE events."""
-        
+
+        # Early return if response has already ended
+        if self.response_ended:
+            return
+
         # 1. Message Start (initial-response)
         if event_type == "initial-response":
             if not self.message_start_sent:
-                conv_id = payload.get('conversationId', self.conversation_id or 'unknown')
+                # Prefer conversationId from the payload, then the constructor value, else generate one
+                conv_id = payload.get('conversationId') or self.conversation_id or str(uuid.uuid4())
                 self.conversation_id = conv_id
                 yield build_message_start(conv_id, self.model, self.input_tokens)
                 self.message_start_sent = True
@@ -231,6 +240,12 @@ class ClaudeStreamHandler:
             tool_input = payload.get("input", {})
             is_stop = payload.get("stop", False)
 
+            # Deduplication: skip if this tool_use_id was already processed and no tool is active
+            # (allows input deltas to pass through when current_tool_use is set)
+            if tool_use_id and tool_use_id in self._processed_tool_use_ids and not self.current_tool_use:
+                logger.warning(f"Detected duplicate tool use event, toolUseId={tool_use_id}, skipping")
+                return
+
             # Start new tool use
             if tool_use_id and tool_name and not self.current_tool_use:
                 # Close previous text block if open
@@ -266,10 +281,12 @@ class ClaudeStreamHandler:
             if is_stop and self.current_tool_use:
                 full_input = "".join(self.tool_input_buffer)
                 self.all_tool_inputs.append(full_input)
-                
+
                 yield build_content_block_stop(self.content_block_index)
-                self.content_block_stop_sent = True
+                # Reset state to allow next content block
+                self.content_block_stop_sent = False  # Reset to False to allow next block
                 self.content_block_started = False
+                self.content_block_start_sent = False  # Important: reset start flag for next block
                 self.current_tool_use = None
                 self.tool_use_id = None
                 self.tool_name = None
@@ -282,8 +299,43 @@ class ClaudeStreamHandler:
                 yield build_content_block_stop(self.content_block_index)
                 self.content_block_stop_sent = True
 
+            # Mark as finished to prevent processing further events
+            self.response_ended = True
+
+            # Immediately send message_stop (instead of waiting for finish())
+            full_text = "".join(self.response_buffer)
+            full_tool_input = "".join(self.all_tool_inputs)
+            output_tokens = count_tokens(full_text) + count_tokens(full_tool_input)
+            yield build_message_stop(self.input_tokens, output_tokens, "end_turn")
+
     async def finish(self) -> AsyncGenerator[str, None]:
         """Send final events."""
+        # Skip if response already ended (message_stop already sent)
+        if self.response_ended:
+            return
+
+        # Flush any remaining think_buffer content
+        if self.think_buffer:
+            if self.in_think_block:
+                # Emit remaining thinking content
+                yield build_content_block_delta(
+                    self.content_block_index,
+                    self.think_buffer,
+                    delta_type="thinking_delta",
+                    field_name="thinking"
+                )
+            else:
+                # Emit remaining text content
+                if not self.content_block_start_sent:
+                    self.content_block_index += 1
+                    yield build_content_block_start(self.content_block_index, "text")
+                    self.content_block_start_sent = True
+                    self.content_block_started = True
+                    self.content_block_stop_sent = False
+                self.response_buffer.append(self.think_buffer)
+                yield build_content_block_delta(self.content_block_index, self.think_buffer)
+            self.think_buffer = ""
+
         # Ensure last block is closed
         if self.content_block_started and not self.content_block_stop_sent:
             yield build_content_block_stop(self.content_block_index)

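The new response_ended flag makes the handler one-shot: once message_stop has been emitted, any trailing events from the upstream iterator are dropped. A stripped-down model of that guard (MiniHandler is illustrative, not the real ClaudeStreamHandler):

import asyncio
from typing import AsyncGenerator

class MiniHandler:
    """Illustrative stand-in for the early-return guard in ClaudeStreamHandler."""
    def __init__(self) -> None:
        self.response_ended = False

    async def handle_event(self, event_type: str) -> AsyncGenerator[str, None]:
        if self.response_ended:
            return  # drop anything that arrives after message_stop
        if event_type == "finish":
            self.response_ended = True
            yield "message_stop"
        else:
            yield f"delta:{event_type}"

async def main() -> None:
    handler = MiniHandler()
    out = []
    for ev in ["text", "finish", "text"]:  # a stray event arrives after the stop
        async for sse in handler.handle_event(ev):
            out.append(sse)
    assert out == ["delta:text", "message_stop"]  # the stray event was ignored

asyncio.run(main())
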
message_processor.py (+208 −0)

@@ -0,0 +1,208 @@
+"""
+Message processing module.
+Processes Claude Code history and merges consecutive user messages to meet Amazon Q's format requirements.
+"""
+import logging
+from typing import List, Dict, Any
+
+logger = logging.getLogger(__name__)
+
+
+def merge_user_messages(user_messages: List[Dict[str, Any]]) -> Dict[str, Any]:
+    """
+    Merge the contents of multiple userInputMessages.
+
+    Args:
+        user_messages: List of userInputMessage dicts
+
+    Returns:
+        The merged userInputMessage
+    """
+    if not user_messages:
+        return {}
+
+    # Extract all contents
+    all_contents = []
+    base_context = None
+    base_origin = None
+    base_model = None
+    all_images = []  # Collect images from messages
+
+    for msg in user_messages:
+        content = msg.get("content", "")
+        msg_ctx = msg.get("userInputMessageContext", {})
+
+        # Keep the first message's context info
+        if base_context is None:
+            base_context = msg_ctx.copy() if msg_ctx else {}
+
+        # Keep the first message's origin
+        if base_origin is None:
+            base_origin = msg.get("origin", "KIRO_CLI")
+
+        # Keep the first message's modelId
+        if base_model is None and "modelId" in msg:
+            base_model = msg["modelId"]
+
+        # Append content (keep everything, including system-reminder blocks)
+        if content:
+            all_contents.append(content)
+
+        # Collect images from each message
+        msg_images = msg.get("images")
+        if msg_images:
+            all_images.append(msg_images)
+
+    # Join the contents, separated by double newlines
+    merged_content = "\n\n".join(all_contents)
+
+    # Build the merged message
+    merged_msg = {
+        "content": merged_content,
+        "userInputMessageContext": base_context or {},
+        "origin": base_origin or "KIRO_CLI"
+    }
+
+    # Preserve modelId if the original messages carried one
+    if base_model:
+        merged_msg["modelId"] = base_model
+
+    # Only keep images from the last 2 messages that have images
+    if all_images:
+        kept_images = []
+        for img_list in all_images[-2:]:  # Take last 2 messages' images
+            kept_images.extend(img_list)
+        if kept_images:
+            merged_msg["images"] = kept_images
+
+    return merged_msg
+
+
+def process_claude_history_for_amazonq(history: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
+    """
+    Process Claude Code history so that it meets Amazon Q's requirements.
+
+    Strategy:
+    1. Merge consecutive userInputMessages
+    2. Keep all content (including system-reminder blocks)
+    3. Enforce strict user/assistant alternation
+
+    Args:
+        history: Claude Code history
+
+    Returns:
+        Processed history in Amazon Q format
+    """
+    if not history:
+        return []
+
+    processed_history = []
+    pending_user_messages = []
+
+    # logger.info(f"[MESSAGE_PROCESSOR] Processing history, {len(history)} messages in total")
+
+    for idx, msg in enumerate(history):
+        if "userInputMessage" in msg:
+            # Collect consecutive user messages
+            pending_user_messages.append(msg["userInputMessage"])
+            logger.debug(f"[MESSAGE_PROCESSOR] Message {idx}: collected userInputMessage, pending merge count: {len(pending_user_messages)}")
+
+        elif "assistantResponseMessage" in msg:
+            # On an assistant message, first merge the preceding user messages
+            if pending_user_messages:
+                logger.info(f"[MESSAGE_PROCESSOR] Message {idx}: merging {len(pending_user_messages)} userInputMessages")
+                merged_user_msg = merge_user_messages(pending_user_messages)
+                processed_history.append({
+                    "userInputMessage": merged_user_msg
+                })
+                pending_user_messages = []
+
+            # Append the assistant message
+            logger.debug(f"[MESSAGE_PROCESSOR] Message {idx}: appending assistantResponseMessage")
+            processed_history.append(msg)
+
+    # Merge any trailing user messages
+    if pending_user_messages:
+        logger.info(f"[MESSAGE_PROCESSOR] Merging {len(pending_user_messages)} trailing userInputMessages")
+        merged_user_msg = merge_user_messages(pending_user_messages)
+        processed_history.append({
+            "userInputMessage": merged_user_msg
+        })
+
+    logger.info(f"[MESSAGE_PROCESSOR] 历史记录处理完成,原始 {len(history)} 条 -> 处理后 {len(processed_history)} 条")
+
+    # Validate that messages alternate
+    try:
+        validate_message_alternation(processed_history)
+    except ValueError as e:
+        logger.error(f"[MESSAGE_PROCESSOR] 消息交替验证失败: {e}")
+        raise
+
+    return processed_history
+
+
+def validate_message_alternation(history: List[Dict[str, Any]]) -> bool:
+    """
+    Validate that messages strictly alternate (user-assistant-user-assistant...).
+
+    Args:
+        history: History records
+
+    Returns:
+        Whether the history is valid
+
+    Raises:
+        ValueError: If messages do not alternate
+    """
+    if not history:
+        return True
+
+    last_role = None
+
+    for idx, msg in enumerate(history):
+        if "userInputMessage" in msg:
+            current_role = "user"
+        elif "assistantResponseMessage" in msg:
+            current_role = "assistant"
+        else:
+            logger.warning(f"[MESSAGE_PROCESSOR] Message {idx} is neither user nor assistant: {list(msg.keys())}")
+            continue
+
+        if last_role == current_role:
+            error_msg = f"消息 {idx} 违反交替规则: 连续两个 {current_role} 消息"
+            logger.error(f"[MESSAGE_PROCESSOR] {error_msg}")
+            logger.error(f"[MESSAGE_PROCESSOR] 上一条消息: {list(history[idx-1].keys())}")
+            logger.error(f"[MESSAGE_PROCESSOR] 当前消息: {list(msg.keys())}")
+            raise ValueError(error_msg)
+
+        last_role = current_role
+
+    logger.info("[MESSAGE_PROCESSOR] 消息交替验证通过")
+    return True
+
+
+def log_history_summary(history: List[Dict[str, Any]], prefix: str = ""):
+    """
+    Log a summary of the history for debugging.
+
+    Args:
+        history: History records
+        prefix: Log prefix
+    """
+    if not history:
+        logger.info(f"{prefix}History is empty")
+        return
+
+    summary = []
+    for idx, msg in enumerate(history):
+        if "userInputMessage" in msg:
+            content = msg["userInputMessage"].get("content", "")
+            # Take the first 80 characters as a preview
+            content_preview = content[:80].replace("\n", " ") if content else ""
+            summary.append(f"  [{idx}] USER: {content_preview}...")
+        elif "assistantResponseMessage" in msg:
+            content = msg["assistantResponseMessage"].get("content", "")
+            content_preview = content[:80].replace("\n", " ") if content else ""
+            summary.append(f"  [{idx}] ASSISTANT: {content_preview}...")
+
+    logger.info(f"{prefix}历史记录摘要 (共 {len(history)} 条):\n" + "\n".join(summary))