Просмотр исходного кода

fix: implement monitored stream for database export to ensure lock release

- Added a new function `createMonitoredStream` to wrap the export stream, ensuring that the backup lock is released in all scenarios: on successful completion, error, or cancellation.
- Removed the previous cleanup stream logic in favor of the new monitored stream approach for better reliability in lock management.
- Updated the runtime for the version route from edge to nodejs for consistency across the application.
ding113 2 месяцев назад
Родитель
Сommit
007d7a97e6
2 измененных файлов с 57 добавлено и 37 удалено
  1. 56 36
      src/app/api/admin/database/export/route.ts
  2. 1 1
      src/app/api/version/route.ts

+ 56 - 36
src/app/api/admin/database/export/route.ts

@@ -6,6 +6,60 @@ import { logger } from "@/lib/logger";
 // 需要数据库连接
 export const runtime = "nodejs";
 
+/**
+ * 创建监控包装流,确保在所有场景下都释放锁
+ *
+ * 关键场景:
+ * - 成功完成:通过 pull() 中的 done === true 释放锁
+ * - 流错误:通过 pull() 中的 catch 释放锁
+ * - 请求取消:通过 cancel() 释放锁
+ *
+ * @param stream - 原始流(来自 pg_dump)
+ * @param lockId - 备份锁 ID
+ * @returns 包装后的流
+ */
+function createMonitoredStream(
+  stream: ReadableStream<Uint8Array>,
+  lockId: string
+): ReadableStream<Uint8Array> {
+  const reader = stream.getReader();
+  let released = false;
+
+  const releaseLock = async () => {
+    if (!released && lockId) {
+      released = true;
+      await releaseBackupLock(lockId, "export").catch((err) => {
+        logger.error({
+          action: "database_export_lock_release_error",
+          lockId,
+          error: err.message,
+        });
+      });
+    }
+  };
+
+  return new ReadableStream({
+    async pull(controller) {
+      try {
+        const { done, value } = await reader.read();
+        if (done) {
+          controller.close();
+          await releaseLock();
+        } else {
+          controller.enqueue(value);
+        }
+      } catch (error) {
+        await releaseLock();
+        controller.error(error);
+      }
+    },
+    async cancel() {
+      await releaseLock();
+      reader.cancel();
+    },
+  });
+}
+
 /**
  * 导出数据库备份
  *
@@ -71,42 +125,8 @@ export async function GET(request: Request) {
       user: session.user.name,
     });
 
-    // 7. 监听请求取消(用户关闭浏览器)
-    request.signal.addEventListener("abort", () => {
-      if (lockId) {
-        releaseBackupLock(lockId, "export").catch((err) => {
-          logger.error({
-            action: "database_export_lock_release_error",
-            lockId,
-            reason: "request_aborted",
-            error: err.message,
-          });
-        });
-      }
-    });
-
-    // 8. 包装流以确保锁的释放
-    const cleanupStream = new TransformStream({
-      transform(chunk, controller) {
-        controller.enqueue(chunk);
-      },
-      flush() {
-        // 流正常结束时释放锁
-        if (lockId) {
-          releaseBackupLock(lockId, "export").catch((err) => {
-            logger.error({
-              action: "database_export_lock_release_error",
-              lockId,
-              reason: "stream_completed",
-              error: err.message,
-            });
-          });
-        }
-      },
-    });
-
-    // 9. 返回流式响应
-    return new Response(stream.pipeThrough(cleanupStream), {
+    // 7. 返回流式响应(使用监控包装器确保锁的释放)
+    return new Response(createMonitoredStream(stream, lockId), {
       status: 200,
       headers: {
         "Content-Type": "application/octet-stream",

+ 1 - 1
src/app/api/version/route.ts

@@ -2,7 +2,7 @@ import { NextResponse } from "next/server";
 import { logger } from "@/lib/logger";
 import { APP_VERSION, compareVersions, GITHUB_REPO } from "@/lib/version";
 
-export const runtime = "edge";
+export const runtime = "nodejs";
 export const dynamic = "force-dynamic";
 
 interface GitHubRelease {