Browse Source

fix: stale embedding check (#12049)

* fix: slow stale embedding check

* fix: mark all blocks for embedding when running a full embedding

Also, store the HNSW index after each batch so that embedding can resume
after being aborted by switching graphs or closing the app.

* remove embedding/re-embedding-graph-data!

* enhance: make mod+c mod+s rebuild both the keyword and semantic search indices
Tienson Qin 2 months ago
parent
commit
38e049e1c5

+ 0 - 3
deps/db/src/logseq/db/frontend/property.cljs

@@ -543,9 +543,6 @@
      :logseq.property.history/scalar-value {:title "History scalar value"
                                             :schema {:type :any
                                                      :hide? true}}
-     :logseq.property/created-by {:title "Node created by(deprecated)"
-                                  :schema {:type :string
-                                           :hide? true}}
      :logseq.property/created-by-ref {:title "Node created by"
                                       :schema {:type :entity
                                                :hide? true}

+ 26 - 25
deps/db/src/logseq/db/sqlite/export.cljs

@@ -81,6 +81,8 @@
     (or (block-title pvalue)
         (:logseq.property/value pvalue))))
 
+(defonce ignored-properties [:logseq.property/created-by-ref :logseq.property.embedding/hnsw-label-updated-at])
+
 (defn- buildable-properties
   "Originally copied from db-test/readable-properties. Modified so that property values are
    valid sqlite.build EDN"
@@ -104,31 +106,30 @@
                           ent-properties (when (and (not (:block/closed-value-property pvalue)) (seq ent-properties*))
                                            (buildable-properties db' ent-properties* properties-config' options'))]
                       (build-pvalue-entity-default db ent-properties pvalue options'))))))]
-    (let [ignored-properties [:logseq.property/created-by-ref]]
-      (->> (apply dissoc ent-properties ignored-properties)
-           (map (fn [[k v]]
-                  [k
-                   ;; handle user closed value properties. built-ins have idents and shouldn't be handled here
-                   (if (and (not (db-property/logseq-property? k))
-                            (or (:block/closed-value-property v)
-                                (and (set? v) (:block/closed-value-property (first v)))))
-                     (let [find-closed-uuid (fn [val]
-                                              (or (some #(when (= (:value %) (db-property/property-value-content val))
-                                                           (:uuid %))
-                                                        (get-in properties-config [k :build/closed-values]))
-                                                  (throw (ex-info (str "No closed value found for content: " (pr-str (db-property/property-value-content val))) {:properties properties-config}))))]
-                       (if (set? v)
-                         (set (map #(vector :block/uuid (find-closed-uuid %)) v))
-                         [:block/uuid (find-closed-uuid v)]))
-                     (cond
-                       (de/entity? v)
-                       (build-pvalue-entity db (d/entity db k) v properties-config options)
-                       (and (set? v) (every? de/entity? v))
-                       (let [property-ent (d/entity db k)]
-                         (set (map #(build-pvalue-entity db property-ent % properties-config options) v)))
-                       :else
-                       v))]))
-           (into {})))))
+    (->> (apply dissoc ent-properties ignored-properties)
+         (map (fn [[k v]]
+                [k
+                 ;; handle user closed value properties. built-ins have idents and shouldn't be handled here
+                 (if (and (not (db-property/logseq-property? k))
+                          (or (:block/closed-value-property v)
+                              (and (set? v) (:block/closed-value-property (first v)))))
+                   (let [find-closed-uuid (fn [val]
+                                            (or (some #(when (= (:value %) (db-property/property-value-content val))
+                                                         (:uuid %))
+                                                      (get-in properties-config [k :build/closed-values]))
+                                                (throw (ex-info (str "No closed value found for content: " (pr-str (db-property/property-value-content val))) {:properties properties-config}))))]
+                     (if (set? v)
+                       (set (map #(vector :block/uuid (find-closed-uuid %)) v))
+                       [:block/uuid (find-closed-uuid v)]))
+                   (cond
+                     (de/entity? v)
+                     (build-pvalue-entity db (d/entity db k) v properties-config options)
+                     (and (set? v) (every? de/entity? v))
+                     (let [property-ent (d/entity db k)]
+                       (set (map #(build-pvalue-entity db property-ent % properties-config options) v)))
+                     :else
+                     v))]))
+         (into {}))))
 
 (defn- build-export-properties
   "The caller of this fn is responsible for building :build/:property-classes unless shallow-copy?"

+ 2 - 1
deps/db/src/logseq/db/sqlite/util.cljs

@@ -133,4 +133,5 @@
            (fn [db-ident] [:db/retractEntity db-ident])
            [:logseq.kv/graph-uuid
             :logseq.kv/graph-local-tx
-            :logseq.kv/remote-schema-version])))
+            :logseq.kv/remote-schema-version
+            :logseq.kv/graph-text-embedding-model-name])))

+ 1 - 5
src/dev-cljs/shadow/user.clj

@@ -19,11 +19,7 @@
 ;; Get the runtime id from http://localhost:9630/runtimes, pick the one which shows `browser-worker`
 (defn worker-repl
   ([]
-   (when-let [runtime-id (->> (api/repl-runtimes :workers)
-                              (filter (fn [runtime] (= :browser-worker (:host runtime))))
-                              (map :client-id)
-                              (apply max))]
-     (worker-repl runtime-id)))
+   (worker-repl :old))
   ([runtime-id-or-which]
    (assert runtime-id-or-which "runtime-id shouldn't be empty")
    (if

+ 1 - 1
src/main/frontend/components/settings.cljs

@@ -1252,7 +1252,7 @@
                                       (c.m/<?
                                        (state/<invoke-db-worker :thread-api/vec-search-cancel-indexing repo))
                                       (c.m/<?
-                                       (state/<invoke-db-worker :thread-api/vec-search-re-embedding-graph-data repo)))
+                                       (state/<invoke-db-worker :thread-api/vec-search-embedding-graph repo {:reset-embedding? true})))
                                     :succ (constantly nil)))}
                current-model
                (assoc :value current-model))

+ 4 - 4
src/main/frontend/components/vector_search/sidebar.cljs

@@ -76,14 +76,14 @@
        {:size :sm
         :class "mx-2"
         :on-click (fn [_]
-                    (state/<invoke-db-worker :thread-api/vec-search-embedding-stale-blocks repo))}
-       "embedding-stale-blocks")
+                    (state/<invoke-db-worker :thread-api/vec-search-embedding-graph repo {}))}
+       "embedding-blocks")
       (shui/button
        {:size :sm
         :class "mx-2"
         :on-click (fn [_]
-                    (state/<invoke-db-worker :thread-api/vec-search-re-embedding-graph-data repo))}
-       "force-embedding-all-graph-blocks")
+                    (state/<invoke-db-worker :thread-api/vec-search-embedding-graph repo {:reset-embedding? true}))}
+       "full-embedding-blocks")
       (when (get-in vec-search-state [:repo->index-info repo :indexing?])
         (shui/button
          {:size :sm

+ 4 - 3
src/main/frontend/handler/db_based/import.cljs

@@ -26,7 +26,7 @@
       (repo-handler/restore-and-setup-repo! graph {:import-type :sqlite-db})
       (state/set-current-repo! graph)
       (persist-db/<export-db graph {})
-      (db/transact! graph (sqlite-util/import-tx :sqlite-db))
+      (db/transact! graph (sqlite-util/import-tx :sqlite-db) {:import-db? true})
       (finished-ok-handler))
      (p/catch
       (fn [e]
@@ -44,7 +44,7 @@
                              :datoms datoms})
      (state/add-repo! {:url graph})
      (repo-handler/restore-and-setup-repo! graph {:import-type :debug-transit})
-     (db/transact! graph (sqlite-util/import-tx :debug-transit))
+     (db/transact! graph (sqlite-util/import-tx :debug-transit) {:import-db? true})
      (state/set-current-repo! graph)
      (finished-ok-handler))))
 
@@ -61,7 +61,8 @@
     ;; (cljs.pprint/pprint _txs)
     (if error
       (notification/show! error :error)
-      (let [tx-meta {::sqlite-export/imported-data? true}
+      (let [tx-meta {::sqlite-export/imported-data? true
+                     :import-db? true}
             repo (state/get-current-repo)]
         (p/do
           (db/transact! repo init-tx tx-meta)

+ 1 - 1
src/main/frontend/handler/db_based/vector_search_background_tasks.cljs

@@ -25,6 +25,6 @@
         (m/? (m/sleep 1000))
         (c.m/<? (state/<invoke-db-worker :thread-api/vec-search-init-embedding-model repo))
         (m/?< (c.m/clock (* 30 1000)))
-        (c.m/<? (state/<invoke-db-worker :thread-api/vec-search-embedding-graph repo))
+        (c.m/<? (state/<invoke-db-worker :thread-api/vec-search-embedding-graph repo {}))
         (catch Cancelled _
           (m/amb)))))))

+ 19 - 5
src/main/frontend/handler/search.cljs

@@ -3,6 +3,7 @@
   (:require [clojure.string :as string]
             [dommy.core :as dom]
             [electron.ipc :as ipc]
+            [frontend.common.missionary :as c.m]
             [frontend.common.search-fuzzy :as fuzzy]
             [frontend.config :as config]
             [frontend.db :as db]
@@ -11,7 +12,9 @@
             [frontend.state :as state]
             [frontend.storage :as storage]
             [frontend.util :as util]
+            [logseq.db :as ldb]
             [logseq.graph-parser.text :as text]
+            [missionary.core :as m]
             [promesa.core :as p]))
 
 (defn sanity-search-content
@@ -115,11 +118,22 @@
    (rebuild-indices! false))
   ([notice?]
    (println "Starting to rebuild search indices!")
-   (p/let [_ (search/rebuild-indices!)]
-     (when notice?
-       (notification/show!
-        "Search indices rebuilt successfully!"
-        :success)))))
+   (when-let [repo (state/get-current-repo)]
+     (p/do!
+      (search/rebuild-indices!)
+      (when (ldb/get-key-value (db/get-db) :logseq.kv/graph-text-embedding-model-name)
+        (c.m/run-task
+          ::rebuild-embeddings
+          (m/sp
+            (c.m/<?
+             (state/<invoke-db-worker :thread-api/vec-search-cancel-indexing repo))
+            (c.m/<?
+             (state/<invoke-db-worker :thread-api/vec-search-embedding-graph repo {:reset-embedding? true})))
+          :succ (constantly nil)))
+      (when notice?
+        (notification/show!
+         "Search indices rebuilt successfully!"
+         :success))))))
 
 (defn highlight-exact-query
   [content q]

+ 25 - 1
src/main/frontend/worker/db_listener.cljs

@@ -1,12 +1,14 @@
 (ns frontend.worker.db-listener
   "Db listeners for worker-db."
-  (:require [datascript.core :as d]
+  (:require [clojure.string :as string]
+            [datascript.core :as d]
             [frontend.common.thread-api :as thread-api]
             [frontend.worker.pipeline :as worker-pipeline]
             [frontend.worker.search :as search]
             [frontend.worker.shared-service :as shared-service]
             [frontend.worker.state :as worker-state]
             [logseq.common.util :as common-util]
+            [logseq.db :as ldb]
             [logseq.outliner.batch-tx :as batch-tx]
             [promesa.core :as p]))
 
@@ -31,6 +33,7 @@
 
       (when-not from-disk?
         (p/do!
+         ;; Sync SQLite search
          (let [{:keys [blocks-to-remove-set blocks-to-add]} (search/sync-search-indice repo tx-report')]
            (when (seq blocks-to-remove-set)
              ((@thread-api/*thread-apis :thread-api/search-delete-blocks) repo blocks-to-remove-set))
@@ -45,6 +48,25 @@
     (prn :tx-data tx-data)
     (prn :tx-meta tx-meta)))
 
+(defn- remove-old-embeddings-and-reset-new-updates!
+  [conn tx-data tx-meta]
+  (when (ldb/db-based-graph? @conn)
+    (let [;; Remove old :logseq.property.embedding/hnsw-label-updated-at when importing a graph
+          remove-old-hnsw-tx-data (when (:import-db? tx-meta)
+                                    (->> (d/datoms @conn :avet :logseq.property.embedding/hnsw-label-updated-at)
+                                         (map (fn [d]
+                                                [:db/retract (:e d) :logseq.property.embedding/hnsw-label-updated-at]))))
+          ;; Mark vector embedding
+          mark-embedding-tx-data (->> (keep (fn [datom] (when (and (= :block/title (:a datom)) (:added datom) (not (string/blank? (:v datom))))
+                                                          (:e datom))) tx-data)
+                                      ;; Mark block embedding to be computed
+                                      (map (fn [id] [:db/add id :logseq.property.embedding/hnsw-label-updated-at 0])))
+          tx-data (concat remove-old-hnsw-tx-data mark-embedding-tx-data)]
+      (when (seq tx-data)
+        (d/transact! conn tx-data
+                     {:skip-refresh? true
+                      :pipeline-replace? true})))))
+
 (defn listen-db-changes!
   [repo conn & {:keys [handler-keys]}]
   (let [handlers (if (seq handler-keys)
@@ -69,6 +91,8 @@
       (d/listen! conn ::listen-db-changes!
                  (fn listen-db-changes!-inner
                    [{:keys [tx-data _db-before _db-after tx-meta] :as tx-report}]
+                   (remove-old-embeddings-and-reset-new-updates! conn tx-data tx-meta)
+
                    (let [tx-meta (merge (batch-tx/get-batch-opts) tx-meta)
                          pipeline-replace? (:pipeline-replace? tx-meta)
                          in-batch-tx-mode? (:batch-tx/batch-tx-mode? tx-meta)]

+ 2 - 10
src/main/frontend/worker/db_worker.cljs

@@ -741,17 +741,9 @@
   [repo model-name]
   (js/Promise. (embedding/task--load-model repo model-name)))
 
-(def-thread-api :thread-api/vec-search-embedding-stale-blocks
-  [repo]
-  (embedding/embedding-stale-blocks! repo))
-
-(def-thread-api :thread-api/vec-search-re-embedding-graph-data
-  [repo]
-  (embedding/re-embedding-graph-data! repo))
-
 (def-thread-api :thread-api/vec-search-embedding-graph
-  [repo]
-  (embedding/embedding-graph! repo))
+  [repo opts]
+  (embedding/embedding-graph! repo opts))
 
 (def-thread-api :thread-api/vec-search-search
   [repo query-string nums-neighbors]

+ 32 - 52
src/main/frontend/worker/embedding.cljs

@@ -80,13 +80,16 @@
 
 (defn- stale-block-lazy-seq
   [db reset?]
-  (->> (rseq (d/index-range db :block/updated-at nil nil))
-       (sequence
-        (comp (map #(d/entity db (:e %)))
-              (filter (stale-block-filter-preds reset?))
-              (map (fn [b]
-                     (assoc b :block.temp/text-to-embedding
-                            (db-content/recur-replace-uuid-in-block-title b)
+  (let [datoms (if reset?
+                 (rseq (d/index-range db :block/updated-at nil nil))
+                 (d/datoms db :avet :logseq.property.embedding/hnsw-label-updated-at 0))]
+    (->> datoms
+         (sequence
+          (comp (map #(d/entity db (:e %)))
+                (filter (stale-block-filter-preds reset?))
+                (map (fn [b]
+                       (assoc b :block.temp/text-to-embedding
+                              (db-content/recur-replace-uuid-in-block-title b)
                             ;; FIXME: tags and properties can affect sorting
                             ;; (str (db-content/recur-replace-uuid-in-block-title b)
                             ;;      (let [tags (->> (:block/tags b)
@@ -95,7 +98,7 @@
                             ;;          (str " " (string/join ", " (map (fn [t] (str "#" t)) tags)))))
                             ;;      (when-let [desc (:block/title (:logseq.property/description b))]
                             ;;        (str "\nDescription: " desc)))
-                            )))))))
+                              ))))))))
 (defn- partition-by-text-size
   [text-size]
   (let [*current-size (volatile! 0)
@@ -155,13 +158,15 @@
 (defn- task--embedding-stale-blocks!
   "embedding outdated block-data
   outdate rule: block/updated-at > :logseq.property.embedding/hnsw-label-updated-at"
-  [repo]
+  [repo reset-embedding?]
   (m/sp
     (when-let [^js infer-worker @worker-state/*infer-worker]
       (when-let [conn (worker-state/get-datascript-conn repo)]
         (let [stale-blocks (stale-block-lazy-seq @conn false)]
           (when (seq stale-blocks)
             (m/? (task--update-index-info!* repo infer-worker true))
+            (when reset-embedding?
+              (c.m/<? (.force-reset-index! infer-worker repo)))
             (doseq [stale-block-chunk (sequence (partition-by-text-size (get-partition-size repo)) stale-blocks)]
               (let [e+updated-at-coll (map (juxt :db/id :block/updated-at) stale-block-chunk)
                     _ (when (some (fn [id] (> id 2147483647)) (map :db/id stale-block-chunk))
@@ -175,61 +180,36 @@
                         false))
                     tx-data (labels-update-tx-data @conn e+updated-at-coll)]
                 (d/transact! conn tx-data {:skip-refresh? true})
-                (m/? (task--update-index-info!* repo infer-worker true))))
-            (c.m/<? (.write-index! infer-worker repo))
+                (m/? (task--update-index-info!* repo infer-worker true))
+                (c.m/<? (.write-index! infer-worker repo))))
             (m/? (task--update-index-info!* repo infer-worker false))))))))
 
-(defn- task--re-embedding-graph-data!
-  "force re-embedding all block-data in graph"
-  [repo]
-  (m/sp
-    (when-let [^js infer-worker @worker-state/*infer-worker]
-      (when-let [conn (worker-state/get-datascript-conn repo)]
-        (m/? (task--update-index-info!* repo infer-worker true))
-        (c.m/<? (.force-reset-index! infer-worker repo))
-        (let [all-blocks (stale-block-lazy-seq @conn true)]
-          (doseq [block-chunk (sequence (partition-by-text-size (get-partition-size repo)) all-blocks)]
-            (let [e+updated-at-coll (map (juxt :db/id :block/updated-at) block-chunk)
-                  _ (when (some (fn [id] (> id 2147483647)) (map :db/id block-chunk))
-                      (throw (ex-info "Wrong db/id" {:data (filter (fn [item] (> (:db/id item) 2147483647)) block-chunk)})))
-                  _ (c.m/<?
-                     (.text-embedding+store!
-                      infer-worker repo
-                      (into-array (map :block.temp/text-to-embedding block-chunk))
-                      (into-array (map :db/id block-chunk))
-                      false))
-                  tx-data (labels-update-tx-data @conn e+updated-at-coll)]
-              (d/transact! conn tx-data {:skip-refresh? true})
-              (m/? (task--update-index-info!* repo infer-worker true)))))
-        (c.m/<? (.write-index! infer-worker repo))
-        (m/? (task--update-index-info!* repo infer-worker false))))))
-
-(defn embedding-stale-blocks!
-  [repo]
+(defn- embedding-stale-blocks!
+  [repo reset-embedding?]
   (when-not (indexing? repo)
     (let [canceler (c.m/run-task
                      :embedding-stale-blocks!
-                     (task--embedding-stale-blocks! repo)
-                     :succ (constantly nil))]
-      (reset-*vector-search-state! repo :canceler canceler))))
-
-(defn re-embedding-graph-data!
-  [repo]
-  (when-not (indexing? repo)
-    (let [canceler (c.m/run-task
-                     :re-embedding-graph-data!
-                     (task--re-embedding-graph-data! repo)
+                     (task--embedding-stale-blocks! repo reset-embedding?)
                      :succ (constantly nil))]
       (reset-*vector-search-state! repo :canceler canceler))))
 
 (defn embedding-graph!
-  [repo]
+  [repo {:keys [reset-embedding?]
+         :or {reset-embedding? false}}]
   (when-not (indexing? repo)
     (when-let [conn (worker-state/get-datascript-conn repo)]
       (when (ldb/get-key-value @conn :logseq.kv/graph-text-embedding-model-name)
-        (if (first (d/datoms @conn :avet :logseq.property.embedding/hnsw-label-updated-at)) ; embedding exists
-          (embedding-stale-blocks! repo)
-          (re-embedding-graph-data! repo))))))
+        (when (or reset-embedding?
+                  ;; embedding not exists yet
+                  (empty? (d/datoms @conn :avet :logseq.property.embedding/hnsw-label-updated-at)))
+          ;; reset embedding
+          (let [mark-embedding-tx-data (->>
+                                        (d/datoms @conn :avet :block/title)
+                                        (map (fn [d]
+                                               [:db/add (:e d) :logseq.property.embedding/hnsw-label-updated-at 0])))]
+            (d/transact! conn mark-embedding-tx-data {:skip-refresh? true})))
+
+        (embedding-stale-blocks! repo reset-embedding?)))))
 
 (defn task--embedding-model-info
   [repo]