app.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341
  1. """
  2. Amazon Q 账号投喂服务
  3. 用于让其他人通过 URL 登录投喂账号到主服务
  4. """
  5. import json
  6. import asyncio
  7. import uuid
  8. import os
  9. from typing import Dict, Optional
  10. from pathlib import Path
  11. import httpx
  12. import uvicorn
  13. from fastapi import FastAPI, HTTPException
  14. from fastapi.responses import HTMLResponse
  15. from pydantic import BaseModel
  16. from dotenv import load_dotenv
  17. # 加载环境变量
  18. load_dotenv()
  19. # 配置
  20. PORT = int(os.getenv("FEEDER_PORT", "8001"))
  21. API_SERVER = os.getenv("API_SERVER", "http://localhost:8000")
  22. API_SERVER_PASSWORD = os.getenv("API_SERVER_PASSWORD")
  23. # OIDC 端点
  24. OIDC_BASE = "https://oidc.us-east-1.amazonaws.com"
  25. REGISTER_URL = f"{OIDC_BASE}/client/register"
  26. DEVICE_AUTH_URL = f"{OIDC_BASE}/device_authorization"
  27. TOKEN_URL = f"{OIDC_BASE}/token"
  28. START_URL = "https://view.awsapps.com/start"
  29. USER_AGENT = "aws-sdk-rust/1.3.9 os/windows lang/rust/1.87.0"
  30. X_AMZ_USER_AGENT = "aws-sdk-rust/1.3.9 ua/2.1 api/ssooidc/1.88.0 os/windows lang/rust/1.87.0 m/E app/AmazonQ-For-CLI"
  31. AMZ_SDK_REQUEST = "attempt=1; max=3"
  32. # 内存存储授权会话
  33. AUTH_SESSIONS = {}
  34. app = FastAPI(title="Amazon Q 账号投喂服务")
  35. # ============ 数据模型 ============
  36. class AuthStartRequest(BaseModel):
  37. label: Optional[str] = None
  38. enabled: bool = True
  39. class AccountCreate(BaseModel):
  40. label: Optional[str] = None
  41. clientId: str
  42. clientSecret: str
  43. refreshToken: str
  44. accessToken: Optional[str] = None
  45. enabled: bool = True
  46. class BatchCreateRequest(BaseModel):
  47. accounts: list[dict]
  48. # ============ OIDC 授权函数 ============
  49. def _get_proxies() -> Optional[Dict[str, str]]:
  50. """获取代理配置"""
  51. proxy = os.getenv("HTTP_PROXY", "").strip()
  52. if proxy:
  53. return {"http://": proxy, "https://": proxy}
  54. return None
  55. def make_headers() -> Dict[str, str]:
  56. """生成 OIDC 请求头"""
  57. return {
  58. "content-type": "application/json",
  59. "user-agent": USER_AGENT,
  60. "x-amz-user-agent": X_AMZ_USER_AGENT,
  61. "amz-sdk-request": AMZ_SDK_REQUEST,
  62. "amz-sdk-invocation-id": str(uuid.uuid4()),
  63. }
  64. async def post_json(url: str, payload: Dict) -> httpx.Response:
  65. """发送 JSON POST 请求"""
  66. payload_str = json.dumps(payload, ensure_ascii=False)
  67. headers = make_headers()
  68. async with httpx.AsyncClient(proxies=_get_proxies(), timeout=60.0) as client:
  69. resp = await client.post(url, headers=headers, content=payload_str)
  70. return resp
  71. async def register_client() -> tuple[str, str]:
  72. """注册 OIDC 客户端"""
  73. payload = {
  74. "clientName": "Amazon Q Developer for command line",
  75. "clientType": "public",
  76. "scopes": [
  77. "codewhisperer:completions",
  78. "codewhisperer:analysis",
  79. "codewhisperer:conversations",
  80. ],
  81. }
  82. r = await post_json(REGISTER_URL, payload)
  83. r.raise_for_status()
  84. data = r.json()
  85. return data["clientId"], data["clientSecret"]
  86. async def start_device_authorization(client_id: str, client_secret: str) -> Dict:
  87. """开始设备授权流程"""
  88. payload = {
  89. "clientId": client_id,
  90. "clientSecret": client_secret,
  91. "startUrl": START_URL,
  92. }
  93. r = await post_json(DEVICE_AUTH_URL, payload)
  94. r.raise_for_status()
  95. return r.json()
  96. async def poll_for_tokens(
  97. client_id: str,
  98. client_secret: str,
  99. device_code: str,
  100. interval: int,
  101. expires_in: int,
  102. max_timeout_sec: int = 300,
  103. ) -> Dict:
  104. """轮询获取 tokens"""
  105. payload = {
  106. "clientId": client_id,
  107. "clientSecret": client_secret,
  108. "deviceCode": device_code,
  109. "grantType": "urn:ietf:params:oauth:grant-type:device_code",
  110. }
  111. import time
  112. now = time.time()
  113. upstream_deadline = now + max(1, int(expires_in))
  114. cap_deadline = now + max_timeout_sec if max_timeout_sec > 0 else upstream_deadline
  115. deadline = min(upstream_deadline, cap_deadline)
  116. poll_interval = max(1, int(interval or 1))
  117. while time.time() < deadline:
  118. r = await post_json(TOKEN_URL, payload)
  119. if r.status_code == 200:
  120. return r.json()
  121. if r.status_code == 400:
  122. try:
  123. err = r.json()
  124. except Exception:
  125. err = {"error": r.text}
  126. if str(err.get("error")) == "authorization_pending":
  127. await asyncio.sleep(poll_interval)
  128. continue
  129. r.raise_for_status()
  130. r.raise_for_status()
  131. raise TimeoutError("设备授权超时(5分钟内未完成授权)")
  132. # ============ API 端点 ============
  133. @app.get("/", response_class=HTMLResponse)
  134. async def index():
  135. """返回前端页面"""
  136. html_path = Path(__file__).parent / "index.html"
  137. if not html_path.exists():
  138. return HTMLResponse("<h1>index.html 未找到</h1>", status_code=404)
  139. return HTMLResponse(html_path.read_text(encoding="utf-8"))
  140. @app.post("/auth/start")
  141. async def auth_start(body: Optional[AuthStartRequest] = None):
  142. """开始设备授权流程"""
  143. # 注册客户端
  144. client_id, client_secret = await register_client()
  145. # 开始设备授权
  146. device_data = await start_device_authorization(client_id, client_secret)
  147. # 生成会话 ID
  148. auth_id = str(uuid.uuid4())
  149. # 存储会话信息
  150. AUTH_SESSIONS[auth_id] = {
  151. "clientId": client_id,
  152. "clientSecret": client_secret,
  153. "deviceCode": device_data["deviceCode"],
  154. "interval": device_data["interval"],
  155. "expiresIn": device_data["expiresIn"],
  156. "label": body.label if body else None,
  157. "enabled": body.enabled if body else True,
  158. "status": "pending",
  159. }
  160. return {
  161. "authId": auth_id,
  162. "verificationUriComplete": device_data["verificationUriComplete"],
  163. "userCode": device_data["userCode"],
  164. "expiresIn": device_data["expiresIn"],
  165. "interval": device_data["interval"],
  166. }
  167. @app.post("/auth/claim/{auth_id}")
  168. async def auth_claim(auth_id: str):
  169. """轮询并创建账号(调用原服务)"""
  170. if auth_id not in AUTH_SESSIONS:
  171. raise HTTPException(status_code=404, detail="授权会话不存在")
  172. session = AUTH_SESSIONS[auth_id]
  173. if session["status"] == "completed":
  174. raise HTTPException(status_code=400, detail="授权已完成")
  175. try:
  176. # 轮询获取 tokens
  177. tokens = await poll_for_tokens(
  178. client_id=session["clientId"],
  179. client_secret=session["clientSecret"],
  180. device_code=session["deviceCode"],
  181. interval=session["interval"],
  182. expires_in=session["expiresIn"],
  183. max_timeout_sec=300,
  184. )
  185. # 调用原服务创建账号
  186. account_data = {
  187. "label": session.get("label") or f"投喂账号 {auth_id[:8]}",
  188. "clientId": session["clientId"],
  189. "clientSecret": session["clientSecret"],
  190. "refreshToken": tokens.get("refreshToken"),
  191. "accessToken": tokens.get("accessToken"),
  192. "enabled": False,
  193. }
  194. headers = {"content-type": "application/json"}
  195. if API_SERVER_PASSWORD:
  196. headers["Authorization"] = f"Bearer {API_SERVER_PASSWORD}"
  197. async with httpx.AsyncClient(timeout=30.0) as client:
  198. r = await client.post(
  199. f"{API_SERVER}/v2/accounts",
  200. json=account_data,
  201. headers=headers,
  202. )
  203. r.raise_for_status()
  204. account = r.json()
  205. # 更新会话状态
  206. session["status"] = "completed"
  207. return {"status": "completed", "account": account}
  208. except TimeoutError as e:
  209. raise HTTPException(status_code=408, detail=str(e))
  210. except httpx.HTTPStatusError as e:
  211. raise HTTPException(
  212. status_code=e.response.status_code,
  213. detail=f"创建账号失败: {e.response.text}",
  214. )
  215. except Exception as e:
  216. raise HTTPException(status_code=500, detail=f"未知错误: {str(e)}")
  217. @app.post("/accounts/create")
  218. async def create_account(account: AccountCreate):
  219. """创建单个账号(调用主服务统一feed接口)"""
  220. try:
  221. account_data = {
  222. "label": account.label or "手动投喂账号",
  223. "clientId": account.clientId,
  224. "clientSecret": account.clientSecret,
  225. "refreshToken": account.refreshToken,
  226. "accessToken": account.accessToken,
  227. }
  228. # 包装成列表以调用新的批量接口
  229. batch_request = {"accounts": [account_data]}
  230. headers = {"content-type": "application/json"}
  231. if API_SERVER_PASSWORD:
  232. headers["Authorization"] = f"Bearer {API_SERVER_PASSWORD}"
  233. async with httpx.AsyncClient(timeout=30.0) as client:
  234. r = await client.post(
  235. f"{API_SERVER}/v2/accounts/feed",
  236. json=batch_request,
  237. headers=headers,
  238. )
  239. r.raise_for_status()
  240. return r.json()
  241. except httpx.HTTPStatusError as e:
  242. raise HTTPException(
  243. status_code=e.response.status_code,
  244. detail=f"创建账号失败: {e.response.text}",
  245. )
  246. except Exception as e:
  247. raise HTTPException(status_code=500, detail=f"未知错误: {str(e)}")
  248. @app.post("/accounts/batch")
  249. async def batch_create_accounts(request: BatchCreateRequest):
  250. """批量创建账号(调用主服务统一feed接口)"""
  251. try:
  252. headers = {"content-type": "application/json"}
  253. if API_SERVER_PASSWORD:
  254. headers["Authorization"] = f"Bearer {API_SERVER_PASSWORD}"
  255. async with httpx.AsyncClient(timeout=60.0) as client:
  256. r = await client.post(
  257. f"{API_SERVER}/v2/accounts/feed",
  258. json={"accounts": request.accounts},
  259. headers=headers,
  260. )
  261. r.raise_for_status()
  262. return r.json()
  263. except httpx.HTTPStatusError as e:
  264. raise HTTPException(
  265. status_code=e.response.status_code,
  266. detail=f"批量创建失败: {e.response.text}",
  267. )
  268. except Exception as e:
  269. raise HTTPException(status_code=500, detail=f"未知错误: {str(e)}")
  270. @app.get("/health")
  271. async def health():
  272. """健康检查"""
  273. return {"status": "ok", "service": "amazonq-account-feeder"}
  274. if __name__ == "__main__":
  275. print(f"🚀 Amazon Q 账号投喂服务启动中...")
  276. print(f"📍 监听端口: {PORT}")
  277. print(f"🔗 主服务地址: {API_SERVER}")
  278. print(f"🌐 访问地址: http://localhost:{PORT}")
  279. uvicorn.run(app, host="0.0.0.0", port=PORT)