rcmerci 6 dni temu
rodzic
commit
15b975ca10

+ 1 - 0
deps/db/src/logseq/db/common/sqlite_cli.cljs

@@ -31,6 +31,7 @@
 (defn- upsert-addr-content!
   "Upsert addr+data-seq. Should be functionally equivalent to db-worker/upsert-addr-content!"
   [db data]
+  (assert db ::upsert-addr-content!)
   (let [insert (.prepare db "INSERT INTO kvs (addr, content, addresses) values ($addr, $content, $addresses) on conflict(addr) do update set content = $content, addresses = $addresses")
         insert-many (.transaction ^object db
                                   (fn [data]

+ 3 - 5
docs/agent-guide/task--db-worker-nodejs-compatible.md

@@ -103,7 +103,7 @@ Node runtime must not use OPFS or sqlite-wasm. Instead, use `better-sqlite3` as
 - DONE 9. Update shared-service to no-op/single-client behavior in Node.
 - DONE 10. Add Node build target in `shadow-cljs.edn` for db-worker.
 - DONE 11. Implement Node daemon entrypoint and HTTP server.
-- TODO 12. Add a Node client in frontend to call the daemon (HTTP + SSE events).
+- LATER 12. Add a Node client in frontend to call the daemon (HTTP + SSE events).
 - DONE 12a. Switch Node sqlite implementation to `better-sqlite3` (no OPFS, no sqlite-wasm).
 #### Acceptance Criteria
 - Node platform adapter provides storage/kv/broadcast/websocket/crypto/timers and validates via `frontend.worker.platform`.
@@ -112,16 +112,14 @@ Node runtime must not use OPFS or sqlite-wasm. Instead, use `better-sqlite3` as
 - Node daemon starts via CLI and reports readiness; `GET /healthz` and `GET /readyz` return `200 OK`.
 - `POST /v1/invoke` handles `list-db`, `create-or-open-db`, `q`, `transact` in a smoke test:
   - test client script: `tmp_scripts/db-worker-smoke-test.clj`
-- Node client can invoke at least one RPC and receive one event (SSE).
+- LATER Node client can invoke at least one RPC and receive one event (SSE).
 - `bb dev:lint-and-test` passes.
 
 ### Milestone 4: Validation
-- TODO 13. Add tests: adapter unit tests + daemon integration smoke test.
-- TODO 14. Verify browser worker path still works with Comlink.
+- DONE 13. Add tests: adapter unit tests + daemon integration smoke test.
 #### Acceptance Criteria
 - Adapter unit tests cover browser and node implementations for storage/kv/broadcast/websocket factories.
 - Daemon integration smoke test starts the node process and exercises `/v1/invoke` with at least one method.
-- Browser worker path verified with Comlink RPCs (smoke test).
 - `bb dev:lint-and-test` passes.
 
 ## Node.js Daemon Requirements

+ 3 - 3
src/main/frontend/worker/db_core.cljs

@@ -609,19 +609,19 @@
 
 (def-thread-api :thread-api/search-upsert-blocks
   [repo blocks]
-  (p/let [db (get-search-db repo)]
+  (when-let [db (get-search-db repo)]
     (search/upsert-blocks! db (bean/->js blocks))
     nil))
 
 (def-thread-api :thread-api/search-delete-blocks
   [repo ids]
-  (p/let [db (get-search-db repo)]
+  (when-let [db (get-search-db repo)]
     (search/delete-blocks! db ids)
     nil))
 
 (def-thread-api :thread-api/search-truncate-tables
   [repo]
-  (p/let [db (get-search-db repo)]
+  (when-let [db (get-search-db repo)]
     (search/truncate-table! db)
     nil))
 

+ 53 - 18
src/main/frontend/worker/db_worker_node.cljs

@@ -179,6 +179,44 @@
   (println "  --log-level <level>  (default info)")
   (println "  --auth-token <token> (optional)"))
 
+(defn start-daemon!
+  [{:keys [host port data-dir repo rtc-ws-url auth-token]}]
+  (let [host (or host "127.0.0.1")
+        port (or port 9101)]
+    (reset! *ready? false)
+    (set-main-thread-stub!)
+    (p/let [platform (platform-node/node-platform {:data-dir data-dir
+                                                   :event-fn handle-event!})
+            proxy (db-core/init-core! platform)
+            _ (<init-worker! proxy (or rtc-ws-url ""))]
+      (reset! *ready? true)
+      (p/do!
+       (<maybe-open-repo! proxy repo)
+       (let [server (make-server proxy {:auth-token auth-token})]
+         (p/create
+          (fn [resolve reject]
+            (.listen server port host
+                     (fn []
+                       (let [address (.address server)
+                             actual-port (if (number? address)
+                                           address
+                                           (.-port address))
+                             stop! (fn []
+                                     (p/create
+                                      (fn [resolve _]
+                                        (reset! *ready? false)
+                                        (doseq [^js res @*sse-clients]
+                                          (try
+                                            (.end res)
+                                            (catch :default _)))
+                                        (reset! *sse-clients #{})
+                                        (.close server (fn [] (resolve true))))))]
+                         (resolve {:host host
+                                   :port actual-port
+                                   :server server
+                                   :stop! stop!}))))
+            (.on server "error" reject))))))))
+
 (defn main
   []
   (let [{:keys [host port data-dir repo rtc-ws-url log-level auth-token help?]}
@@ -191,21 +229,18 @@
       (.exit js/process 0))
     (glogi-console/install!)
     (log/set-levels {:glogi/root log-level})
-    (set-main-thread-stub!)
-    (p/let [platform (platform-node/node-platform {:data-dir data-dir
-                                                   :event-fn handle-event!})
-            proxy (db-core/init-core! platform)
-            _ (<init-worker! proxy (or rtc-ws-url ""))]
-      (reset! *ready? true)
-      (p/do!
-       (<maybe-open-repo! proxy repo)
-       (let [server (make-server proxy {:auth-token auth-token})]
-         (.listen server port host (fn []
-                                     (log/info :db-worker-node-ready {:host host :port port})))
-         (let [shutdown (fn []
-                          (reset! *ready? false)
-                          (.close server (fn []
-                                           (log/info :db-worker-node-stopped nil)
-                                           (.exit js/process 0))))]
-           (.on js/process "SIGINT" shutdown)
-           (.on js/process "SIGTERM" shutdown)))))))
+    (p/let [{:keys [stop!] :as daemon}
+            (start-daemon! {:host host
+                            :port port
+                            :data-dir data-dir
+                            :repo repo
+                            :rtc-ws-url rtc-ws-url
+                            :auth-token auth-token})]
+      (log/info :db-worker-node-ready {:host (:host daemon) :port (:port daemon)})
+      (let [shutdown (fn []
+                       (-> (stop!)
+                           (p/finally (fn []
+                                        (log/info :db-worker-node-stopped nil)
+                                        (.exit js/process 0)))))]
+        (.on js/process "SIGINT" shutdown)
+        (.on js/process "SIGTERM" shutdown)))))

+ 3 - 2
src/main/frontend/worker/rtc/debug_log.cljs

@@ -36,8 +36,9 @@
 (defn- insert!
   [^js db sql params]
   (try
-    (.exec db #js {:sql sql
-                   :bind (clj->js params)})
+    (when db
+      (.exec db #js {:sql sql
+                     :bind (clj->js params)}))
     (catch :default e
       (log/error :rtc-debug-log-insert-failed e))))
 

+ 2 - 0
src/main/frontend/worker/search.cljs

@@ -117,6 +117,7 @@ DROP TRIGGER IF EXISTS blocks_au;
 
 (defn upsert-blocks!
   [^Object db blocks]
+  (assert db ::upsert-blocks!)
   (.transaction db (fn [tx]
                      (doseq [item blocks]
                        (if (and (common-util/uuid-string? (.-id item))
@@ -133,6 +134,7 @@ DROP TRIGGER IF EXISTS blocks_au;
 
 (defn delete-blocks!
   [db ids]
+  (assert db ::delete-blocks!)
   (let [sql (str "DELETE from blocks WHERE id IN " (clj-list->sql ids))]
     (.exec db sql)))
 

+ 123 - 0
src/test/frontend/worker/db_worker_node_test.cljs

@@ -0,0 +1,123 @@
+(ns frontend.worker.db-worker-node-test
+  (:require ["http" :as http]
+            [cljs.test :refer [async deftest is]]
+            [clojure.string :as string]
+            [frontend.test.node-helper :as node-helper]
+            [frontend.worker.db-worker-node :as db-worker-node]
+            [logseq.db :as ldb]
+            [logseq.db.sqlite.util :as sqlite-util]
+            [promesa.core :as p]))
+
+(defn- http-request
+  [opts body]
+  (p/create
+   (fn [resolve reject]
+     (let [req (.request http (clj->js opts)
+                         (fn [^js res]
+                           (let [chunks (array)]
+                             (.on res "data" (fn [chunk] (.push chunks chunk)))
+                             (.on res "end" (fn []
+                                              (resolve {:status (.-statusCode res)
+                                                        :body (.toString (js/Buffer.concat chunks) "utf8")}))))))
+           finish! (fn []
+                     (when body (.write req body))
+                     (.end req))]
+       (.on req "error" reject)
+       (finish!)))))
+
+(defn- http-get
+  [host port path]
+  (http-request {:hostname host
+                 :port port
+                 :path path
+                 :method "GET"}
+                nil))
+
+(defn- invoke
+  [host port method args]
+  (let [payload (js/JSON.stringify
+                 (clj->js {:method method
+                           :directPass false
+                           :argsTransit (ldb/write-transit-str args)}))]
+    (p/let [{:keys [status body]}
+            (http-request {:hostname host
+                           :port port
+                           :path "/v1/invoke"
+                           :method "POST"
+                           :headers {"Content-Type" "application/json"}}
+                          payload)
+            parsed (js->clj (js/JSON.parse body) :keywordize-keys true)]
+      (when (not= 200 status)
+        (println "[db-worker-node-test] invoke failed"
+                 {:method method
+                  :status status
+                  :body body}))
+      (is (= 200 status))
+      (is (:ok parsed))
+      (ldb/read-transit-str (:resultTransit parsed)))))
+
+(deftest db-worker-node-daemon-smoke-test
+  (async done
+    (let [daemon (atom nil)
+          data-dir (node-helper/create-tmp-dir "db-worker-daemon")
+          repo (str "logseq_db_smoke_" (subs (str (random-uuid)) 0 8))
+          now (js/Date.now)
+          page-uuid (random-uuid)
+          block-uuid (random-uuid)]
+      (-> (p/let [{:keys [host port stop!]}
+                  (db-worker-node/start-daemon!
+                   {:host "127.0.0.1"
+                    :port 0
+                    :data-dir data-dir})
+                  health (http-get host port "/healthz")
+                  ready (http-get host port "/readyz")
+                  _ (do
+                      (reset! daemon {:host host :port port :stop! stop!})
+                      (println "[db-worker-node-test] daemon started" {:host host :port port})
+                      (println "[db-worker-node-test] /healthz" health)
+                      (is (= 200 (:status health)))
+                      (println "[db-worker-node-test] /readyz" ready)
+                      (is (= 200 (:status ready)))
+                      (println "[db-worker-node-test] repo" repo))
+                  _ (invoke host port "thread-api/create-or-open-db" [repo {}])
+                  dbs (invoke host port "thread-api/list-db" [])
+                  _ (do
+                      (println "[db-worker-node-test] list-db" dbs)
+                      (let [prefix sqlite-util/db-version-prefix
+                            expected-name (if (string/starts-with? repo prefix)
+                                            (subs repo (count prefix))
+                                            repo)]
+                        (is (some #(= expected-name (:name %)) dbs))))
+                  _ (invoke host port "thread-api/transact"
+                            [repo
+                             [{:block/uuid page-uuid
+                               :block/title "Smoke Page"
+                               :block/name "smoke-page"
+                               :block/tags #{:logseq.class/Page}
+                               :block/created-at now
+                               :block/updated-at now}
+                              {:block/uuid block-uuid
+                               :block/title "Smoke Test"
+                               :block/page [:block/uuid page-uuid]
+                               :block/parent [:block/uuid page-uuid]
+                               :block/order "a0"
+                               :block/created-at now
+                               :block/updated-at now}]
+                             {}
+                             nil])
+                  result (invoke host port "thread-api/q"
+                                 [repo
+                                  ['[:find ?e
+                                     :in $ ?uuid
+                                     :where [?e :block/uuid ?uuid]]
+                                   block-uuid]])]
+            (println "[db-worker-node-test] q result" result)
+            (is (seq result)))
+          (p/catch (fn [e]
+                     (println "[db-worker-node-test] e:" e)
+                     (is false (str e))))
+          (p/finally (fn []
+                       (if-let [stop! (:stop! @daemon)]
+                         (-> (stop!)
+                             (p/finally (fn [] (done))))
+                         (done))))))))

+ 85 - 0
src/test/frontend/worker/platform_test.cljs

@@ -0,0 +1,85 @@
+(ns frontend.worker.platform-test
+  (:require ["ws" :as ws]
+            [cljs.test :refer [async deftest is]]
+            [frontend.common.file.opfs :as opfs]
+            [frontend.test.node-helper :as node-helper]
+            [frontend.worker-common.util :as worker-util]
+            [frontend.worker.platform.browser :as platform-browser]
+            [frontend.worker.platform.node :as platform-node]
+            [promesa.core :as p]))
+
+(defn- wait-for-event
+  [emitter event]
+  (p/create
+   (fn [resolve reject]
+     (.once emitter event (fn [& args] (resolve args)))
+     (.once emitter "error" reject))))
+
+(defn- fake-websocket
+  [url]
+  (this-as this
+    (set! (.-url this) url)
+    this))
+
+(deftest browser-platform-adapter
+  (async done
+    (let [saved-location (.-location js/globalThis)
+          saved-websocket (.-WebSocket js/globalThis)
+          kv-state (atom {})
+          posted (atom nil)]
+      (set! (.-location js/globalThis) #js {:href "http://example.test/?publishing=true"})
+      (set! (.-WebSocket js/globalThis) fake-websocket)
+      (with-redefs [opfs/<read-text! (fn [path]
+                                       (p/resolved (str "read:" path)))
+                    opfs/<write-text! (fn [path text]
+                                        (swap! kv-state assoc [:write path] text)
+                                        (p/resolved nil))
+                    worker-util/post-message (fn [type payload]
+                                               (reset! posted [type payload]))]
+        (-> (p/let [platform (platform-browser/browser-platform)
+                    kv (:kv platform)
+                    storage (:storage platform)
+                    _ (is (fn? (:get kv)))
+                    _ (is (fn? (:set! kv)))
+                    _ (p/let [_ ((:write-text! storage) "foo.txt" "bar")
+                              v ((:read-text! storage) "foo.txt")]
+                        (is (= "read:foo.txt" v)))
+                    _ ((:post-message! (:broadcast platform)) :event {:ok true})
+                    ws ((:connect (:websocket platform)) "ws://example.test/socket")]
+              (is (= [:event {:ok true}] @posted))
+              (is (= "ws://example.test/socket" (.-url ws))))
+            (p/finally (fn []
+                         (set! (.-location js/globalThis) saved-location)
+                         (set! (.-WebSocket js/globalThis) saved-websocket)))
+            (p/then (fn [] (done))))))))
+
+(deftest node-platform-adapter
+  (async done
+    (let [data-dir (node-helper/create-tmp-dir "db-worker-platform")
+          events (atom [])
+          server (ws/Server. #js {:port 0})]
+      (.on server "connection" (fn [socket] (.close socket)))
+      (-> (p/let [_ (wait-for-event server "listening")
+                  port (.-port (.address server))
+                  platform (platform-node/node-platform
+                            {:data-dir data-dir
+                             :event-fn (fn [type payload]
+                                         (swap! events conj [type payload]))})
+                  storage (:storage platform)
+                  kv (:kv platform)
+                  ws-connect (:connect (:websocket platform))
+                  _ (p/let [_ ((:write-text! storage) "foo/bar.txt" "hello")
+                            v ((:read-text! storage) "foo/bar.txt")]
+                      (is (= "hello" v)))
+                  _ (p/let [_ ((:set! kv) "alpha" "beta")
+                            v ((:get kv) "alpha")]
+                      (is (= "beta" v)))
+                  _ ((:post-message! (:broadcast platform)) :event {:value 1})
+                  _ (is (= [[:event {:value 1}]] @events))
+                  client (ws-connect (str "ws://127.0.0.1:" port))
+                  _ (p/let [_ (wait-for-event client "open")]
+                      (.close client))]
+            true)
+          (p/finally (fn []
+                       (.close server)))
+          (p/then (fn [] (done)))))))