Jelajahi Sumber

wip: refed count experiment

Tienson Qin 7 bulan lalu
induk
melakukan
4a05f46436

+ 41 - 1
deps/db/src/logseq/db/common/sqlite.cljs

@@ -2,6 +2,8 @@
   "Provides common sqlite util fns for file and DB graphs. These fns work on
   browser and node"
   (:require ["path" :as node-path]
+            [cljs-bean.core :as bean]
+            [clojure.set :as set]
             [clojure.string :as string]
             [datascript.core :as d]
             [logseq.db.sqlite.util :as sqlite-util]))
@@ -9,7 +11,45 @@
 (defn create-kvs-table!
   "Creates a sqlite table for use with datascript.storage if one doesn't exist"
   [sqlite-db]
-  (.exec sqlite-db "create table if not exists kvs (addr INTEGER primary key, content TEXT, addresses JSON)"))
+  (.exec sqlite-db "create table if not exists kvs (addr INTEGER primary key, content TEXT, addresses JSON, refCount INTEGER NOT NULL DEFAULT 0)"))
+
+(defn set-ref-count-for-addrs!
+  "Migrate existing db to add refCount for addrs"
+  [db]
+  (let [schema (some->> (.exec db #js {:sql "select content from kvs where addr = 0"
+                                       :rowMode "array"})
+                        bean/->clj
+                        ffirst
+                        sqlite-util/transit-read)
+        ;; internal addrs including root, tail and three index
+        ;; those addrs won't be changed
+        internal-addrs #{0 1 (:eavt schema) (:avet schema) (:aevt schema)}
+        result (->> (.exec db #js {:sql "select addr, addresses from kvs"
+                                   :rowMode "array"})
+                    bean/->clj
+                    (map (fn [[addr addresses]]
+                           [addr (bean/->clj (js/JSON.parse addresses))])))
+
+        addrs (set/difference (set (map first result)) internal-addrs)
+        addr-ref-count (->> (mapcat second result)
+                            frequencies)
+        addr-refs-count (map (fn [addr] [addr (get addr-ref-count addr 0)]) addrs)]
+    (.transaction
+     db
+     (fn [tx]
+       ;; internal addrs will not be decremented since they're not children for any node
+       (doseq [addr internal-addrs]
+         (.exec tx #js {:sql "UPDATE kvs SET refCount = 1 WHERE addr = ?"
+                        :bind #js [addr]}))
+
+       (doseq [[addr refs-count] addr-refs-count]
+         (.exec tx #js {:sql "UPDATE kvs SET refCount = $refCount WHERE addr = $addr"
+                        :bind #js {:$refCount refs-count
+                                   :$addr addr}}))))))
+
+(defn add-ref-count-to-kvs-table!
+  [sqlite-db]
+  (.exec sqlite-db "ALTER TABLE kvs ADD COLUMN refCount INTEGER NOT NULL DEFAULT 0"))
 
 (defn get-storage-conn
   "Given a datascript storage, returns a datascript connection for it"

+ 75 - 42
src/main/frontend/worker/db_worker.cljs

@@ -141,6 +141,35 @@
       (rebuild-db-from-datoms! conn sqlite-db)
       (worker-util/post-message :notification ["The graph has been successfully rebuilt." :success false]))))
 
+(defn- validate-addrs-ref-count
+  [^Object db]
+  (let [schema (some->> (.exec db #js {:sql "select content from kvs where addr = 0"
+                                       :rowMode "array"})
+                        bean/->clj
+                        ffirst
+                        sqlite-util/transit-read)
+        result (->> (.exec db #js {:sql "select addr, addresses from kvs"
+                                   :rowMode "array"})
+                    bean/->clj
+                    (map (fn [[addr addresses]]
+                           [addr (bean/->clj (js/JSON.parse addresses))])))
+        used-addresses (set (concat (mapcat second result)
+                                    [0 1 (:eavt schema) (:avet schema) (:aevt schema)]))
+        unused-addresses (clojure.set/difference (set (map first result)) used-addresses)
+        ref-zero-addresses (->> (.exec db #js {:sql "select addr from kvs where refCount = 0"
+                                               :rowMode "array"})
+                                bean/->clj
+                                (map first)
+                                (set))]
+    (prn :debug {;; :diff (clojure.set/difference ref-zero-addresses unused-addresses)
+                   ;; :ref-zero-addresses ref-zero-addresses
+                 :unused-addresses (count unused-addresses)})
+      ;; (when (not= ref-zero-addresses unused-addresses)
+      ;;   (throw (ex-info "DB address ref count failed"
+      ;;                   {:ref-zero-addresses ref-zero-addresses
+      ;;                    :unused-addresses unused-addresses})))
+    ))
+
 (comment
   (defn- gc-kvs-table!
     [^Object db]
@@ -193,28 +222,39 @@
 
 (defn upsert-addr-content!
   "Upsert addr+data-seq. Update sqlite-cli/upsert-addr-content! when making changes"
-  [db data delete-addrs*]
-  (let [_delete-addrs (clojure.set/difference (set delete-addrs*) #{0 1})]
+  [db data delete-addrs *cache-addr->addresses]
+  (let [store-addrs (->>
+                     data
+                     (mapcat
+                      (fn [{:keys [addr addresses]}]
+                        (when addresses
+                          (let [existing-addresses (get @*cache-addr->addresses addr)]
+                            (swap! *cache-addr->addresses update addr (fn [col]
+                                                                        (if col
+                                                                          (apply conj col addresses)
+                                                                          (set addresses))))
+                            (when (seq existing-addresses)
+                              (remove existing-addresses addresses))))))
+                     frequencies)
+        data' (map (fn [{:keys [addr content addresses]}]
+                     #js {:$addr addr
+                          :$content (sqlite-util/transit-write content)
+                          :$addresses (when addresses
+                                        (js/JSON.stringify (bean/->js addresses)))})
+                   data)]
     (assert (some? db) "sqlite db not exists")
     (.transaction db (fn [tx]
-                       (doseq [item data]
+                       (doseq [item data']
                          (.exec tx #js {:sql "INSERT INTO kvs (addr, content, addresses) values ($addr, $content, $addresses) on conflict(addr) do update set content = $content, addresses = $addresses"
-                                        :bind item}))))
-    ;; (when (seq delete-addrs)
-    ;;   (let [result (.exec db #js {:sql get-to-delete-unused-addresses-sql
-    ;;                               :bind (js/JSON.stringify (clj->js delete-addrs))
-    ;;                               :rowMode "array"})
-    ;;         non-refed-addrs (map #(aget % 0) result)]
-    ;;     (when (seq non-refed-addrs)
-    ;;       (.transaction db (fn [tx]
-    ;;                          (doseq [addr non-refed-addrs]
-    ;;                            (.exec tx #js {:sql "Delete from kvs where addr = ?"
-    ;;                                           :bind #js [addr]})))))
-    ;;     (let [missing-addrs (when worker-util/dev?
-    ;;                           (seq (find-missing-addresses nil db {:delete-addrs non-refed-addrs})))]
-    ;;       (when (seq missing-addrs)
-    ;;         (worker-util/post-message :notification [(str "Bug!! Missing addresses: " missing-addrs) :error false])))))
-    ))
+                                        :bind item}))
+                       (doseq [[addr ref-count] store-addrs]
+                         (.exec tx #js {:sql "UPDATE kvs SET refCount = refCount + $refCount WHERE addr = $addr"
+                                        :bind #js {:$addr addr
+                                                   :$refCount ref-count}}))
+                       (doseq [delete-addr delete-addrs]
+                         (.exec tx #js {:sql "DELETE FROM kvs where addr = $addr and refCount = 1"
+                                        :bind #js {:$addr delete-addr}}))))
+    (worker-util/profile "validate-addrs-ref-count" (validate-addrs-ref-count db))))
 
 (defn restore-data-from-addr
   "Update sqlite-cli/restore-data-from-addr when making changes"
@@ -235,29 +275,22 @@
 (defn new-sqlite-storage
   "Update sqlite-cli/new-sqlite-storage when making changes"
   [^Object db]
-  (reify IStorage
-    (-store [_ addr+data-seq delete-addrs]
-      (let [used-addrs (set (mapcat
-                             (fn [[addr data]]
-                               (cons addr
-                                     (when (map? data)
-                                       (:addresses data))))
-                             addr+data-seq))
-            delete-addrs (remove used-addrs delete-addrs)
-            data (map
-                  (fn [[addr data]]
-                    (let [data' (if (map? data) (dissoc data :addresses) data)
-                          addresses (when (map? data)
-                                      (when-let [addresses (:addresses data)]
-                                        (js/JSON.stringify (bean/->js addresses))))]
-                      #js {:$addr addr
-                           :$content (sqlite-util/transit-write data')
-                           :$addresses addresses}))
-                  addr+data-seq)]
-        (upsert-addr-content! db data delete-addrs)))
-
-    (-restore [_ addr]
-      (restore-data-from-addr db addr))))
+  (let [*cache-addr->addresses (atom {})]
+    (reify IStorage
+      (-store [_ addr+data-seq delete-addrs]
+        (let [data (map
+                    (fn [[addr data]]
+                      (let [data' (if (map? data) (dissoc data :addresses) data)]
+                        (cond->
+                         {:addr addr
+                          :content data'}
+                          (and (map? data) (:addresses data))
+                          (assoc :addresses (:addresses data)))))
+                    addr+data-seq)]
+          (upsert-addr-content! db data delete-addrs *cache-addr->addresses)))
+
+      (-restore [_ addr]
+        (restore-data-from-addr db addr)))))
 
 (defn- close-db-aux!
   [repo ^Object db ^Object search ^Object client-ops]

+ 7 - 2
src/main/frontend/worker/debug.cljs

@@ -1,7 +1,12 @@
 (ns frontend.worker.debug
   "For debug usage"
-  (:require [frontend.worker.state :as worker-state]
-            [datascript.core :as d]))
+  (:require [datascript.core :as d]
+            [frontend.worker.state :as worker-state]))
+
+(defn get-sqlite-conn
+  "Get current sqlite conn"
+  []
+  (worker-state/get-sqlite-conn (worker-state/get-current-repo)))
 
 (defn get-conn
   "Get current db conn"