소스 검색

refactor(feed): Unify feed endpoints and enable async verification

- Merged /v2/accounts/feed and /v2/accounts/feed_batch into a single, unified endpoint that accepts a list of accounts.
- The new endpoint immediately saves incoming accounts with nabled=0 and returns a response, preventing client timeouts.
- A background async task (_verify_and_enable_accounts) is now created to handle the time-consuming verification process for each account.
- The background task updates the account status to nabled=1 upon successful verification or records a failure reason.
- Updated the account-feeder service's backend to call the new unified endpoint.
- Updated the account-feeder service's frontend to correctly display the new asynchronous processing status to the user.

This change significantly improves the API response time for the feed endpoints and simplifies the overall codebase.
CassiopeiaCode 2 달 전
부모
커밋
29fe1fd5cf
3개의 변경된 파일96개의 추가작업 그리고 65개의 파일을 삭제
  1. 24 20
      account-feeder/app.py
  2. 6 6
      account-feeder/index.html
  3. 66 39
      app.py

+ 24 - 20
account-feeder/app.py

@@ -2,6 +2,7 @@
 Amazon Q 账号投喂服务
 用于让其他人通过 URL 登录投喂账号到主服务
 """
+
 import json
 import asyncio
 import uuid
@@ -134,6 +135,7 @@ async def poll_for_tokens(
     }
 
     import time
+
     now = time.time()
     upstream_deadline = now + max(1, int(expires_in))
     cap_deadline = now + max_timeout_sec if max_timeout_sec > 0 else upstream_deadline
@@ -189,7 +191,7 @@ async def auth_start(body: Optional[AuthStartRequest] = None):
         "expiresIn": device_data["expiresIn"],
         "label": body.label if body else None,
         "enabled": body.enabled if body else True,
-        "status": "pending"
+        "status": "pending",
     }
 
     return {
@@ -197,7 +199,7 @@ async def auth_start(body: Optional[AuthStartRequest] = None):
         "verificationUriComplete": device_data["verificationUriComplete"],
         "userCode": device_data["userCode"],
         "expiresIn": device_data["expiresIn"],
-        "interval": device_data["interval"]
+        "interval": device_data["interval"],
     }
 
 
@@ -220,7 +222,7 @@ async def auth_claim(auth_id: str):
             device_code=session["deviceCode"],
             interval=session["interval"],
             expires_in=session["expiresIn"],
-            max_timeout_sec=300
+            max_timeout_sec=300,
         )
 
         # 调用原服务创建账号
@@ -230,14 +232,14 @@ async def auth_claim(auth_id: str):
             "clientSecret": session["clientSecret"],
             "refreshToken": tokens.get("refreshToken"),
             "accessToken": tokens.get("accessToken"),
-            "enabled": False
+            "enabled": False,
         }
 
         async with httpx.AsyncClient(timeout=30.0) as client:
             r = await client.post(
                 f"{API_SERVER}/v2/accounts",
                 json=account_data,
-                headers={"content-type": "application/json"}
+                headers={"content-type": "application/json"},
             )
             r.raise_for_status()
             account = r.json()
@@ -245,22 +247,22 @@ async def auth_claim(auth_id: str):
         # 更新会话状态
         session["status"] = "completed"
 
-        return {
-            "status": "completed",
-            "account": account
-        }
+        return {"status": "completed", "account": account}
 
     except TimeoutError as e:
         raise HTTPException(status_code=408, detail=str(e))
     except httpx.HTTPStatusError as e:
-        raise HTTPException(status_code=e.response.status_code, detail=f"创建账号失败: {e.response.text}")
+        raise HTTPException(
+            status_code=e.response.status_code,
+            detail=f"创建账号失败: {e.response.text}",
+        )
     except Exception as e:
         raise HTTPException(status_code=500, detail=f"未知错误: {str(e)}")
 
 
 @app.post("/accounts/create")
 async def create_account(account: AccountCreate):
-    """创建单个账号(调用原服务)"""
+    """创建单个账号(调用主服务统一feed接口)"""
     try:
         account_data = {
             "label": account.label or "手动投喂账号",
@@ -268,14 +270,16 @@ async def create_account(account: AccountCreate):
             "clientSecret": account.clientSecret,
             "refreshToken": account.refreshToken,
             "accessToken": account.accessToken,
-            "enabled": False
         }
+        
+        # 包装成列表以调用新的批量接口
+        batch_request = {"accounts": [account_data]}
 
         async with httpx.AsyncClient(timeout=30.0) as client:
             r = await client.post(
-                f"{API_SERVER}/v2/accounts",
-                json=account_data,
-                headers={"content-type": "application/json"}
+                f"{API_SERVER}/v2/accounts/feed",
+                json=batch_request,
+                headers={"content-type": "application/json"},
             )
             r.raise_for_status()
             return r.json()
@@ -283,7 +287,7 @@ async def create_account(account: AccountCreate):
     except httpx.HTTPStatusError as e:
         raise HTTPException(
             status_code=e.response.status_code,
-            detail=f"创建账号失败: {e.response.text}"
+            detail=f"创建账号失败: {e.response.text}",
         )
     except Exception as e:
         raise HTTPException(status_code=500, detail=f"未知错误: {str(e)}")
@@ -291,20 +295,20 @@ async def create_account(account: AccountCreate):
 
 @app.post("/accounts/batch")
 async def batch_create_accounts(request: BatchCreateRequest):
-    """批量创建账号(调用主服务批量接口)"""
+    """批量创建账号(调用主服务统一feed接口)"""
     try:
         async with httpx.AsyncClient(timeout=60.0) as client:
             r = await client.post(
-                f"{API_SERVER}/v2/accounts/batch",
+                f"{API_SERVER}/v2/accounts/feed",
                 json={"accounts": request.accounts},
-                headers={"content-type": "application/json"}
+                headers={"content-type": "application/json"},
             )
             r.raise_for_status()
             return r.json()
     except httpx.HTTPStatusError as e:
         raise HTTPException(
             status_code=e.response.status_code,
-            detail=f"批量创建失败: {e.response.text}"
+            detail=f"批量创建失败: {e.response.text}",
         )
     except Exception as e:
         raise HTTPException(status_code=500, detail=f"未知错误: {str(e)}")

+ 6 - 6
account-feeder/index.html

@@ -324,7 +324,9 @@ async function createSingle(){
     });
     if (!r.ok) throw new Error(await r.text());
     const j = await r.json();
-    result.textContent = '🎉 账号创建成功!\n\n' + JSON.stringify(j, null, 2);
+    result.textContent = '✅ 请求已提交!\n\n' +
+      '主服务正在后台异步验证账号,请稍后在主服务后台查看最终状态。\n\n' +
+      'API 响应:\n' + JSON.stringify(j, null, 2);
 
     // 清空表单
     document.getElementById('manual_label').value = '';
@@ -365,11 +367,9 @@ async function createBatch(){
     if (!r.ok) throw new Error(await r.text());
     const j = await r.json();
 
-    result.textContent = `✅ 批量创建完成!\n\n` +
-      `成功:${j.success} 个\n` +
-      `失败:${j.failed} 个\n` +
-      `总计:${j.total} 个\n\n` +
-      `详细结果:\n${JSON.stringify(j.results, null, 2)}`;
+    result.textContent = '✅ 请求已提交!\n\n' +
+      `主服务正在后台异步验证 ${accounts.length} 个账号,请稍后在主服务后台查看最终状态。\n\n` +
+      'API 响应:\n' + JSON.stringify(j, null, 2);
   } catch(e){
     result.textContent = '❌ 批量创建失败:' + e.message;
   }

+ 66 - 39
app.py

@@ -1055,6 +1055,7 @@ if CONSOLE_ENABLED:
         now = time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime())
         acc_id = str(uuid.uuid4())
         other_str = json.dumps(body.other, ensure_ascii=False) if body.other is not None else None
+        enabled_val = 1 if (body.enabled is None or body.enabled) else 0
         async with _conn() as conn:
             conn.row_factory = aiosqlite.Row
             await conn.execute(
@@ -1074,57 +1075,83 @@ if CONSOLE_ENABLED:
                     "never",
                     now,
                     now,
-                    0,
+                    enabled_val,
                 ),
             )
             await conn.commit()
-            async with conn.execute("SELECT * FROM accounts WHERE id=?", (acc_id,)) as cursor:
-                row = await cursor.fetchone()
-                account = _row_to_dict(row)
-        
-        verify_success, fail_reason = await verify_account(account)
-        async with _conn() as conn:
-            if verify_success:
-                await conn.execute("UPDATE accounts SET enabled=1, updated_at=? WHERE id=?", (now, acc_id))
-            elif fail_reason:
-                other_dict = json.loads(other_str) if other_str else {}
-                other_dict['failedReason'] = fail_reason
-                await conn.execute("UPDATE accounts SET other=?, updated_at=? WHERE id=?", (json.dumps(other_dict, ensure_ascii=False), now, acc_id))
-            await conn.commit()
-            conn.row_factory = aiosqlite.Row
             async with conn.execute("SELECT * FROM accounts WHERE id=?", (acc_id,)) as cursor:
                 row = await cursor.fetchone()
                 return _row_to_dict(row)
 
-    @app.post("/v2/accounts/batch")
-    async def batch_create_accounts(request: BatchAccountCreate):
-        results = []
-        success_count = 0
-        failed_count = 0
-        for i, account_data in enumerate(request.accounts):
+
+    async def _verify_and_enable_accounts(account_ids: List[str]):
+        """后台异步验证并启用账号"""
+        for acc_id in account_ids:
             try:
+                # 必须先获取完整的账号信息
+                account = await get_account(acc_id)
+                verify_success, fail_reason = await verify_account(account)
                 now = time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime())
-                acc_id = str(uuid.uuid4())
-                other_str = json.dumps(account_data.other, ensure_ascii=False) if account_data.other else None
+                
                 async with _conn() as conn:
-                    await conn.execute(
-                        """
-                        INSERT INTO accounts (id, label, clientId, clientSecret, refreshToken, accessToken, other, last_refresh_time, last_refresh_status, created_at, updated_at, enabled)
-                        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
-                        """,
-                        (acc_id, account_data.label or f"批量账号 {i+1}", account_data.clientId, account_data.clientSecret, account_data.refreshToken, account_data.accessToken, other_str, None, "never", now, now, 0),
-                    )
+                    if verify_success:
+                        await conn.execute("UPDATE accounts SET enabled=1, updated_at=? WHERE id=?", (now, acc_id))
+                    elif fail_reason:
+                        other_dict = account.get("other", {})
+                        other_dict['failedReason'] = fail_reason
+                        await conn.execute("UPDATE accounts SET other=?, updated_at=? WHERE id=?", (json.dumps(other_dict, ensure_ascii=False), now, acc_id))
                     await conn.commit()
-                    conn.row_factory = aiosqlite.Row
-                    async with conn.execute("SELECT * FROM accounts WHERE id=?", (acc_id,)) as cursor:
-                        row = await cursor.fetchone()
-                        account = _row_to_dict(row)
-                results.append({"index": i, "status": "success", "account": account})
-                success_count += 1
             except Exception as e:
-                results.append({"index": i, "status": "failed", "error": str(e)})
-                failed_count += 1
-        return {"total": len(request.accounts), "success": success_count, "failed": failed_count, "results": results}
+                print(f"Error verifying account {acc_id}: {e}")
+                traceback.print_exc()
+
+    @app.post("/v2/accounts/feed")
+    async def create_accounts_feed(request: BatchAccountCreate):
+        """
+        统一的投喂接口,接收账号列表,立即存入并后台异步验证。
+        """
+        now = time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime())
+        new_account_ids = []
+        
+        async with _conn() as conn:
+            for i, account_data in enumerate(request.accounts):
+                acc_id = str(uuid.uuid4())
+                other_dict = account_data.other or {}
+                other_dict['source'] = 'feed'
+                other_str = json.dumps(other_dict, ensure_ascii=False)
+                
+                await conn.execute(
+                    """
+                    INSERT INTO accounts (id, label, clientId, clientSecret, refreshToken, accessToken, other, last_refresh_time, last_refresh_status, created_at, updated_at, enabled)
+                    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
+                    """,
+                    (
+                        acc_id,
+                        account_data.label or f"批量账号 {i+1}",
+                        account_data.clientId,
+                        account_data.clientSecret,
+                        account_data.refreshToken,
+                        account_data.accessToken,
+                        other_str,
+                        None,
+                        "never",
+                        now,
+                        now,
+                        0,  # 初始为禁用状态
+                    ),
+                )
+                new_account_ids.append(acc_id)
+            await conn.commit()
+
+        # 启动后台任务进行验证,不阻塞当前请求
+        if new_account_ids:
+            asyncio.create_task(_verify_and_enable_accounts(new_account_ids))
+
+        return {
+            "status": "processing",
+            "message": f"{len(new_account_ids)} accounts received and are being verified in the background.",
+            "account_ids": new_account_ids
+        }
 
     @app.get("/v2/accounts")
     async def list_accounts():