(ns frontend.db-worker
  "Worker used for browser DB implementation"
  (:require [promesa.core :as p]
            [datascript.storage :refer [IStorage]]
            [clojure.edn :as edn]
            [datascript.core :as d]
            [logseq.db.sqlite.common-db :as sqlite-common-db]
            [shadow.cljs.modern :refer [defclass]]
            [datascript.transit :as dt]
            ["@logseq/sqlite-wasm" :default sqlite3InitModule]
            ["comlink" :as Comlink]
            [clojure.string :as string]
            [cljs-bean.core :as bean]
            [frontend.worker.search :as search]
            [logseq.db.sqlite.util :as sqlite-util]
            [frontend.worker.state :as state]
            [frontend.worker.file :as file]
            [logseq.db :as ldb]
            [frontend.worker.rtc.op-mem-layer :as op-mem-layer]
            [frontend.worker.rtc.db-listener :as rtc-db-listener]
            [frontend.worker.rtc.full-upload-download-graph :as rtc-updown]
            [frontend.worker.rtc.core :as rtc-core]
            [clojure.core.async :as async]
            ;; NOTE(review): DAMAGED SPAN. Text is missing from inside the
            ;; `:refer` vector below up to the middle of a later form, so the
            ;; `ns` form is not well-formed as shown. It looks as if everything
            ;; from a `<` character to the next `>` was stripped (the same
            ;; pattern recurs further down) -- confirm and restore from version
            ;; control. The aliases `worker-util` and `worker-page-rename` are
            ;; used later but have no visible `:require`; presumably they were
            ;; declared in the lost text -- verify.
            [frontend.worker.async-util :include-macros true :refer [js {:url sqlite-wasm-url
                                                                         :print js/console.log
                                                                         :printErr js/console.error}))]
  (reset! *sqlite sqlite)
  nil)))

;; Presumably the path of the SQLite file opened for a repo; no use of it is
;; visible in the intact part of this file -- confirm.
(def repo-path "/db.sqlite")

;; NOTE(review): DAMAGED SPAN. The name, argument vector and the start of the
;; body of this private function are missing. Only the tail survives: it runs a
;; `select content from kvs where addr = ?` query and reads the first cell of
;; the result with `edn/read-string`.
(defn- (.exec db #js {:sql "select content from kvs where addr = ?"
                      :bind #js [addr]
                      :rowMode "array"})
        ffirst)]
  (edn/read-string content))))

;; Builds a datascript `IStorage` implementation for `repo`.
;;   repo  - repo identifier, passed through to the helper functions.
;;   _opts - accepted but unused.
;; `-store` logs the number of addr/data pairs, turns each `[addr data]` pair
;; into a JS object `{$addr, $content}` (content serialized with `pr-str`) and
;; hands them, together with `delete-addrs`, to `upsert-addr-content!` inside
;; an `async/go` block.
;; `-restore` delegates to `restore-data-from-addr`.
;; Both helpers are defined outside the intact part of this file.
(defn new-sqlite-storage
  [repo _opts]
  (reify IStorage
    (-store [_ addr+data-seq delete-addrs]
      (prn :debug (str "SQLite store addr+data count: " (count addr+data-seq)))
      (let [data (map
                  (fn [[addr data]]
                    #js {:$addr addr
                         :$content (pr-str data)})
                  addr+data-seq)]
        ;; async write so that UI can be refreshed earlier
        (async/go
          (upsert-addr-content! repo data delete-addrs))))

    (-restore [_ addr]
      (restore-data-from-addr repo addr))))

;; Forgets and closes everything held for `repo`:
;;   1. removes `repo` from `*sqlite-conns` and `*datascript-conns`,
;;   2. calls `.close` on `db` and on `search` when they are non-nil,
;;   3. calls `.releaseAccessHandles` on the pool returned by
;;      `state/get-opfs-pool`, when there is one,
;;   4. removes `repo` from `*opfs-pools`.
(defn- close-db-aux!
  [repo ^Object db ^Object search]
  (swap! *sqlite-conns dissoc repo)
  (swap! *datascript-conns dissoc repo)
  (when db (.close db))
  (when search (.close search))
  (when-let [^js pool (state/get-opfs-pool repo)]
    (.releaseAccessHandles pool))
  (swap! *opfs-pools dissoc repo))

;; Closes every entry of `@*sqlite-conns` whose key is not `repo`.
(defn- close-other-dbs!
  [repo]
  (doseq [[r {:keys [db search]}] @*sqlite-conns]
    (when-not (= repo r)
      (close-db-aux! r db search))))

;; Looks up the `:db` / `:search` handles stored for `repo` in `@*sqlite-conns`
;; and closes them through `close-db-aux!`.
(defn- close-db!
  [repo]
  (let [{:keys [db search]} (@*sqlite-conns repo)]
    (close-db-aux! repo db search)))

;; NOTE(review): DAMAGED SPAN. Text is missing from inside the `p/let` binding
;; vector of `create-or-open-db!` onward. What follows is the surviving head of
;; `create-or-open-db!` fused with the tail of a different function, a promise
;; loop that drains an async iterator into a vector. Not well-formed as shown.
(defn- create-or-open-db!
  [repo]
  (when-not (state/get-sqlite-conn repo)
    (p/let [^js pool (vec [iter]
  (when iter
    (p/loop [acc []]
      (p/let [elem (.next iter)]
        (if (.-done elem)
          acc
          (p/recur (conj acc (.-value elem))))))))

;; NOTE(review): DAMAGED SPAN. The name, argument vector and most of the body
;; are missing; only the tail of a `p/loop` that accumulates `result` and
;; `dirs` survives.
(defn- vec values-iter))
current-dir-dirs (filter dir? values)
result (concat result values)
dirs (concat current-dir-dirs (rest dirs))]
(p/recur result dirs)))))))

;; NOTE(review): DAMAGED SPAN. The name, argument vector and opening form are
;; missing. The surviving body resolves to true when the directory
;; "." + (get-pool-name graph) can be opened under the storage root, and the
;; catch handler returns false.
(defn- (p/let [^js root (.getDirectory js/navigator.storage)
               _dir-handle (.getDirectoryHandle root (str "." (get-pool-name graph)))]
         true)
 (p/catch
  (fn [_e] ; not found
    false))))

;; Calls `.removeVfs` on `pool`; does nothing when `pool` is nil.
(defn- remove-vfs!
  [^js pool]
  (when pool
    (.removeVfs ^js pool)))

;; Returns what `state/get-sqlite-conn` gives for `repo` with `{:search? true}`.
(defn- get-search-db
  [repo]
  (state/get-sqlite-conn repo {:search? true}))

;; JS class whose methods make up the worker's callable API. `init` (end of
;; file) instantiates it and registers the instance via
;; `state/set-worker-object!`.
#_:clj-kondo/ignore
(defclass DBWorker
  (extends js/Object)

  (constructor
   [this]
   (super))

  Object

  ;; Returns the `version` property of the loaded sqlite module, or nil when
  ;; `*sqlite` is still empty.
  (getVersion
   [_this]
   (when-let [sqlite @*sqlite]
     (.-version sqlite)))

  ;; Stores `rtc-ws-url` in `state/*rtc-ws-url`, then calls
  ;; `init-sqlite-module!`.
  (init
   [_this rtc-ws-url]
   (reset! state/*rtc-ws-url rtc-ws-url)
   (init-sqlite-module!))

  ;; NOTE(review): DAMAGED SPAN. Text is missing inside the binding vector: the
  ;; expression bound to `all-files` and the binding name for the second
  ;; expression (the body uses `dbs`) are lost. The surviving pipeline keeps
  ;; directory entries whose name starts with ".logseq-pool-", strips that
  ;; prefix, maps "+3A+" to ":" and "++" to "/", and de-duplicates the names.
  (listDB
   [_this]
   (p/let [all-files (> (keep (fn [file]
                                (when (and
                                       (= (.-kind file) "directory")
                                       (string/starts-with? (.-name file) ".logseq-pool-"))
                                  (-> (.-name file)
                                      (string/replace-first ".logseq-pool-" "")
                                      ;; TODO: DRY
                                      (string/replace "+3A+" ":")
                                      (string/replace "++" "/"))))
                              all-files)
                        distinct)]
     ;; (prn :debug :all-files (map #(.-name %) all-files))
     ;; (prn :debug :all-files-count (count (filter
     ;; #(= (.-kind %) "file")
     ;; all-files)))
     ;; (prn :dbs dbs)
     (bean/->js dbs)))

  ;; Closes every other open repo first, then calls `create-or-open-db!` for
  ;; `repo`.
  (createOrOpenDB
   [_this repo]
   (p/let [_ (close-other-dbs! repo)]
     (create-or-open-db! repo)))

  ;; Returns `:max-tx` of the current datascript db for `repo`, or nil when no
  ;; connection is registered.
  (getMaxTx
   [_this repo]
   (when-let [conn (state/get-datascript-conn repo)]
     (:max-tx @conn)))

  ;; Runs a datascript query. `inputs-str` is an EDN string holding a sequence
  ;; whose first element is the query and whose remaining elements are extra
  ;; inputs; the current db value is passed as the first input. The result is
  ;; converted with `bean/->js`. Returns nil when no connection is registered.
  (q
   [_this repo inputs-str]
   "Datascript q"
   (when-let [conn (state/get-datascript-conn repo)]
     (let [inputs (edn/read-string inputs-str)]
       (let [result (apply d/q (first inputs) @conn (rest inputs))]
         (bean/->js result)))))

  ;; Applies `tx-data` to the datascript connection of `repo`.
  ;;   tx-data, tx-meta, context - either already-parsed values or EDN strings
  ;;                               (strings are read with `edn/read-string`).
  ;; When `repo` is non-nil the latest-tx time is recorded first. A non-nil
  ;; `context` is stored with `state/set-context!`. Unless `:new-graph?` is set
  ;; in `tx-meta`, `:skip-store?` is added and `:insert-blocks?` is removed.
  ;; The transaction is skipped when it is a `:create-today-journal?` tx with
  ;; non-empty data whose `:today-journal-name` page already exists. Exceptions
  ;; are logged, not rethrown. Returns nil on the success path.
  (transact
   [_this repo tx-data tx-meta context]
   (when repo (state/set-db-latest-tx-time! repo))
   (when-let [conn (state/get-datascript-conn repo)]
     (try
       (let [tx-data (if (string? tx-data)
                       (edn/read-string tx-data)
                       tx-data)
             tx-meta (if (string? tx-meta)
                       (edn/read-string tx-meta)
                       tx-meta)
             context (if (string? context)
                       (edn/read-string context)
                       context)
             _ (when context (state/set-context! context))
             tx-meta' (if (:new-graph? tx-meta)
                        tx-meta
                        (-> tx-meta
                            (assoc :skip-store? true) ; delay writes to the disk
                            (dissoc :insert-blocks?)))]
         (when-not (and (:create-today-journal? tx-meta)
                        (:today-journal-name tx-meta)
                        (seq tx-data)
                        (d/entity @conn [:block/name (:today-journal-name tx-meta)])) ; today journal created already
           ;; (prn :debug :transact :tx-data tx-data :tx-meta tx-meta')
           (worker-util/profile "Worker db transact"
                                (ldb/transact! conn tx-data tx-meta')))
         nil)
       (catch :default e
         (prn :debug :error)
         (js/console.error e)))))

  ;; Returns the transit-encoded string of
  ;; `sqlite-common-db/get-initial-data` for the current db of `repo`, or nil
  ;; when no connection is registered.
  (getInitialData
   [_this repo]
   (when-let [conn (state/get-datascript-conn repo)]
     (->> (sqlite-common-db/get-initial-data @conn)
          dt/write-transit-str)))

  ;; NOTE(review): DAMAGED SPAN. Text is missing from inside the `p/let`
  ;; binding vector onward; the tail that survives (`option`, `result`) appears
  ;; to belong to a different method. Not well-formed as shown.
  (unsafeUnlinkDB
   [_this repo]
   (p/let [pool (clj option))]
     (bean/->js result)))

  ;; Passes `blocks` to `search/upsert-blocks!` on the search db of `repo`;
  ;; resolves to nil.
  (search-upsert-blocks
   [this repo blocks]
   (p/let [db (get-search-db repo)]
     (search/upsert-blocks! db blocks)
     nil))

  ;; Passes `ids` to `search/delete-blocks!` on the search db of `repo`;
  ;; resolves to nil.
  (search-delete-blocks
   [this repo ids]
   (p/let [db (get-search-db repo)]
     (search/delete-blocks! db ids)
     nil))

  ;; Calls `search/truncate-table!` on the search db of `repo`; resolves to
  ;; nil.
  (search-truncate-tables
   [this repo]
   (p/let [db (get-search-db repo)]
     (search/truncate-table! db)
     nil))

  ;; Returns `search/build-blocks-indice` for the current db of `repo`, or nil
  ;; when no connection is registered.
  (search-build-blocks-indice
   [this repo]
   (when-let [conn (state/get-datascript-conn repo)]
     (search/build-blocks-indice repo @conn)))

  ;; NOTE(review): despite the method name, this calls
  ;; `search/build-blocks-indice`, exactly like `search-build-blocks-indice`
  ;; above. Possibly a copy/paste slip -- confirm whether a page-level builder
  ;; in `frontend.worker.search` was intended.
  (search-build-pages-indice
   [this repo]
   (when-let [conn (state/get-datascript-conn repo)]
     (search/build-blocks-indice repo @conn)))

  ;; Returns `search/page-search` for `q` and `limit` against the current db of
  ;; `repo`, or nil when no connection is registered.
  (page-search
   [this repo q limit]
   (when-let [conn (state/get-datascript-conn repo)]
     (search/page-search repo @conn q limit)))

  ;; Calls `worker-page-rename/rename!` with the connection and the config from
  ;; `state/get-config`, and returns the outcome as the JS object
  ;; `{result: ...}`. Returns nil when no connection is registered.
  (page-rename
   [this repo old-name new-name]
   (when-let [conn (state/get-datascript-conn repo)]
     (let [config (state/get-config repo)
           result (worker-page-rename/rename! repo conn config old-name new-name)]
       (bean/->js {:result result}))))

  ;; True when `@file/*writes` is empty.
  (file-writes-finished?
   [this]
   (empty? @file/*writes))

  ;; Calls `file/dissoc-request!` with `request-id`; `page-id` is not used.
  ;; Returns nil.
  (page-file-saved
   [this request-id page-id]
   (file/dissoc-request! request-id)
   nil)

  ;; Reads `new-state-str` as EDN and passes it to `state/set-new-state!`.
  ;; Returns nil.
  (sync-app-state
   [this new-state-str]
   (let [new-state (edn/read-string new-state-str)]
     (state/set-new-state! new-state)
     nil))

  ;; RTC
  ;; NOTE(review): DAMAGED SPAN. Text is missing after `rtc-core/`; the tail
  ;; that survives (`rtc-core/get-debug-state`) appears to belong to a different
  ;; method. Not well-formed as shown.
  (rtc-start
   [this repo token]
   (when-let [conn (state/get-datascript-conn repo)]
     (rtc-core/js (rtc-core/get-debug-state repo)))

  ;; Calls `unsafeUnlinkDB` for every name returned by `listDB` and waits for
  ;; all of the calls with `p/all`. The `repo` argument is not used.
  (dangerousRemoveAllDbs
   [this repo]
   (p/let [dbs (.listDB this)]
     (p/all (map #(.unsafeUnlinkDB this %) dbs)))))

;; NOTE(review): the visible text ends in the middle of the `(file/` form
;; below; the remainder of `init` lies outside this view.
(defn init
  "web worker entry"
  []
  (let [^js obj (DBWorker.)]
    (state/set-worker-object! obj)
    (file/