message_processor.py 6.7 KB


  1. """
  2. 消息处理模块
  3. 处理 Claude Code 历史记录,合并连续的用户消息,确保符合 Amazon Q 格式要求
  4. """
  5. import logging
  6. from typing import List, Dict, Any
  7. logger = logging.getLogger(__name__)
  8. def merge_user_messages(user_messages: List[Dict[str, Any]]) -> Dict[str, Any]:
  9. """
  10. 合并多个 userInputMessage 的内容
  11. Args:
  12. user_messages: userInputMessage 列表
  13. Returns:
  14. 合并后的 userInputMessage
  15. """
  16. if not user_messages:
  17. return {}
  18. # 提取所有内容
  19. all_contents = []
  20. base_context = None
  21. base_origin = None
  22. base_model = None
  23. all_images = [] # Collect images from messages
  24. for msg in user_messages:
  25. content = msg.get("content", "")
  26. msg_ctx = msg.get("userInputMessageContext", {})
  27. # 保留第一个消息的上下文信息
  28. if base_context is None:
  29. base_context = msg_ctx.copy() if msg_ctx else {}
  30. # 保留第一个消息的 origin
  31. if base_origin is None:
  32. base_origin = msg.get("origin", "KIRO_CLI")
  33. # 保留第一个消息的 modelId
  34. if base_model is None and "modelId" in msg:
  35. base_model = msg["modelId"]
  36. # 添加内容(保留所有内容,包括 system-reminder)
  37. if content:
  38. all_contents.append(content)
  39. # Collect images from each message
  40. msg_images = msg.get("images")
  41. if msg_images:
  42. all_images.append(msg_images)
  43. # 合并内容,使用双换行分隔
  44. merged_content = "\n\n".join(all_contents)
  45. # 构建合并后的消息
  46. merged_msg = {
  47. "content": merged_content,
  48. "userInputMessageContext": base_context or {},
  49. "origin": base_origin or "KIRO_CLI"
  50. }
  51. # 如果原始消息有 modelId,也保留
  52. if base_model:
  53. merged_msg["modelId"] = base_model
  54. # Only keep images from the last 2 messages that have images
  55. if all_images:
  56. kept_images = []
  57. for img_list in all_images[-2:]: # Take last 2 messages' images
  58. kept_images.extend(img_list)
  59. if kept_images:
  60. merged_msg["images"] = kept_images
  61. return merged_msg
  62. def process_claude_history_for_amazonq(history: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
  63. """
  64. 处理 Claude Code 历史记录,使其符合 Amazon Q 要求
  65. 策略:
  66. 1. 合并连续的 userInputMessage
  67. 2. 保留所有内容(包括 system-reminder)
  68. 3. 确保 user-assistant 消息严格交替
  69. Args:
  70. history: Claude Code 的历史记录
  71. Returns:
  72. 处理后的历史记录,符合 Amazon Q 格式
  73. """
  74. if not history:
  75. return []
  76. processed_history = []
  77. pending_user_messages = []
  78. # logger.info(f"[MESSAGE_PROCESSOR] 开始处理历史记录,共 {len(history)} 条消息")
  79. for idx, msg in enumerate(history):
  80. if "userInputMessage" in msg:
  81. # 收集连续的用户消息
  82. pending_user_messages.append(msg["userInputMessage"])
  83. logger.debug(f"[MESSAGE_PROCESSOR] 消息 {idx}: 收集 userInputMessage,当前待合并数量: {len(pending_user_messages)}")
  84. elif "assistantResponseMessage" in msg:
  85. # 遇到助手消息时,先合并之前的用户消息
  86. if pending_user_messages:
  87. logger.info(f"[MESSAGE_PROCESSOR] 消息 {idx}: 合并 {len(pending_user_messages)} 条 userInputMessage")
  88. merged_user_msg = merge_user_messages(pending_user_messages)
  89. processed_history.append({
  90. "userInputMessage": merged_user_msg
  91. })
  92. pending_user_messages = []
  93. # 添加助手消息
  94. logger.debug(f"[MESSAGE_PROCESSOR] 消息 {idx}: 添加 assistantResponseMessage")
  95. processed_history.append(msg)
  96. # 处理末尾剩余的用户消息
  97. if pending_user_messages:
  98. logger.info(f"[MESSAGE_PROCESSOR] 处理末尾剩余的 {len(pending_user_messages)} 条 userInputMessage")
  99. merged_user_msg = merge_user_messages(pending_user_messages)
  100. processed_history.append({
  101. "userInputMessage": merged_user_msg
  102. })
  103. logger.info(f"[MESSAGE_PROCESSOR] 历史记录处理完成,原始 {len(history)} 条 -> 处理后 {len(processed_history)} 条")
  104. # 验证消息交替
  105. try:
  106. validate_message_alternation(processed_history)
  107. except ValueError as e:
  108. logger.error(f"[MESSAGE_PROCESSOR] 消息交替验证失败: {e}")
  109. raise
  110. return processed_history
  111. def validate_message_alternation(history: List[Dict[str, Any]]) -> bool:
  112. """
  113. 验证消息是否严格交替(user-assistant-user-assistant...)
  114. Args:
  115. history: 历史记录
  116. Returns:
  117. 是否有效
  118. Raises:
  119. ValueError: 如果消息不交替
  120. """
  121. if not history:
  122. return True
  123. last_role = None
  124. for idx, msg in enumerate(history):
  125. if "userInputMessage" in msg:
  126. current_role = "user"
  127. elif "assistantResponseMessage" in msg:
  128. current_role = "assistant"
  129. else:
  130. logger.warning(f"[MESSAGE_PROCESSOR] 消息 {idx} 既不是 user 也不是 assistant: {list(msg.keys())}")
  131. continue
  132. if last_role == current_role:
  133. error_msg = f"消息 {idx} 违反交替规则: 连续两个 {current_role} 消息"
  134. logger.error(f"[MESSAGE_PROCESSOR] {error_msg}")
  135. logger.error(f"[MESSAGE_PROCESSOR] 上一条消息: {list(history[idx-1].keys())}")
  136. logger.error(f"[MESSAGE_PROCESSOR] 当前消息: {list(msg.keys())}")
  137. raise ValueError(error_msg)
  138. last_role = current_role
  139. logger.info("[MESSAGE_PROCESSOR] 消息交替验证通过")
  140. return True
  141. def log_history_summary(history: List[Dict[str, Any]], prefix: str = ""):
  142. """
  143. 记录历史记录摘要,用于调试
  144. Args:
  145. history: 历史记录
  146. prefix: 日志前缀
  147. """
  148. if not history:
  149. logger.info(f"{prefix}历史记录为空")
  150. return
  151. summary = []
  152. for idx, msg in enumerate(history):
  153. if "userInputMessage" in msg:
  154. content = msg["userInputMessage"].get("content", "")
  155. # 取前80个字符作为预览
  156. content_preview = content[:80].replace("\n", " ") if content else ""
  157. summary.append(f" [{idx}] USER: {content_preview}...")
  158. elif "assistantResponseMessage" in msg:
  159. content = msg["assistantResponseMessage"].get("content", "")
  160. content_preview = content[:80].replace("\n", " ") if content else ""
  161. summary.append(f" [{idx}] ASSISTANT: {content_preview}...")
  162. logger.info(f"{prefix}历史记录摘要 (共 {len(history)} 条):\n" + "\n".join(summary))