replicate.py 8.3 KB


  1. import json
  2. import uuid
  3. from pathlib import Path
  4. from typing import Dict, Optional, Tuple, Iterator, List, Generator, Any
  5. import struct
  6. import requests
  7. BASE_DIR = Path(__file__).resolve().parent
  8. TEMPLATE_PATH = BASE_DIR / "templates" / "streaming_request.json"
  9. def load_template() -> Tuple[str, Dict[str, str], Dict[str, Any]]:
  10. data = json.loads(TEMPLATE_PATH.read_text(encoding="utf-8"))
  11. url, headers, body = data
  12. assert isinstance(url, str) and isinstance(headers, dict) and isinstance(body, dict)
  13. return url, headers, body
  14. def _merge_headers(as_log: Dict[str, str], bearer_token: str) -> Dict[str, str]:
  15. headers = dict(as_log)
  16. for k in list(headers.keys()):
  17. kl = k.lower()
  18. if kl in ("content-length","host","connection","transfer-encoding"):
  19. headers.pop(k, None)
  20. def set_header(name: str, value: str):
  21. for key in list(headers.keys()):
  22. if key.lower() == name.lower():
  23. del headers[key]
  24. headers[name] = value
  25. set_header("Authorization", f"Bearer {bearer_token}")
  26. set_header("amz-sdk-invocation-id", str(uuid.uuid4()))
  27. return headers
  28. def _parse_event_headers(raw: bytes) -> Dict[str, object]:
  29. headers: Dict[str, object] = {}
  30. i = 0
  31. n = len(raw)
  32. while i < n:
  33. if i + 1 > n:
  34. break
  35. name_len = raw[i]
  36. i += 1
  37. if i + name_len + 1 > n:
  38. break
  39. name = raw[i : i + name_len].decode("utf-8", errors="ignore")
  40. i += name_len
  41. htype = raw[i]
  42. i += 1
  43. if htype == 0:
  44. val = True
  45. elif htype == 1:
  46. val = False
  47. elif htype == 2:
  48. if i + 1 > n: break
  49. val = raw[i]; i += 1
  50. elif htype == 3:
  51. if i + 2 > n: break
  52. val = int.from_bytes(raw[i:i+2],"big",signed=True); i += 2
  53. elif htype == 4:
  54. if i + 4 > n: break
  55. val = int.from_bytes(raw[i:i+4],"big",signed=True); i += 4
  56. elif htype == 5:
  57. if i + 8 > n: break
  58. val = int.from_bytes(raw[i:i+8],"big",signed=True); i += 8
  59. elif htype == 6:
  60. if i + 2 > n: break
  61. l = int.from_bytes(raw[i:i+2],"big"); i += 2
  62. if i + l > n: break
  63. val = raw[i:i+l]; i += l
  64. elif htype == 7:
  65. if i + 2 > n: break
  66. l = int.from_bytes(raw[i:i+2],"big"); i += 2
  67. if i + l > n: break
  68. val = raw[i:i+l].decode("utf-8", errors="ignore"); i += l
  69. elif htype == 8:
  70. if i + 8 > n: break
  71. val = int.from_bytes(raw[i:i+8],"big",signed=False); i += 8
  72. elif htype == 9:
  73. if i + 16 > n: break
  74. import uuid as _uuid
  75. val = str(_uuid.UUID(bytes=bytes(raw[i:i+16]))); i += 16
  76. else:
  77. break
  78. headers[name] = val
  79. return headers
  80. class AwsEventStreamParser:
  81. def __init__(self):
  82. self._buf = bytearray()
  83. def feed(self, data: bytes) -> List[Tuple[Dict[str, object], bytes]]:
  84. if not data:
  85. return []
  86. self._buf.extend(data)
  87. out: List[Tuple[Dict[str, object], bytes]] = []
  88. while True:
  89. if len(self._buf) < 12:
  90. break
  91. total_len, headers_len, _prelude_crc = struct.unpack(">I I I", self._buf[:12])
  92. if total_len < 16 or headers_len > total_len:
  93. self._buf.pop(0)
  94. continue
  95. if len(self._buf) < total_len:
  96. break
  97. msg = bytes(self._buf[:total_len])
  98. del self._buf[:total_len]
  99. headers_raw = msg[12:12+headers_len]
  100. payload = msg[12+headers_len: total_len-4]
  101. headers = _parse_event_headers(headers_raw)
  102. out.append((headers, payload))
  103. return out
  104. def _try_decode_event_payload(payload: bytes) -> Optional[dict]:
  105. try:
  106. return json.loads(payload.decode("utf-8"))
  107. except Exception:
  108. return None
  109. def _extract_text_from_event(ev: dict) -> Optional[str]:
  110. for key in ("assistantResponseEvent","assistantMessage","message","delta","data"):
  111. if key in ev and isinstance(ev[key], dict):
  112. inner = ev[key]
  113. if isinstance(inner.get("content"), str) and inner.get("content"):
  114. return inner["content"]
  115. if isinstance(ev.get("content"), str) and ev.get("content"):
  116. return ev["content"]
  117. for list_key in ("chunks","content"):
  118. if isinstance(ev.get(list_key), list):
  119. buf = []
  120. for item in ev[list_key]:
  121. if isinstance(item, dict):
  122. if isinstance(item.get("content"), str):
  123. buf.append(item["content"])
  124. elif isinstance(item.get("text"), str):
  125. buf.append(item["text"])
  126. elif isinstance(item, str):
  127. buf.append(item)
  128. if buf:
  129. return "".join(buf)
  130. for k in ("text","delta","payload"):
  131. v = ev.get(k)
  132. if isinstance(v, str) and v:
  133. return v
  134. return None
  135. def openai_messages_to_text(messages: List[Dict[str, Any]]) -> str:
  136. lines: List[str] = []
  137. for m in messages:
  138. role = m.get("role","user")
  139. content = m.get("content","")
  140. if isinstance(content, list):
  141. parts = []
  142. for seg in content:
  143. if isinstance(seg, dict) and isinstance(seg.get("text"), str):
  144. parts.append(seg["text"])
  145. elif isinstance(seg, str):
  146. parts.append(seg)
  147. content = "\n".join(parts)
  148. elif not isinstance(content, str):
  149. content = str(content)
  150. lines.append(f"{role}:\n{content}")
  151. return "\n\n".join(lines)
  152. def inject_history(body_json: Dict[str, Any], history_text: str) -> None:
  153. try:
  154. cur = body_json["conversationState"]["currentMessage"]["userInputMessage"]
  155. content = cur.get("content","")
  156. if isinstance(content, str):
  157. cur["content"] = content.replace("你好,你必须讲个故事", history_text)
  158. except Exception:
  159. pass
  160. def inject_model(body_json: Dict[str, Any], model: Optional[str]) -> None:
  161. if not model:
  162. return
  163. try:
  164. body_json["conversationState"]["currentMessage"]["userInputMessage"]["modelId"] = model
  165. except Exception:
  166. pass
  167. def send_chat_request(access_token: str, messages: List[Dict[str, Any]], model: Optional[str] = None, stream: bool = False, timeout: Tuple[int,int] = (15,300)) -> Tuple[Optional[str], Optional[Generator[str, None, None]]]:
  168. url, headers_from_log, body_json = load_template()
  169. headers_from_log["amz-sdk-invocation-id"] = str(uuid.uuid4())
  170. try:
  171. body_json["conversationState"]["conversationId"] = str(uuid.uuid4())
  172. except Exception:
  173. pass
  174. history_text = openai_messages_to_text(messages)
  175. inject_history(body_json, history_text)
  176. inject_model(body_json, model)
  177. payload_str = json.dumps(body_json, ensure_ascii=False)
  178. headers = _merge_headers(headers_from_log, access_token)
  179. session = requests.Session()
  180. resp = session.post(url, headers=headers, data=payload_str, stream=True, timeout=timeout)
  181. if resp.status_code >= 400:
  182. try:
  183. err = resp.text
  184. except Exception:
  185. err = f"HTTP {resp.status_code}"
  186. raise requests.HTTPError(f"Upstream error {resp.status_code}: {err}", response=resp)
  187. parser = AwsEventStreamParser()
  188. def _iter_text() -> Generator[str, None, None]:
  189. for chunk in resp.iter_content(chunk_size=None):
  190. if not chunk:
  191. continue
  192. events = parser.feed(chunk)
  193. for _ev_headers, payload in events:
  194. parsed = _try_decode_event_payload(payload)
  195. if parsed is not None:
  196. text = _extract_text_from_event(parsed)
  197. if isinstance(text, str) and text:
  198. yield text
  199. else:
  200. try:
  201. txt = payload.decode("utf-8", errors="ignore")
  202. if txt:
  203. yield txt
  204. except Exception:
  205. pass
  206. if stream:
  207. return None, _iter_text()
  208. else:
  209. buf = []
  210. for t in _iter_text():
  211. buf.append(t)
  212. return "".join(buf), None