0xReLogic пре 3 дана
родитељ
комит
0d7e53e8d7
3 измењених фајлова са 160 додато и 49 уклоњено
  1. 146 41
      src/database.py
  2. 14 5
      src/main.py
  3. 0 3
      src/memory.py

+ 146 - 41
src/database.py

@@ -1,7 +1,9 @@
 """Database management for Cognio."""
 
+import hashlib
 import json
 import logging
+import re
 import sqlite3
 from pathlib import Path
 from typing import Any
@@ -17,6 +19,8 @@ logger = logging.getLogger(__name__)
 _DB_NOT_CONNECTED_ERROR = "Database not connected"
 _PROJECT_FILTER_SQL = " AND project = ?"
 _TAGS_LIKE_SQL = "tags LIKE ?"
+_LEANN_GLOBAL_KEY = "__global__"
+_LEANN_PROJECT_MAX_LEN = 64
 
 
 class Database:
@@ -27,8 +31,8 @@ class Database:
         self.db_path = db_path or settings.db_path
         self.conn: sqlite3.Connection | None = None
         self.fts_ready: bool = False
-        self.leann_engine: Any | None = None
-        self.leann_dirty: bool = False
+        self.leann_engines: dict[str, Any] = {}
+        self.leann_dirty_projects: set[str] = set()
 
     def connect(self) -> None:
         """Create database connection and initialize schema."""
@@ -236,7 +240,6 @@ class Database:
             tuple(memory_ids),
         )
         self.commit()
-        self.leann_dirty = True
 
     def engram_search_candidates(
         self, query: str, project: str | None = None, limit: int | None = None
@@ -278,15 +281,90 @@ class Database:
             logger.warning(f"Engram search failed: {e}")
             return []
 
-    def _leann_index_path(self) -> Path:
-        return Path(settings.leann_index_path).expanduser().resolve()
+    def _leann_project_key(self, project: str | None) -> str:
+        if not project:
+            return _LEANN_GLOBAL_KEY
+        slug = re.sub(r"[^a-zA-Z0-9]+", "-", project.strip().lower()).strip("-")
+        if not slug:
+            digest = hashlib.sha1(project.encode("utf-8")).hexdigest()[:12]
+            slug = f"project-{digest}"
+        return slug[:_LEANN_PROJECT_MAX_LEN]
+
+    def _leann_index_path(self, project: str | None = None) -> Path:
+        base = Path(settings.leann_index_path).expanduser().resolve()
+        if not project:
+            return base
+        key = self._leann_project_key(project)
+        if key == _LEANN_GLOBAL_KEY:
+            return base
+        suffix = base.suffix if base.suffix else ".leann"
+        stem = base.stem or "memories"
+        return base.with_name(f"{stem}_{key}{suffix}")
 
     def _leann_meta_path(self, index_path: Path) -> Path:
         return Path(f"{index_path}.meta.json")
 
-    def _cleanup_leann_engine(self) -> None:
-        engine = self.leann_engine
-        self.leann_engine = None
+    def leann_global_key(self) -> str:
+        return _LEANN_GLOBAL_KEY
+
+    def _get_project_for_memory(self, memory_id: str) -> str | None:
+        if self.conn is None:
+            raise RuntimeError(_DB_NOT_CONNECTED_ERROR)
+        cursor = self.execute("SELECT project FROM memories WHERE id = ?", (memory_id,))
+        row = cursor.fetchone()
+        return row["project"] if row else None
+
+    def list_projects(self, include_none: bool = False) -> list[str | None]:
+        if self.conn is None:
+            raise RuntimeError(_DB_NOT_CONNECTED_ERROR)
+        if include_none:
+            cursor = self.execute(
+                "SELECT DISTINCT project FROM memories WHERE archived = 0"
+            )
+        else:
+            cursor = self.execute(
+                "SELECT DISTINCT project FROM memories WHERE archived = 0 AND project IS NOT NULL"
+            )
+        return [row["project"] for row in cursor.fetchall()]
+
+    def _mark_leann_dirty(self, project: str | None) -> None:
+        key = self._leann_project_key(project)
+        self.leann_dirty_projects.add(key)
+        if project is not None:
+            self.leann_dirty_projects.add(_LEANN_GLOBAL_KEY)
+
+    def _clear_leann_dirty(self, project: str | None) -> None:
+        key = self._leann_project_key(project)
+        self.leann_dirty_projects.discard(key)
+
+    def leann_is_dirty(self, project: str | None) -> bool:
+        key = self._leann_project_key(project)
+        return key in self.leann_dirty_projects
+
+    def mark_all_leann_dirty(self) -> None:
+        projects = self.list_projects(include_none=True)
+        if not projects:
+            self.leann_dirty_projects.add(_LEANN_GLOBAL_KEY)
+            return
+        for project in projects:
+            self._mark_leann_dirty(project)
+
+    def next_leann_build_project(self) -> str | None:
+        projects = self.list_projects(include_none=True)
+        if not projects:
+            return None
+        ordered = sorted([p for p in projects if p])
+        ordered.append(_LEANN_GLOBAL_KEY)
+        for project in ordered:
+            project_value = None if project == _LEANN_GLOBAL_KEY else project
+            index_path = self._leann_index_path(project_value)
+            meta_path = self._leann_meta_path(index_path)
+            if self.leann_is_dirty(project_value) or not meta_path.exists():
+                return project
+        return None
+
+    def _cleanup_leann_engine_key(self, key: str) -> None:
+        engine = self.leann_engines.pop(key, None)
         if engine is None:
             return
         cleanup = getattr(engine, "cleanup", None)
@@ -296,7 +374,15 @@ class Database:
             except Exception as e:
                 logger.warning(f"LEANN cleanup failed: {e}")
 
-    def _load_leann_engine(self) -> bool:
+    def _cleanup_leann_engine(self, project: str | None = None) -> None:
+        if project is None:
+            for key in list(self.leann_engines.keys()):
+                self._cleanup_leann_engine_key(key)
+            return
+        key = self._leann_project_key(project)
+        self._cleanup_leann_engine_key(key)
+
+    def _load_leann_engine(self, project: str | None) -> bool:
         if not settings.leann_enabled:
             return False
         try:
@@ -305,27 +391,28 @@ class Database:
             logger.warning(f"LEANN not available: {e}")
             return False
 
-        index_path = self._leann_index_path()
+        index_path = self._leann_index_path(project)
         meta_path = self._leann_meta_path(index_path)
         if not meta_path.exists():
             logger.info("LEANN index not found: %s", meta_path)
             return False
 
         try:
-            self.leann_engine = LeannSearcher(
+            key = self._leann_project_key(project)
+            self.leann_engines[key] = LeannSearcher(
                 str(index_path),
                 enable_warmup=bool(settings.leann_warmup_on_start),
                 recompute_embeddings=bool(settings.leann_recompute_on_search),
             )
-            self.leann_dirty = False
+            self._clear_leann_dirty(project)
             logger.info("LEANN searcher loaded: %s", index_path)
             return True
         except Exception as e:
             logger.warning(f"LEANN searcher load failed: {e}")
-            self.leann_engine = None
+            self.leann_engines.pop(self._leann_project_key(project), None)
             return False
 
-    def build_leann_index(self) -> bool:
+    def build_leann_index(self, project: str | None = None) -> bool:
         if not settings.leann_enabled:
             return False
         if self.conn is None:
@@ -356,10 +443,15 @@ class Database:
             **backend_kwargs,
         )
 
-        cursor = self.execute(
+        sql = (
             "SELECT id, text, embedding, project FROM memories "
             "WHERE archived = 0 AND embedding IS NOT NULL"
         )
+        params: list[Any] = []
+        if project:
+            sql += _PROJECT_FILTER_SQL
+            params.append(project)
+        cursor = self.execute(sql, tuple(params))
         rows = cursor.fetchall()
         ids: list[str] = []
         embeddings: list[list[float]] = []
@@ -378,7 +470,10 @@ class Database:
                 builder.add_text(row["text"] or "", metadata=metadata)
 
         if not ids:
-            logger.info("LEANN build skipped: no embeddings available")
+            logger.info(
+                "LEANN build skipped: no embeddings available (project=%s)",
+                project or "global",
+            )
             return False
 
         import numpy as np
@@ -386,7 +481,7 @@ class Database:
         import pickle
         import tempfile
 
-        index_path = self._leann_index_path()
+        index_path = self._leann_index_path(project)
         index_path.parent.mkdir(parents=True, exist_ok=True)
 
         embeddings_arr = np.asarray(embeddings, dtype=np.float32)
@@ -403,32 +498,38 @@ class Database:
                 except OSError:
                     pass
 
-        self.leann_dirty = False
-        self.leann_engine = None
-        logger.info("LEANN index built: entries=%d path=%s", len(ids), index_path)
+        self._clear_leann_dirty(project)
+        self._cleanup_leann_engine(project)
+        logger.info(
+            "LEANN index built: entries=%d path=%s project=%s",
+            len(ids),
+            index_path,
+            project or "global",
+        )
         return True
 
-    def ensure_leann_engine(self, build_if_missing: bool = False) -> bool:
+    def ensure_leann_engine(self, project: str | None, build_if_missing: bool = False) -> bool:
         if not settings.leann_enabled:
             return False
 
-        if self.leann_engine and not self.leann_dirty:
+        key = self._leann_project_key(project)
+        if key in self.leann_engines and not self.leann_is_dirty(project):
             return True
 
-        if self.leann_engine and self.leann_dirty:
-            self._cleanup_leann_engine()
+        if key in self.leann_engines and self.leann_is_dirty(project):
+            self._cleanup_leann_engine(project)
 
-        index_path = self._leann_index_path()
+        index_path = self._leann_index_path(project)
         meta_path = self._leann_meta_path(index_path)
-        if meta_path.exists() and not self.leann_dirty:
-            return self._load_leann_engine()
+        if meta_path.exists() and not self.leann_is_dirty(project):
+            return self._load_leann_engine(project)
 
         if not build_if_missing:
             return False
 
-        if not self.build_leann_index():
+        if not self.build_leann_index(project):
             return False
-        return self._load_leann_engine()
+        return self._load_leann_engine(project)
 
     def maybe_init_leann(self) -> None:
         if not settings.leann_enabled:
@@ -436,21 +537,18 @@ class Database:
         if settings.leann_lazy_build and not settings.leann_warmup_on_start:
             return
         build_if_missing = not settings.leann_lazy_build
-        self.ensure_leann_engine(build_if_missing=build_if_missing)
+        self.ensure_leann_engine(project=None, build_if_missing=build_if_missing)
 
     def leann_search(self, query: str, limit: int = 5, project: str | None = None) -> list[str]:
-        if not self.ensure_leann_engine(build_if_missing=True):
+        if not self.ensure_leann_engine(project=project, build_if_missing=True):
             return []
-        if self.leann_engine is None:
+        engine = self.leann_engines.get(self._leann_project_key(project))
+        if engine is None:
             return []
-        metadata_filters: dict[str, Any] | None = None
-        if project:
-            metadata_filters = {"project": {"==": project}}
         try:
-            results = self.leann_engine.search(
+            results = engine.search(
                 query,
                 top_k=limit,
-                metadata_filters=metadata_filters,
             )
         except Exception as e:
             logger.warning(f"LEANN search failed: {e}")
@@ -513,6 +611,7 @@ class Database:
             self.upsert_engram_index(memory.id, memory.text)
         except Exception as e:
             logger.warning(f"Engram index update failed: {e}")
+        self._mark_leann_dirty(memory.project)
 
     def get_memory_by_id(self, memory_id: str) -> Memory | None:
         """Retrieve a memory by ID."""
@@ -585,6 +684,7 @@ class Database:
 
     def delete_memory(self, memory_id: str) -> bool:
         """Delete a memory by ID (hard delete)."""
+        project = self._get_project_for_memory(memory_id)
         cursor = self.execute("DELETE FROM memories WHERE id = ?", (memory_id,))
         deleted = cursor.rowcount > 0
         if deleted:
@@ -594,7 +694,7 @@ class Database:
                 logger.warning(f"Engram index cleanup failed: {e}")
         self.commit()
         if deleted:
-            self.leann_dirty = True
+            self._mark_leann_dirty(project)
         return deleted
 
     def update_embedding(self, memory_id: str, embedding: list[float]) -> bool:
@@ -607,11 +707,13 @@ class Database:
         )
         self.commit()
         if cursor.rowcount > 0:
-            self.leann_dirty = True
+            project = self._get_project_for_memory(memory_id)
+            self._mark_leann_dirty(project)
         return cursor.rowcount > 0
 
     def archive_memory(self, memory_id: str) -> bool:
         """Archive a memory by ID (soft delete)."""
+        project = self._get_project_for_memory(memory_id)
         cursor = self.execute(
             "UPDATE memories SET archived = 1 WHERE id = ? AND archived = 0", (memory_id,)
         )
@@ -623,7 +725,7 @@ class Database:
                 logger.warning(f"Engram index cleanup failed: {e}")
         self.commit()
         if archived:
-            self.leann_dirty = True
+            self._mark_leann_dirty(project)
         return archived
 
     def bulk_delete(self, project: str | None = None, before_timestamp: int | None = None) -> int:
@@ -653,7 +755,10 @@ class Database:
             except Exception as e:
                 logger.warning(f"Engram index cleanup failed: {e}")
         if cursor.rowcount > 0:
-            self.leann_dirty = True
+            if project is None:
+                self.mark_all_leann_dirty()
+            else:
+                self._mark_leann_dirty(project)
         return cursor.rowcount
 
     def get_memories_by_ids(

+ 14 - 5
src/main.py

@@ -104,8 +104,6 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
     if settings.leann_enabled and settings.leann_idle_build:
 
         async def idle_leann_builder() -> None:
-            index_path = Path(settings.leann_index_path).expanduser().resolve()
-            meta_path = Path(f"{index_path}.meta.json")
             while True:
                 await asyncio.sleep(settings.leann_idle_check_interval)
                 last_ts = getattr(app.state, "last_request_ts", None)
@@ -117,12 +115,23 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
                     continue
                 if app.state.leann_idle_building:
                     continue
-                if not db.leann_dirty and meta_path.exists():
+
+                project_key = db.next_leann_build_project()
+                if project_key is None:
                     continue
+
+                project_value = None
+                if project_key != db.leann_global_key():
+                    project_value = project_key
+
                 app.state.leann_idle_building = True
                 try:
-                    logger.info("LEANN idle build: starting (idle_for=%.1fs)", idle_for)
-                    built = await asyncio.to_thread(db.build_leann_index)
+                    logger.info(
+                        "LEANN idle build: starting (project=%s idle_for=%.1fs)",
+                        project_value or "global",
+                        idle_for,
+                    )
+                    built = await asyncio.to_thread(db.build_leann_index, project_value)
                     if built:
                         logger.info("LEANN idle build: completed")
                     else:

+ 0 - 3
src/memory.py

@@ -693,9 +693,6 @@ class MemoryService:
 
             offset += page_size
 
-        if reembedded:
-            db.leann_dirty = True
-
         logger.info(f"Re-embed completed: scanned={scanned}, reembedded={reembedded}")
         return {"scanned": scanned, "reembedded": reembedded}