0xReLogic 4 dni temu
rodzic
commit
f4381ec191
8 zmienionych plików z 271 dodań i 22 usunięć
  1. 8 0
      .env.example
  2. 9 0
      README.md
  3. 2 0
      pyproject.toml
  4. 2 0
      requirements.txt
  5. 8 0
      src/config.py
  6. 189 0
      src/database.py
  7. 5 0
      src/main.py
  8. 48 22
      src/memory.py

+ 8 - 0
.env.example

@@ -27,6 +27,14 @@ HYBRID_ALPHA=0.6
 HYBRID_MODE=rerank
 HYBRID_RERANK_TOPK=100
 
+# LEANN vector search (optional)
+LEANN_ENABLED=false
+LEANN_INDEX_PATH=./data/leann/memories.leann
+LEANN_BACKEND=hnsw
+LEANN_LAZY_BUILD=true
+LEANN_RECOMPUTE_ON_SEARCH=true
+LEANN_WARMUP_ON_START=false
+
 # Performance
 MAX_TEXT_LENGTH=10000
 BATCH_SIZE=32

+ 9 - 0
README.md

@@ -18,6 +18,7 @@ Cognio is a Model Context Protocol (MCP) server that provides persistent semanti
 ## Features
 
 - **Semantic Search**: Find memories by meaning using sentence-transformers
+- **LEANN Vector Search (Optional)**: Lazily built vector index with on-demand embedding recomputation at search time, reducing startup memory usage
 - **Multilingual Support**: Search in 100+ languages seamlessly
 - **Persistent Storage**: SQLite-based storage that survives across sessions
 - **Project Organization**: Organize memories by project and tags
@@ -192,6 +193,14 @@ HYBRID_MODE=rerank        # candidate | rerank
 HYBRID_ALPHA=0.6          # 0..1, higher = more semantic
 HYBRID_RERANK_TOPK=100    # rerank candidate pool size
 
+# LEANN vector search (optional)
+LEANN_ENABLED=false
+LEANN_INDEX_PATH=./data/leann/memories.leann
+LEANN_BACKEND=hnsw
+LEANN_LAZY_BUILD=true
+LEANN_RECOMPUTE_ON_SEARCH=true
+LEANN_WARMUP_ON_START=false
+
 # Summarization
 SUMMARIZATION_ENABLED=true
 SUMMARIZATION_METHOD=abstractive   # extractive | abstractive

+ 2 - 0
pyproject.toml

@@ -27,6 +27,8 @@ python-dotenv = "^1.0.0"
 numpy = "^1.26.0"
 httpx = "^0.25.2"
 groq = "^0.4.0"
+leann-core = "^0.3.6"
+leann-backend-hnsw = "^0.3.6"
 
 [tool.poetry.group.dev.dependencies]
 pytest = "^7.4.3"

+ 2 - 0
requirements.txt

@@ -12,6 +12,8 @@ numpy>=1.26.0,<2.0.0
 openai>=1.3.3,<2.0.0
 groq>=0.4.0,<1.0.0
 scikit-learn>=1.3.2,<2.0.0
+leann-core>=0.3.6,<0.4.0
+leann-backend-hnsw>=0.3.6,<0.4.0
 
 # Development dependencies (optional)
 pytest>=7.4.3

+ 8 - 0
src/config.py

@@ -38,6 +38,14 @@ class Settings(BaseSettings):
     engram_min_hits: int = 2
     engram_query_bucket_limit: int = 500
 
+    # LEANN vector search (optional)
+    leann_enabled: bool = False
+    leann_index_path: str = "./data/leann/memories.leann"
+    leann_backend: str = "hnsw"
+    leann_lazy_build: bool = True
+    leann_recompute_on_search: bool = True
+    leann_warmup_on_start: bool = False
+
     # Performance
     max_text_length: int = 10000
     batch_size: int = 32

+ 189 - 0
src/database.py

@@ -28,6 +28,7 @@ class Database:
         self.conn: sqlite3.Connection | None = None
         self.fts_ready: bool = False
         self.leann_engine: Any | None = None
+        self.leann_dirty: bool = False
 
     def connect(self) -> None:
         """Create database connection and initialize schema."""
@@ -235,6 +236,7 @@ class Database:
             tuple(memory_ids),
         )
         self.commit()
+        self.leann_dirty = True
 
     def engram_search_candidates(
         self, query: str, project: str | None = None, limit: int | None = None
@@ -276,8 +278,187 @@ class Database:
             logger.warning(f"Engram search failed: {e}")
             return []
 
+    def _leann_index_path(self) -> Path:
+        return Path(settings.leann_index_path).expanduser().resolve()
+
+    def _leann_meta_path(self, index_path: Path) -> Path:
+        return Path(f"{index_path}.meta.json")
+
+    def _cleanup_leann_engine(self) -> None:
+        engine = self.leann_engine
+        self.leann_engine = None
+        if engine is None:
+            return
+        cleanup = getattr(engine, "cleanup", None)
+        if callable(cleanup):
+            try:
+                cleanup()
+            except Exception as e:
+                logger.warning(f"LEANN cleanup failed: {e}")
+
+    def _load_leann_engine(self) -> bool:
+        if not settings.leann_enabled:
+            return False
+        try:
+            from leann import LeannSearcher
+        except Exception as e:
+            logger.warning(f"LEANN not available: {e}")
+            return False
+
+        index_path = self._leann_index_path()
+        meta_path = self._leann_meta_path(index_path)
+        if not meta_path.exists():
+            logger.info("LEANN index not found: %s", meta_path)
+            return False
+
+        try:
+            self.leann_engine = LeannSearcher(
+                str(index_path),
+                enable_warmup=bool(settings.leann_warmup_on_start),
+                recompute_embeddings=bool(settings.leann_recompute_on_search),
+            )
+            self.leann_dirty = False
+            logger.info("LEANN searcher loaded: %s", index_path)
+            return True
+        except Exception as e:
+            logger.warning(f"LEANN searcher load failed: {e}")
+            self.leann_engine = None
+            return False
+
+    def build_leann_index(self) -> bool:
+        if not settings.leann_enabled:
+            return False
+        if self.conn is None:
+            raise RuntimeError(_DB_NOT_CONNECTED_ERROR)
+
+        try:
+            from leann import LeannBuilder
+        except Exception as e:
+            logger.warning(f"LEANN not available: {e}")
+            return False
+
+        from .embeddings import embedding_service
+
+        embedding_service.load_model()
+        emb_dim = embedding_service.embedding_dim
+
+        backend_kwargs: dict[str, Any] = {}
+        if settings.leann_backend == "hnsw":
+            backend_kwargs["is_recompute"] = bool(settings.leann_recompute_on_search)
+            if not settings.leann_recompute_on_search:
+                backend_kwargs["is_compact"] = False
+
+        builder = LeannBuilder(
+            backend_name=settings.leann_backend,
+            embedding_model=embedding_service.model_name,
+            embedding_mode="sentence-transformers",
+            dimensions=emb_dim,
+            **backend_kwargs,
+        )
+
+        cursor = self.execute(
+            "SELECT id, text, embedding, project FROM memories "
+            "WHERE archived = 0 AND embedding IS NOT NULL"
+        )
+        rows = cursor.fetchall()
+        ids: list[str] = []
+        embeddings: list[list[float]] = []
+        for row in rows:
+            try:
+                emb = json.loads(row["embedding"].decode("utf-8"))
+            except Exception:
+                continue
+            if emb and len(emb) == emb_dim:
+                memory_id = row["id"]
+                ids.append(memory_id)
+                embeddings.append(emb)
+                metadata: dict[str, Any] = {"id": memory_id}
+                if row["project"]:
+                    metadata["project"] = row["project"]
+                builder.add_text(row["text"] or "", metadata=metadata)
+
+        if not ids:
+            logger.info("LEANN build skipped: no embeddings available")
+            return False
+
+        import numpy as np
+        import os
+        import pickle
+        import tempfile
+
+        index_path = self._leann_index_path()
+        index_path.parent.mkdir(parents=True, exist_ok=True)
+
+        embeddings_arr = np.asarray(embeddings, dtype=np.float32)
+        temp_path = None
+        try:
+            with tempfile.NamedTemporaryFile("wb", delete=False) as tmp_file:
+                pickle.dump((ids, embeddings_arr), tmp_file)
+                temp_path = tmp_file.name
+            builder.build_index_from_embeddings(str(index_path), temp_path)
+        finally:
+            if temp_path:
+                try:
+                    os.remove(temp_path)
+                except OSError:
+                    pass
+
+        self.leann_dirty = False
+        self.leann_engine = None
+        logger.info("LEANN index built: entries=%d path=%s", len(ids), index_path)
+        return True
+
+    def ensure_leann_engine(self, build_if_missing: bool = False) -> bool:
+        if not settings.leann_enabled:
+            return False
+
+        if self.leann_engine and not self.leann_dirty:
+            return True
+
+        if self.leann_engine and self.leann_dirty:
+            self._cleanup_leann_engine()
+
+        index_path = self._leann_index_path()
+        meta_path = self._leann_meta_path(index_path)
+        if meta_path.exists() and not self.leann_dirty:
+            return self._load_leann_engine()
+
+        if not build_if_missing:
+            return False
+
+        if not self.build_leann_index():
+            return False
+        return self._load_leann_engine()
+
+    def maybe_init_leann(self) -> None:
+        if not settings.leann_enabled:
+            return
+        if settings.leann_lazy_build and not settings.leann_warmup_on_start:
+            return
+        build_if_missing = not settings.leann_lazy_build
+        self.ensure_leann_engine(build_if_missing=build_if_missing)
+
+    def leann_search(self, query: str, limit: int = 5, project: str | None = None) -> list[str]:
+        if not self.ensure_leann_engine(build_if_missing=True):
+            return []
+        if self.leann_engine is None:
+            return []
+        try:
+            results = self.leann_engine.search(query, top_k=limit)
+        except Exception as e:
+            logger.warning(f"LEANN search failed: {e}")
+            return []
+
+        ids = [result.id for result in results]
+        if not project or not ids:
+            return ids
+
+        allowed = {m.id for m in self.get_memories_by_ids(ids=ids, project=project)}
+        return [mid for mid in ids if mid in allowed]
+
     def close(self) -> None:
         """Close database connection."""
+        self._cleanup_leann_engine()
         if self.conn:
             self.conn.close()
             logger.info("Database connection closed")
@@ -405,6 +586,8 @@ class Database:
             except Exception as e:
                 logger.warning(f"Engram index cleanup failed: {e}")
         self.commit()
+        if deleted:
+            self.leann_dirty = True
         return deleted
 
     def update_embedding(self, memory_id: str, embedding: list[float]) -> bool:
@@ -416,6 +599,8 @@ class Database:
             (embedding_bytes, updated_at, memory_id),
         )
         self.commit()
+        if cursor.rowcount > 0:
+            self.leann_dirty = True
         return cursor.rowcount > 0
 
     def archive_memory(self, memory_id: str) -> bool:
@@ -430,6 +615,8 @@ class Database:
             except Exception as e:
                 logger.warning(f"Engram index cleanup failed: {e}")
         self.commit()
+        if archived:
+            self.leann_dirty = True
         return archived
 
     def bulk_delete(self, project: str | None = None, before_timestamp: int | None = None) -> int:
@@ -458,6 +645,8 @@ class Database:
                 self.delete_engram_for_ids(ids)
             except Exception as e:
                 logger.warning(f"Engram index cleanup failed: {e}")
+        if cursor.rowcount > 0:
+            self.leann_dirty = True
         return cursor.rowcount
 
     def get_memories_by_ids(

+ 5 - 0
src/main.py

@@ -62,6 +62,11 @@ async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
         except Exception as e:
             logger.warning("Engram index stats failed: %s", e)
     embedding_service.load_model()
+    if settings.leann_enabled:
+        try:
+            db.maybe_init_leann()
+        except Exception as e:
+            logger.warning("LEANN init failed: %s", e)
 
     # Optionally trigger background re-embedding for mismatched dimensions
     reembed_task = None

+ 48 - 22
src/memory.py

@@ -172,11 +172,11 @@ class MemoryService:
                 ]
 
                 # If LEANN is available, use it for candidate generation
-                leann_engine = getattr(db, "leann_engine", None)
-                if leann_engine and leann_engine.graph.number_of_nodes() > 0:
+                if settings.leann_enabled:
                     leann_ids = db.leann_search(query, limit=100, project=project)
-                    leann_ids_set = set(leann_ids)
-                    mems_with_emb = [m for m in mems_with_emb if m.id in leann_ids_set]
+                    if leann_ids:
+                        leann_ids_set = set(leann_ids)
+                        mems_with_emb = [m for m in mems_with_emb if m.id in leann_ids_set]
 
                 # If Engram is enabled, use it to pre-filter candidates
                 if getattr(settings, "engram_enabled", False):
@@ -279,24 +279,38 @@ class MemoryService:
                 except Exception as e:
                     logger.warning(f"Engram candidate lookup failed: {e}")
 
+            leann_ids: list[str] = []
+            if settings.leann_enabled:
+                try:
+                    leann_ids = db.leann_search(query, limit=100, project=project)
+                except Exception as e:
+                    logger.warning(f"LEANN candidate lookup failed: {e}")
+
             fts_count = len(candidates)
             engram_count = len(engram_candidates)
+            leann_count = len(leann_ids)
 
-            if engram_candidates:
+            if engram_candidates or leann_ids:
                 merged: dict[str, float] = {mid: float(rank) for mid, rank in candidates}
-                for mid, hits in engram_candidates:
-                    rank = 1.0 / max(float(hits), 1.0)
-                    if mid in merged:
-                        merged[mid] = min(merged[mid], rank)
-                    else:
-                        merged[mid] = rank
+                if engram_candidates:
+                    for mid, hits in engram_candidates:
+                        rank = 1.0 / max(float(hits), 1.0)
+                        if mid in merged:
+                            merged[mid] = min(merged[mid], rank)
+                        else:
+                            merged[mid] = rank
+                if leann_ids:
+                    for mid in leann_ids:
+                        if mid not in merged:
+                            merged[mid] = 1e6
                 candidates = sorted(merged.items(), key=lambda x: x[1])
 
             logger.info(
-                "candidate_counts q=%r fts=%d engram=%d merged=%d",
+                "candidate_counts q=%r fts=%d engram=%d leann=%d merged=%d",
                 query,
                 fts_count,
                 engram_count,
+                leann_count,
                 len(candidates),
             )
 
@@ -505,6 +519,13 @@ class MemoryService:
                 before_ts = None
 
         base_memories: list[Memory] | None = None
+        candidate_ids: set[str] = set()
+
+        if settings.leann_enabled:
+            leann_ids = db.leann_search(query, limit=200, project=project)
+            if leann_ids:
+                candidate_ids.update(leann_ids)
+
         if settings.engram_enabled:
             try:
                 engram_candidates = db.engram_search_candidates(
@@ -517,16 +538,18 @@ class MemoryService:
                 engram_candidates = []
 
             if engram_candidates:
-                candidate_ids = [mid for mid, _ in engram_candidates]
-                base_memories = db.get_memories_by_ids(
-                    ids=candidate_ids,
-                    project=project,
-                    tags=tags,
-                    after_timestamp=after_ts,
-                    before_timestamp=before_ts,
-                )
-                if not base_memories:
-                    base_memories = None
+                candidate_ids.update(mid for mid, _ in engram_candidates)
+
+        if candidate_ids:
+            base_memories = db.get_memories_by_ids(
+                ids=list(candidate_ids),
+                project=project,
+                tags=tags,
+                after_timestamp=after_ts,
+                before_timestamp=before_ts,
+            )
+            if not base_memories:
+                base_memories = None
 
         if base_memories is None:
             base_memories = db.get_all_memories(
@@ -670,6 +693,9 @@ class MemoryService:
 
             offset += page_size
 
+        if reembedded:
+            db.leann_dirty = True
+
         logger.info(f"Re-embed completed: scanned={scanned}, reembedded={reembedded}")
         return {"scanned": scanned, "reembedded": reembedded}