Selaa lähdekoodia

fix: enhance database import and export routes for better lock management

- Refactored the database export and import routes to improve the handling of distributed locks, ensuring they are released correctly in various scenarios, including connection failures and request cancellations.
- Updated the logic to validate form data before acquiring locks, preventing unnecessary lock contention.
- Improved error handling and logging for lock release failures, enhancing the reliability of the backup operations.
ding113 1 kuukausi sitten
vanhempi
sitoutus
519c56325c

+ 5 - 4
deploy/Dockerfile

@@ -38,12 +38,13 @@ WORKDIR /app
 # node:trixie-slim 基于 Debian Trixie,默认只有 PostgreSQL 17.x
 # 使用 PostgreSQL 官方仓库安装 18.x 以匹配 docker-compose 中的 postgres:18
 RUN apt-get update && \
-    apt-get install -y curl ca-certificates gnupg && \
+    apt-get install -y --no-install-recommends curl ca-certificates gnupg && \
     curl -fsSL https://www.postgresql.org/media/keys/ACCC4CF8.asc | gpg --dearmor -o /etc/apt/keyrings/pgdg.gpg && \
-    echo "deb [signed-by=/etc/apt/keyrings/pgdg.gpg] http://apt.postgresql.org/pub/repos/apt trixie-pgdg main" > /etc/apt/sources.list.d/pgdg.list && \
+    echo "deb [signed-by=/etc/apt/keyrings/pgdg.gpg] https://apt.postgresql.org/pub/repos/apt trixie-pgdg main" > /etc/apt/sources.list.d/pgdg.list && \
     apt-get update && \
-    apt-get install -y postgresql-client-18 && \
-    rm -rf /var/lib/apt/lists/*
+    apt-get install -y --no-install-recommends postgresql-client-18 && \
+    apt-get purge -y --auto-remove gnupg && \
+    rm -rf /var/lib/apt/lists/* /tmp/* /var/tmp/*
 
 COPY --from=build --chown=node:node /app/public ./public
 COPY --from=build --chown=node:node /app/drizzle ./drizzle

+ 40 - 14
src/app/api/admin/database/export/route.ts

@@ -24,38 +24,53 @@ function createMonitoredStream(
 ): ReadableStream<Uint8Array> {
   const reader = stream.getReader();
   let released = false;
+  let cancelled = false;
 
-  const releaseLock = async () => {
-    if (!released && lockId) {
-      released = true;
-      await releaseBackupLock(lockId, "export").catch((err) => {
-        logger.error({
-          action: "database_export_lock_release_error",
-          lockId,
-          error: err.message,
-        });
+  const releaseLock = async (reason?: string) => {
+    if (released || !lockId) return;
+    released = true; // 同步设置,在任何 await 之前
+    await releaseBackupLock(lockId, "export").catch((err) => {
+      logger.error({
+        action: "database_export_lock_release_error",
+        lockId,
+        reason,
+        error: err.message,
       });
-    }
+    });
   };
 
   return new ReadableStream({
     async pull(controller) {
+      // 如果已取消,不再读取
+      if (cancelled) {
+        controller.close();
+        return;
+      }
+
       try {
         const { done, value } = await reader.read();
         if (done) {
           controller.close();
-          await releaseLock();
+          await releaseLock("stream_done");
         } else {
           controller.enqueue(value);
         }
       } catch (error) {
-        await releaseLock();
+        await releaseLock("stream_error");
+        reader.releaseLock();
         controller.error(error);
       }
     },
     async cancel() {
-      await releaseLock();
-      reader.cancel();
+      cancelled = true;
+      await releaseLock("request_cancelled");
+      await reader.cancel().catch((err) => {
+        logger.error({
+          action: "database_export_reader_cancel_error",
+          lockId,
+          error: err instanceof Error ? err.message : String(err),
+        });
+      });
     },
   });
 }
@@ -103,6 +118,17 @@ export async function GET(request: Request) {
       logger.error({
         action: "database_export_connection_unavailable",
       });
+      // 数据库不可用时释放锁
+      if (lockId) {
+        await releaseBackupLock(lockId, "export").catch((err) => {
+          logger.error({
+            action: "database_export_lock_release_error",
+            lockId,
+            reason: "connection_unavailable",
+            error: err.message,
+          });
+        });
+      }
       return Response.json({ error: "数据库连接不可用,请检查数据库服务状态" }, { status: 503 });
     }
 

+ 74 - 37
src/app/api/admin/database/import/route.ts

@@ -4,7 +4,6 @@ import { acquireBackupLock, releaseBackupLock } from "@/lib/database-backup/back
 import { checkDatabaseConnection, executePgRestore } from "@/lib/database-backup/docker-executor";
 import {
   cleanupTempFile,
-  createCleanupCallback,
   generateTempFilePath,
   registerTempFile,
 } from "@/lib/database-backup/temp-file-manager";
@@ -40,32 +39,7 @@ export async function POST(request: Request) {
       return new Response("Unauthorized", { status: 401 });
     }
 
-    // 2. 尝试获取分布式锁(防止并发操作)
-    lockId = await acquireBackupLock("import");
-    if (!lockId) {
-      logger.warn({
-        action: "database_import_lock_conflict",
-        user: session.user.name,
-      });
-      return Response.json(
-        {
-          error: "其他管理员正在执行备份操作,请稍后重试",
-          details: "为确保数据一致性,同一时间只能执行一个备份操作",
-        },
-        { status: 409 }
-      );
-    }
-
-    // 3. 检查数据库连接
-    const isAvailable = await checkDatabaseConnection();
-    if (!isAvailable) {
-      logger.error({
-        action: "database_import_connection_unavailable",
-      });
-      return Response.json({ error: "数据库连接不可用,请检查数据库服务状态" }, { status: 503 });
-    }
-
-    // 4. 解析表单数据
+    // 2. 解析并验证表单数据(在获取锁之前完成,避免验证失败时锁被占用)
     const formData = await request.formData();
     const file = formData.get("file") as File | null;
     const cleanFirst = formData.get("cleanFirst") === "true";
@@ -75,12 +49,12 @@ export async function POST(request: Request) {
       return Response.json({ error: "缺少备份文件" }, { status: 400 });
     }
 
-    // 5. 验证文件类型
+    // 3. 验证文件类型
     if (!file.name.endsWith(".dump")) {
       return Response.json({ error: "文件格式错误,仅支持 .dump 格式的备份文件" }, { status: 400 });
     }
 
-    // 6. 验证文件大小
+    // 4. 验证文件大小
     if (file.size > MAX_FILE_SIZE) {
       logger.warn({
         action: "database_import_file_too_large",
@@ -97,6 +71,42 @@ export async function POST(request: Request) {
       );
     }
 
+    // 5. 尝试获取分布式锁(防止并发操作)
+    lockId = await acquireBackupLock("import");
+    if (!lockId) {
+      logger.warn({
+        action: "database_import_lock_conflict",
+        user: session.user.name,
+      });
+      return Response.json(
+        {
+          error: "其他管理员正在执行备份操作,请稍后重试",
+          details: "为确保数据一致性,同一时间只能执行一个备份操作",
+        },
+        { status: 409 }
+      );
+    }
+
+    // 6. 检查数据库连接
+    const isAvailable = await checkDatabaseConnection();
+    if (!isAvailable) {
+      logger.error({
+        action: "database_import_connection_unavailable",
+      });
+      // 数据库不可用时释放锁
+      if (lockId) {
+        await releaseBackupLock(lockId, "import").catch((err) => {
+          logger.error({
+            action: "database_import_lock_release_error",
+            lockId,
+            reason: "connection_unavailable",
+            error: err.message,
+          });
+        });
+      }
+      return Response.json({ error: "数据库连接不可用,请检查数据库服务状态" }, { status: 503 });
+    }
+
     logger.info({
       action: "database_import_initiated",
       filename: file.name,
@@ -120,8 +130,34 @@ export async function POST(request: Request) {
     });
 
     // 8. 监听请求取消(用户关闭浏览器)
-    const abortCleanup = createCleanupCallback(tempFilePath, "aborted");
-    request.signal.addEventListener("abort", abortCleanup);
+    // 创建一个综合的清理函数,同时处理临时文件和锁
+    const currentLockId = lockId;
+    const currentTempFilePath = tempFilePath;
+    const abortHandler = () => {
+      // 清理临时文件
+      if (currentTempFilePath) {
+        cleanupTempFile(currentTempFilePath, "aborted").catch((err) => {
+          logger.error({
+            action: "database_import_cleanup_error",
+            tempFilePath: currentTempFilePath,
+            reason: "aborted",
+            error: err instanceof Error ? err.message : String(err),
+          });
+        });
+      }
+      // 释放锁
+      if (currentLockId) {
+        releaseBackupLock(currentLockId, "import").catch((err) => {
+          logger.error({
+            action: "database_import_lock_release_error",
+            lockId: currentLockId,
+            reason: "request_cancelled",
+            error: err.message,
+          });
+        });
+      }
+    };
+    request.signal.addEventListener("abort", abortHandler);
 
     // 9. 执行 pg_restore,返回 SSE 流
     const stream = executePgRestore(tempFilePath, cleanFirst, skipLogs);
@@ -133,21 +169,22 @@ export async function POST(request: Request) {
       },
       flush() {
         // 流正常结束时清理
-        if (tempFilePath) {
-          cleanupTempFile(tempFilePath, "completed").catch((err) => {
+        if (currentTempFilePath) {
+          cleanupTempFile(currentTempFilePath, "completed").catch((err) => {
             logger.error({
               action: "database_import_cleanup_error",
-              tempFilePath,
+              tempFilePath: currentTempFilePath,
               error: err.message,
             });
           });
         }
 
-        if (lockId) {
-          releaseBackupLock(lockId, "import").catch((err) => {
+        if (currentLockId) {
+          releaseBackupLock(currentLockId, "import").catch((err) => {
             logger.error({
               action: "database_import_lock_release_error",
-              lockId,
+              lockId: currentLockId,
+              reason: "stream_done",
               error: err.message,
             });
           });