فهرست منبع

feat(db-sync): add schemas for all client&server ws&http requests&responses

rcmerci 1 هفته پیش
والد
کامیت
fcc24fff20

+ 1 - 1
AGENTS.md

@@ -31,6 +31,6 @@
 - PRs should describe the behavior change, link relevant issues, and note any test coverage added or skipped.
 
 ## Agent-Specific Notes
-- Review notes live in `@prompts/review.md`; check them when preparing changes.
+- Review notes live in `prompts/review.md`; check them when preparing changes.
 - DB-sync feature guide for AI agents: `docs/agent-guide/db-sync/db-sync-guide.md`.
 - DB-sync protocol reference: `docs/agent-guide/db-sync/protocol.md`.

+ 1 - 0
deps.edn

@@ -37,6 +37,7 @@
   logseq/common                         {:local/root "deps/common"}
   logseq/graph-parser                   {:local/root "deps/graph-parser"}
   logseq/outliner                       {:local/root "deps/outliner"}
+  logseq/db-sync                        {:local/root "deps/db-sync"}
   logseq/publishing                     {:local/root "deps/publishing"}
   logseq/cli                            {:local/root "deps/cli"}
   logseq/shui                           {:local/root "deps/shui"}

+ 184 - 0
deps/db-sync/src/logseq/db_sync/malli_schema.cljs

@@ -0,0 +1,184 @@
+(ns logseq.db-sync.malli-schema
+  (:require [malli.core :as ma]
+            [malli.transform :as mt]))
+
+(def tx-log-entry-schema
+  [:map
+   [:t :int]
+   [:tx :string]])
+
+(def ws-client-message-schema
+  [:multi {:dispatch :type}
+   ["hello"
+    [:map
+     [:type [:= "hello"]]
+     [:client :string]]]
+   ["pull"
+    [:map
+     [:type [:= "pull"]]
+     [:since {:optional true} :int]]]
+   ["tx/batch"
+    [:map
+     [:type [:= "tx/batch"]]
+     [:t_before :int]
+     [:txs [:sequential :string]]]]
+   ["ping"
+    [:map
+     [:type [:= "ping"]]]]])
+
+(def tx-reject-reason-schema
+  [:enum "stale" "cycle" "empty tx data" "invalid tx" "invalid t_before"])
+
+(def tx-reject-schema
+  [:map
+   [:type [:= "tx/reject"]]
+   [:reason tx-reject-reason-schema]
+   [:t {:optional true} :int]
+   [:data {:optional true} :string]])
+
+(def pull-ok-schema
+  [:map
+   [:type [:= "pull/ok"]]
+   [:t :int]
+   [:txs [:sequential tx-log-entry-schema]]])
+
+(def tx-batch-ok-schema
+  [:map
+   [:type [:= "tx/batch/ok"]]
+   [:t :int]])
+
+(def ws-server-message-schema
+  [:multi {:dispatch :type}
+   ["hello"
+    [:map
+     [:type [:= "hello"]]
+     [:t :int]]]
+   ["pull/ok" pull-ok-schema]
+   ["tx/batch/ok" tx-batch-ok-schema]
+   ["changed"
+    [:map
+     [:type [:= "changed"]]
+     [:t :int]]]
+   ["tx/reject" tx-reject-schema]
+   ["pong"
+    [:map
+     [:type [:= "pong"]]]]
+   ["error"
+    [:map
+     [:type [:= "error"]]
+     [:message :string]]]])
+
+(def http-error-response-schema
+  [:map
+   [:error :string]])
+
+(def http-ok-response-schema
+  [:map
+   [:ok :boolean]])
+
+(def graph-info-schema
+  [:map
+   [:graph_id :string]
+   [:graph_name :string]
+   [:schema_version {:optional true} [:maybe :string]]
+   [:created_at :int]
+   [:updated_at :int]])
+
+(def graphs-list-response-schema
+  [:map
+   [:graphs [:sequential graph-info-schema]]])
+
+(def graph-create-request-schema
+  [:map
+   [:graph_name :string]
+   [:schema_version {:optional true} [:maybe :string]]])
+
+(def graph-create-response-schema
+  [:map
+   [:graph_id :string]])
+
+(def graph-access-response-schema http-ok-response-schema)
+
+(def graph-delete-response-schema
+  [:map
+   [:graph_id :string]
+   [:deleted :boolean]])
+
+(def tx-batch-request-schema
+  [:map
+   [:t_before :int]
+   [:txs [:sequential :string]]])
+
+(def snapshot-row-schema
+  [:or
+   [:tuple :int :string [:maybe :any]]
+   [:map
+    [:addr :int]
+    [:content :string]
+    [:addresses {:optional true} :any]]])
+
+(def snapshot-rows-response-schema
+  [:map
+   [:rows [:sequential snapshot-row-schema]]
+   [:last_addr :int]
+   [:done :boolean]])
+
+(def snapshot-import-request-schema
+  [:map
+   [:reset {:optional true} :boolean]
+   [:rows [:sequential [:tuple :int :string [:maybe :any]]]]])
+
+(def snapshot-import-response-schema
+  [:map
+   [:ok :boolean]
+   [:count :int]])
+
+(def asset-get-response-schema
+  [:or
+   :any
+   http-error-response-schema])
+
+(def http-request-schemas
+  {:graphs/create graph-create-request-schema
+   :sync/tx-batch tx-batch-request-schema
+   :sync/snapshot-import snapshot-import-request-schema})
+
+(def http-response-schemas
+  {:graphs/list graphs-list-response-schema
+   :graphs/create graph-create-response-schema
+   :graphs/access graph-access-response-schema
+   :graphs/delete graph-delete-response-schema
+   :worker/health http-ok-response-schema
+   :sync/health http-ok-response-schema
+   :sync/pull pull-ok-schema
+   :sync/tx-batch [:or tx-batch-ok-schema tx-reject-schema http-error-response-schema]
+   :sync/snapshot-rows snapshot-rows-response-schema
+   :sync/snapshot-import snapshot-import-response-schema
+   :sync/admin-reset http-ok-response-schema
+   :assets/get asset-get-response-schema
+   :assets/put http-ok-response-schema
+   :assets/delete http-ok-response-schema
+   :error http-error-response-schema})
+
+(def ^:private json-transformer
+  (mt/transformer
+   {:name :db-sync/json}
+   mt/json-transformer))
+
+(defn- ->coercer [schema]
+  (ma/coercer schema json-transformer nil
+              #(ma/-fail! ::db-sync-malli-coerce
+                          (select-keys % [:value]))))
+
+(def ws-client-message-coercer (->coercer ws-client-message-schema))
+(def ws-server-message-coercer (->coercer ws-server-message-schema))
+
+(def http-request-coercers
+  (into {}
+        (map (fn [[k schema]] [k (->coercer schema)]))
+        http-request-schemas))
+
+(def http-response-coercers
+  (into {}
+        (map (fn [[k schema]] [k (->coercer schema)]))
+        http-response-schemas))

+ 155 - 75
deps/db-sync/src/logseq/db_sync/worker.cljs

@@ -9,6 +9,7 @@
             [logseq.db.common.normalize :as db-normalize]
             [logseq.db-sync.common :as common :refer [cors-headers]]
             [logseq.db-sync.cycle :as cycle]
+            [logseq.db-sync.malli-schema :as db-sync-schema]
             [logseq.db-sync.protocol :as protocol]
             [logseq.db-sync.storage :as storage]
             [logseq.db-sync.worker-core :as worker-core]
@@ -71,7 +72,7 @@
         method (.-method request)]
     (cond
       (= path "/health")
-      (common/json-response {:ok true})
+      (json-response :worker/health {:ok true})
 
       (or (= path "/graphs")
           (string/starts-with? path "/graphs/"))
@@ -85,7 +86,7 @@
             (if (.-ok access-resp)
               (handle-assets request env)
               access-resp))
-          (common/bad-request "invalid asset path")))
+          (bad-request "invalid asset path")))
 
       (= method "OPTIONS")
       (common/options-response)
@@ -115,10 +116,10 @@
                     (let [rewritten (js/Request. new-url request)]
                       (.fetch stub rewritten))))
                 access-resp)))
-          (common/bad-request "missing graph id")))
+          (bad-request "missing graph id")))
 
       :else
-      (common/not-found))))
+      (not-found))))
 
 (def worker
   #js {:fetch (fn [request env _ctx]
@@ -128,11 +129,70 @@
   (storage/get-t (.-sql self)))
 
 (defn- send! [ws msg]
-  (.send ws (protocol/encode-message msg)))
+  (when (ws-open? ws)
+    (if-let [coerced (coerce-ws-server-message msg)]
+      (.send ws (protocol/encode-message coerced))
+      (do
+        (log/error :db-sync/ws-response-invalid {:message msg})
+        (.send ws (protocol/encode-message {:type "error" :message "server error"}))))))
 
 (defn- ws-open? [ws]
   (= 1 (.-readyState ws)))
 
+(def ^:private invalid-coerce ::invalid-coerce)
+
+(defn- coerce
+  [coercer value context]
+  (try
+    (coercer value)
+    (catch :default e
+      (log/error :db-sync/malli-coerce-failed (merge context {:error e :value value}))
+      invalid-coerce)))
+
+(defn- coerce-ws-client-message [message]
+  (when message
+    (let [coerced (coerce db-sync-schema/ws-client-message-coercer message {:schema :ws/client})]
+      (when-not (= coerced invalid-coerce)
+        coerced))))
+
+(defn- coerce-ws-server-message [message]
+  (when message
+    (let [coerced (coerce db-sync-schema/ws-server-message-coercer message {:schema :ws/server})]
+      (when-not (= coerced invalid-coerce)
+        coerced))))
+
+(defn- coerce-http-request [schema-key body]
+  (if-let [coercer (get db-sync-schema/http-request-coercers schema-key)]
+    (let [coerced (coerce coercer body {:schema schema-key :dir :request})]
+      (when-not (= coerced invalid-coerce)
+        coerced))
+    body))
+
+(defn- json-response
+  ([schema-key data] (json-response schema-key data 200))
+  ([schema-key data status]
+   (if-let [coercer (get db-sync-schema/http-response-coercers schema-key)]
+     (let [coerced (coerce coercer data {:schema schema-key :dir :response})]
+       (if (= coerced invalid-coerce)
+         (common/json-response {:error "server error"} 500)
+         (common/json-response coerced status)))
+     (common/json-response data status))))
+
+(defn- error-response [message status]
+  (json-response :error {:error message} status))
+
+(defn- bad-request [message]
+  (error-response message 400))
+
+(defn- unauthorized []
+  (error-response "unauthorized" 401))
+
+(defn- forbidden []
+  (error-response "forbidden" 403))
+
+(defn- not-found []
+  (error-response "not found" 404))
+
 (defn- broadcast! [^js self sender msg]
   (let [clients (.getWebSockets (.-state self))]
     (doseq [ws clients]
@@ -157,6 +217,15 @@
                     after
                     limit)))
 
+(defn- snapshot-row->map [row]
+  (if (array? row)
+    {:addr (aget row 0)
+     :content (aget row 1)
+     :addresses (aget row 2)}
+    {:addr (aget row "addr")
+     :content (aget row "content")
+     :addresses (aget row "addresses")}))
+
 (defn- handle-assets [request ^js env]
   (let [url (js/URL. (.-url request))
         path (.-pathname url)
@@ -169,15 +238,13 @@
       (if-let [{:keys [key asset-type]} (parse-asset-path path)]
         (let [^js bucket (.-LOGSEQ_SYNC_ASSETS env)]
           (if-not bucket
-            (js/Response. (js/JSON.stringify #js {:error "missing assets bucket"})
-                          #js {:status 500 :headers (cors-headers)})
+            (error-response "missing assets bucket" 500)
             (case method
               "GET"
               (.then (.get bucket key)
                      (fn [^js obj]
                        (if (nil? obj)
-                         (js/Response. (js/JSON.stringify #js {:error "not found"})
-                                       #js {:status 404 :headers (cors-headers)})
+                         (error-response "not found" 404)
                          (let [content-type (or (.-contentType (.-httpMetadata obj))
                                                 "application/octet-stream")]
                            (js/Response. (.-body obj)
@@ -191,8 +258,7 @@
               (.then (.arrayBuffer request)
                      (fn [buf]
                        (if (> (.-byteLength buf) max-asset-size)
-                         (js/Response. (js/JSON.stringify #js {:error "asset too large"})
-                                       #js {:status 413 :headers (cors-headers)})
+                         (error-response "asset too large" 413)
                          (.then (.put bucket
                                       key
                                       buf
@@ -201,19 +267,15 @@
                                            :customMetadata #js {:checksum (.get (.-headers request) "x-amz-meta-checksum")
                                                                 :type asset-type}})
                                 (fn [_]
-                                  (js/Response. (js/JSON.stringify #js {:ok true})
-                                                #js {:status 200 :headers (cors-headers)}))))))
+                                  (json-response :assets/put {:ok true} 200))))))
 
               "DELETE"
               (.then (.delete bucket key)
                      (fn [_]
-                       (js/Response. (js/JSON.stringify #js {:ok true})
-                                     #js {:status 200 :headers (cors-headers)})))
+                       (json-response :assets/delete {:ok true} 200)))
 
-              (js/Response. (js/JSON.stringify #js {:error "method not allowed"})
-                            #js {:status 405 :headers (cors-headers)}))))
-        (js/Response. (js/JSON.stringify #js {:error "invalid asset path"})
-                      #js {:status 400 :headers (cors-headers)})))))
+              (error-response "method not allowed" 405))))
+        (error-response "invalid asset path" 400)))))
 
 (defn- pull-response [^js self since]
   (let [sql (.-sql self)
@@ -240,8 +302,8 @@
       (common/sql-exec sql "delete from sync_meta")
       (storage/init-schema! sql)
       (storage/set-t! sql 0))
-    (when (and rows (pos? (.-length rows)))
-      (doseq [[addr content addresses] (array-seq rows)]
+    (when (seq rows)
+      (doseq [[addr content addresses] rows]
         (common/sql-exec sql
                          (str "insert into kvs (addr, content, addresses) values (?, ?, ?)"
                               " on conflict(addr) do update set content = excluded.content, addresses = excluded.addresses")
@@ -310,7 +372,7 @@
            :reason "empty tx data"})))))
 
 (defn- handle-ws-message! [^js self ^js ws raw]
-  (let [message (protocol/parse-message raw)]
+  (let [message (-> raw protocol/parse-message coerce-ws-client-message)]
     (if-not (map? message)
       (send! ws {:type "error" :message "invalid message"})
       (case (:type message)
@@ -358,7 +420,7 @@
               (.catch resp
                       (fn [e]
                         (log/error :db-sync/http-error {:error e})
-                        (common/json-response {:error "server error"} 500)))
+                        (error-response "server error" 500)))
               resp))]
     (try
       (let [url (js/URL. (.-url request))
@@ -371,11 +433,11 @@
             (common/options-response)
 
             (and (= method "GET") (= path "/health"))
-            (common/json-response {:ok true})
+            (json-response :sync/health {:ok true})
 
             (and (= method "GET") (= path "/pull"))
             (let [since (or (parse-int (.get (.-searchParams url) "since")) 0)]
-              (common/json-response (pull-response self since)))
+              (json-response :sync/pull (pull-response self since)))
 
             ;; (and (= method "GET") (= path "/snapshot"))
             ;; (common/json-response (snapshot-response self))
@@ -388,13 +450,14 @@
                             (max 1)
                             (min snapshot-rows-max-limit))
                   rows (fetch-kvs-rows (.-sql self) after limit)
+                  rows (mapv snapshot-row->map rows)
                   last-addr (if (seq rows)
-                              (apply max (map (fn [row] (aget row "addr")) rows))
+                              (apply max (map :addr rows))
                               after)
                   done? (< (count rows) limit)]
-              (common/json-response {:rows rows
-                                     :last_addr last-addr
-                                     :done done?}))
+              (json-response :sync/snapshot-rows {:rows rows
+                                                  :last_addr last-addr
+                                                  :done done?}))
 
             (and (= method "DELETE") (= path "/admin/reset"))
             (do
@@ -402,34 +465,42 @@
               (common/sql-exec (.-sql self) "drop table if exists tx_log")
               (common/sql-exec (.-sql self) "drop table if exists sync_meta")
               (storage/init-schema! (.-sql self))
-              (common/json-response {:ok true}))
+              (json-response :sync/admin-reset {:ok true}))
 
             (and (= method "POST") (= path "/tx/batch"))
             (.then (common/read-json request)
                    (fn [result]
                      (if (nil? result)
-                       (common/bad-request "missing body")
-                       (let [txs (js->clj (aget result "txs"))
-                             t-before (parse-int (aget result "t_before"))]
-                         (if (and (sequential? txs) (every? string? txs))
-                           (common/json-response (handle-tx-batch! self nil txs t-before))
-                           (common/bad-request "invalid tx"))))))
+                       (bad-request "missing body")
+                       (let [body (js->clj result :keywordize-keys true)
+                             body (coerce-http-request :sync/tx-batch body)]
+                         (if (nil? body)
+                           (bad-request "invalid tx")
+                           (let [{:keys [txs t_before]} body
+                                 t-before (parse-int t_before)]
+                             (if (and (sequential? txs) (every? string? txs))
+                               (json-response :sync/tx-batch (handle-tx-batch! self nil txs t-before))
+                               (bad-request "invalid tx"))))))))
 
             (and (= method "POST") (= path "/snapshot/import"))
             (.then (common/read-json request)
                    (fn [result]
                      (if (nil? result)
-                       (common/bad-request "missing body")
-                       (let [rows (aget result "rows")
-                             reset? (true? (aget result "reset"))]
-                         (import-snapshot! self rows reset?)
-                         (common/json-response {:ok true :count (if rows (.-length rows) 0)})))))
+                       (bad-request "missing body")
+                       (let [body (js->clj result :keywordize-keys true)
+                             body (coerce-http-request :sync/snapshot-import body)]
+                         (if (nil? body)
+                           (bad-request "invalid body")
+                           (let [{:keys [rows reset]} body]
+                             (import-snapshot! self rows reset)
+                             (json-response :sync/snapshot-import {:ok true
+                                                                   :count (count rows)})))))))
 
             :else
-            (common/not-found))))
+            (not-found))))
       (catch :default e
         (log/error :db-sync/http-error {:error e})
-        (common/json-response {:error "server error"} 500)))))
+        (error-response "server error" 500)))))
 
 (defclass SyncDO
   (extends DurableObject)
@@ -474,11 +545,18 @@
 
 (defn- index-list [sql owner-sub]
   (if (string? owner-sub)
-    (common/get-sql-rows
-     (common/sql-exec sql
-                      (str "select graph_id, graph_name, schema_version, created_at, updated_at "
-                           "from graphs where owner_sub = ? order by updated_at desc")
-                      owner-sub))
+    (let [rows (common/get-sql-rows
+                (common/sql-exec sql
+                                 (str "select graph_id, graph_name, schema_version, created_at, updated_at "
+                                      "from graphs where owner_sub = ? order by updated_at desc")
+                                 owner-sub))]
+      (mapv (fn [row]
+              {:graph_id (aget row "graph_id")
+               :graph_name (aget row "graph_name")
+               :schema_version (aget row "schema_version")
+               :created_at (aget row "created_at")
+               :updated_at (aget row "updated_at")})
+            rows))
     []))
 
 (defn- index-upsert! [sql graph-id graph-name owner-sub schema-version]
@@ -531,32 +609,34 @@
           (p/let [claims (auth-claims request env)]
             (cond
               (nil? claims)
-              (common/unauthorized)
+              (unauthorized)
 
               (and (= method "GET") (= ["graphs"] parts))
               (let [owner-sub (aget claims "sub")]
                 (if (string? owner-sub)
-                  (common/json-response {:graphs (index-list sql owner-sub)})
-                  (common/unauthorized)))
+                  (json-response :graphs/list {:graphs (index-list sql owner-sub)})
+                  (unauthorized)))
 
               (and (= method "POST") (= ["graphs"] parts))
               (.then (common/read-json request)
                      (fn [result]
-                       (let [graph-id (str (random-uuid))
-                             graph-name (aget result "graph_name")
-                             owner-sub (aget claims "sub")
-                             schema-version (aget result "schema_version")]
-                         (cond
-                           (not (string? owner-sub))
-                           (common/unauthorized)
-
-                           (not (string? graph-name))
-                           (common/bad-request "missing graph_name")
-
-                           :else
-                           (do
-                             (index-upsert! sql graph-id graph-name owner-sub schema-version)
-                             (common/json-response {:graph_id graph-id}))))))
+                       (if (nil? result)
+                         (bad-request "missing body")
+                         (let [body (js->clj result :keywordize-keys true)
+                               body (coerce-http-request :graphs/create body)
+                               graph-id (str (random-uuid))
+                               owner-sub (aget claims "sub")]
+                           (cond
+                             (not (string? owner-sub))
+                             (unauthorized)
+
+                             (nil? body)
+                             (bad-request "invalid body")
+
+                             :else
+                             (let [{:keys [graph_name schema_version]} body]
+                               (index-upsert! sql graph-id graph_name owner-sub schema_version)
+                               (json-response :graphs/create {:graph_id graph-id})))))))
 
               (and (= method "GET")
                    (= 3 (count parts))
@@ -566,13 +646,13 @@
                     owner-sub (aget claims "sub")]
                 (cond
                   (not (string? owner-sub))
-                  (common/unauthorized)
+                  (unauthorized)
 
                   (index-owns-graph? sql graph-id owner-sub)
-                  (common/json-response {:ok true})
+                  (json-response :graphs/access {:ok true})
 
                   :else
-                  (common/forbidden)))
+                  (forbidden)))
 
               (and (= method "DELETE")
                    (= 2 (count parts))
@@ -581,13 +661,13 @@
                     owner-sub (aget claims "sub")]
                 (cond
                   (not (seq graph-id))
-                  (common/bad-request "missing graph id")
+                  (bad-request "missing graph id")
 
                   (not (string? owner-sub))
-                  (common/unauthorized)
+                  (unauthorized)
 
                   (not (index-owns-graph? sql graph-id owner-sub))
-                  (common/forbidden)
+                  (forbidden)
 
                   :else
                   (do
@@ -597,13 +677,13 @@
                           stub (.get namespace do-id)
                           reset-url (str (.-origin url) "/admin/reset")]
                       (.fetch stub (js/Request. reset-url #js {:method "DELETE"})))
-                    (common/json-response {:graph_id graph-id :deleted true}))))
+                    (json-response :graphs/delete {:graph_id graph-id :deleted true}))))
 
               :else
-              (common/not-found)))))
+              (not-found)))))
       (catch :default error
         (log/error :db-sync/index-error error)
-        (common/json-response {:error "server error"} 500)))))
+        (error-response "server error" 500)))))
 
 (defclass SyncIndexDO
   (extends DurableObject)

+ 74 - 24
src/main/frontend/handler/db_based/db_sync.cljs

@@ -6,9 +6,9 @@
             [frontend.handler.repo :as repo-handler]
             [frontend.handler.user :as user-handler]
             [frontend.state :as state]
-            [frontend.worker.rtc.client-op :as client-op]
             [lambdaisland.glogi :as log]
             [logseq.db :as ldb]
+            [logseq.db-sync.malli-schema :as db-sync-schema]
             [logseq.db.sqlite.util :as sqlite-util]
             [promesa.core :as p]))
 
@@ -40,17 +40,56 @@
     (assoc opts :headers (merge (or (:headers opts) {}) auth))
     opts))
 
+(declare coerce-http-response)
+
 (defn- fetch-json
-  [url opts]
+  [url opts {:keys [response-schema error-schema] :or {error-schema :error}}]
   (p/let [resp (js/fetch url (clj->js (with-auth-headers opts)))
           text (.text resp)
           data (when (seq text) (js/JSON.parse text))]
     (if (.-ok resp)
-      data
-      (throw (ex-info "db-sync request failed"
-                      {:status (.-status resp)
-                       :url url
-                       :body data})))))
+      (let [body (js->clj data :keywordize-keys true)
+            body (if response-schema
+                   (coerce-http-response response-schema body)
+                   body)]
+        (if (or (nil? response-schema) body)
+          body
+          (throw (ex-info "db-sync invalid response"
+                          {:status (.-status resp)
+                           :url url
+                           :body body}))))
+      (let [body (when data (js->clj data :keywordize-keys true))
+            body (if error-schema
+                   (coerce-http-response error-schema body)
+                   body)]
+        (throw (ex-info "db-sync request failed"
+                        {:status (.-status resp)
+                         :url url
+                         :body body}))))))
+
+(def ^:private invalid-coerce ::invalid-coerce)
+
+(defn- coerce
+  [coercer value context]
+  (try
+    (coercer value)
+    (catch :default e
+      (log/error :db-sync/malli-coerce-failed (merge context {:error e :value value}))
+      invalid-coerce)))
+
+(defn- coerce-http-request [schema-key body]
+  (if-let [coercer (get db-sync-schema/http-request-coercers schema-key)]
+    (let [coerced (coerce coercer body {:schema schema-key :dir :request})]
+      (when-not (= coerced invalid-coerce)
+        coerced))
+    body))
+
+(defn- coerce-http-response [schema-key body]
+  (if-let [coercer (get db-sync-schema/http-response-coercers schema-key)]
+    (let [coerced (coerce coercer body {:schema schema-key :dir :response})]
+      (when-not (= coerced invalid-coerce)
+        coerced))
+    body))
 
 (defn <rtc-start!
   [repo & {:keys [_stop-before-start?] :as _opts}]
@@ -72,13 +111,18 @@
         base (http-base)]
     (if base
       (p/let [_ (js/Promise. user-handler/task--ensure-id&access-token)
-              result (fetch-json (str base "/graphs")
-                                 {:method "POST"
-                                  :headers {"content-type" "application/json"}
-                                  :body (js/JSON.stringify
-                                         #js {:graph_name (string/replace repo config/db-version-prefix "")
-                                              :schema_version schema-version})})
-              graph-id (aget result "graph_id")]
+              body (coerce-http-request :graphs/create
+                                        {:graph_name (string/replace repo config/db-version-prefix "")
+                                         :schema_version schema-version})
+              result (if (nil? body)
+                       (p/rejected (ex-info "db-sync invalid create-graph body"
+                                            {:repo repo}))
+                       (fetch-json (str base "/graphs")
+                                   {:method "POST"
+                                    :headers {"content-type" "application/json"}
+                                    :body (js/JSON.stringify (clj->js body))}
+                                   {:response-schema :graphs/create}))
+              graph-id (:graph_id result)]
         (if graph-id
           (p/do!
            (ldb/transact! repo [(sqlite-util/kv :logseq.kv/db-type "db")
@@ -96,7 +140,9 @@
   (let [base (http-base)]
     (if (and graph-uuid base)
       (p/let [_ (js/Promise. user-handler/task--ensure-id&access-token)]
-        (fetch-json (str base "/graphs/" graph-uuid) {:method "DELETE"}))
+        (fetch-json (str base "/graphs/" graph-uuid)
+                    {:method "DELETE"}
+                    {:response-schema :graphs/delete}))
       (p/rejected (ex-info "db-sync missing graph id"
                            {:type :db-sync/invalid-graph
                             :graph-uuid graph-uuid
@@ -109,21 +155,23 @@
     (-> (if (and graph-uuid base)
           (p/let [_ (js/Promise. user-handler/task--ensure-id&access-token)
                   graph (str config/db-version-prefix graph-name)]
-            (p/loop [after -1           ; root addr is 0
+              (p/loop [after -1           ; root addr is 0
                      first-batch? true]
               (p/let [pull-resp (fetch-json (str base "/sync/" graph-uuid "/pull")
-                                            {:method "GET"})
-                      remote-tx (aget pull-resp "t")
+                                            {:method "GET"}
+                                            {:response-schema :sync/pull})
+                      remote-tx (:t pull-resp)
                       _ (when-not (integer? remote-tx)
                           (throw (ex-info "non-integer remote-tx when downloading graph"
                                           {:graph graph-name
                                            :remote-tx remote-tx})))
                       resp (fetch-json (str base "/sync/" graph-uuid "/snapshot/rows"
                                             "?after=" after "&limit=" snapshot-rows-limit)
-                                       {:method "GET"})
-                      rows (js->clj (aget resp "rows") :keywordize-keys true)
-                      done? (true? (aget resp "done"))
-                      last-addr (or (aget resp "last_addr") after)]
+                                       {:method "GET"}
+                                       {:response-schema :sync/snapshot-rows})
+                      rows (:rows resp)
+                      done? (true? (:done resp))
+                      last-addr (or (:last_addr resp) after)]
                 (p/do!
                  (state/<invoke-db-worker :thread-api/db-sync-import-kvs-rows
                                           graph rows first-batch?)
@@ -147,8 +195,10 @@
       (p/resolved [])
       (-> (p/let [_ (state/set-state! :rtc/loading-graphs? true)
                   _ (js/Promise. user-handler/task--ensure-id&access-token)
-                  resp (fetch-json (str base "/graphs") {:method "GET"})
-                  graphs (js->clj (aget resp "graphs") :keywordize-keys true)
+                  resp (fetch-json (str base "/graphs")
+                                   {:method "GET"}
+                                   {:response-schema :graphs/list})
+                  graphs (:graphs resp)
                   result (mapv (fn [graph]
                                  (merge
                                   {:url (str config/db-version-prefix (:graph_name graph))

+ 87 - 23
src/main/frontend/worker/db_sync.cljs

@@ -8,6 +8,7 @@
             [logseq.common.path :as path]
             [logseq.db :as ldb]
             [logseq.db.common.normalize :as db-normalize]
+            [logseq.db-sync.malli-schema :as db-sync-schema]
             [logseq.db.sqlite.util :as sqlite-util]
             [promesa.core :as p]))
 
@@ -85,6 +86,42 @@
 (defn- ws-open? [ws]
   (= 1 (ready-state ws)))
 
+(def ^:private invalid-coerce ::invalid-coerce)
+
+(defn- coerce
+  [coercer value context]
+  (try
+    (coercer value)
+    (catch :default e
+      (log/error :db-sync/malli-coerce-failed (merge context {:error e :value value}))
+      invalid-coerce)))
+
+(defn- coerce-ws-client-message [message]
+  (when message
+    (let [coerced (coerce db-sync-schema/ws-client-message-coercer message {:schema :ws/client})]
+      (when-not (= coerced invalid-coerce)
+        coerced))))
+
+(defn- coerce-ws-server-message [message]
+  (when message
+    (let [coerced (coerce db-sync-schema/ws-server-message-coercer message {:schema :ws/server})]
+      (when-not (= coerced invalid-coerce)
+        coerced))))
+
+(defn- coerce-http-request [schema-key body]
+  (if-let [coercer (get db-sync-schema/http-request-coercers schema-key)]
+    (let [coerced (coerce coercer body {:schema schema-key :dir :request})]
+      (when-not (= coerced invalid-coerce)
+        coerced))
+    body))
+
+(defn- coerce-http-response [schema-key body]
+  (if-let [coercer (get db-sync-schema/http-response-coercers schema-key)]
+    (let [coerced (coerce coercer body {:schema schema-key :dir :response})]
+      (when-not (= coerced invalid-coerce)
+        coerced))
+    body))
+
 (defn- reconnect-delay-ms [attempt]
   (let [exp (js/Math.pow 2 attempt)
         delay (min reconnect-max-delay-ms (* reconnect-base-delay-ms exp))
@@ -103,7 +140,9 @@
 
 (defn- send! [ws message]
   (when (ws-open? ws)
-    (.send ws (js/JSON.stringify (clj->js message)))))
+    (if-let [coerced (coerce-ws-client-message message)]
+      (.send ws (js/JSON.stringify (clj->js coerced)))
+      (log/error :db-sync/ws-request-invalid {:message message}))))
 
 (defn- normalize-tx-data [db-after db-before tx-data]
   (->> tx-data
@@ -119,16 +158,29 @@
       nil)))
 
 (defn- fetch-json
-  [url opts]
+  [url opts {:keys [response-schema error-schema] :or {error-schema :error}}]
   (p/let [resp (js/fetch url (clj->js (with-auth-headers opts)))
           text (.text resp)
           data (when (seq text) (js/JSON.parse text))]
     (if (.-ok resp)
-      (js->clj data :keywordize-keys true)
-      (throw (ex-info "db-sync request failed"
-                      {:status (.-status resp)
-                       :url url
-                       :body data})))))
+      (let [body (js->clj data :keywordize-keys true)
+            body (if response-schema
+                   (coerce-http-response response-schema body)
+                   body)]
+        (if (or (nil? response-schema) body)
+          body
+          (throw (ex-info "db-sync invalid response"
+                          {:status (.-status resp)
+                           :url url
+                           :body body}))))
+      (let [body (when data (js->clj data :keywordize-keys true))
+            body (if error-schema
+                   (coerce-http-response error-schema body)
+                   body)]
+        (throw (ex-info "db-sync request failed"
+                        {:status (.-status resp)
+                         :url url
+                         :body body}))))))
 
 (def ^:private asset-update-attrs
   #{:logseq.property.asset/remote-metadata
@@ -184,7 +236,7 @@
 (declare enqueue-asset-sync!)
 (declare enqueue-asset-initial-download!)
 (defn- handle-message! [repo client raw]
-  (when-let [message (parse-message raw)]
+  (when-let [message (-> raw parse-message coerce-ws-server-message)]
     (let [local-tx (or (client-op/get-local-tx repo) 0)
           remote-tx (:t message)]
       (case (:type message)
@@ -614,6 +666,9 @@
                  :bind #js [last-addr limit]
                  :rowMode "array"}))
 
+(defn- normalize-snapshot-rows [rows]
+  (mapv (fn [row] (vec row)) (array-seq rows)))
+
 (defn upload-graph!
   [repo]
   (let [base (http-base-url)
@@ -628,20 +683,29 @@
                    first-batch? true]
             (let [rows (fetch-kvs-rows db last-addr upload-kvs-batch-size)]
               (if (empty? rows)
-                (p/let [_ (fetch-json (str base "/sync/" graph-id "/snapshot/import")
-                                      {:method "POST"
-                                       :headers {"content-type" "application/json"}
-                                       :body (js/JSON.stringify
-                                              #js {:done true})})]
-                  (client-op/add-all-exists-asset-as-ops repo)
-                  {:graph-id graph-id})
-                (let [max-addr (apply max (map first rows))]
-                  (p/let [_ (fetch-json (str base "/sync/" graph-id "/snapshot/import")
-                                        {:method "POST"
-                                         :headers {"content-type" "application/json"}
-                                         :body (js/JSON.stringify
-                                                #js {:reset first-batch?
-                                                     :rows rows})})]
-                    (p/recur max-addr false)))))))
+                (let [body (coerce-http-request :sync/snapshot-import {:reset false :rows []})]
+                  (if (nil? body)
+                    (p/rejected (ex-info "db-sync invalid snapshot body"
+                                         {:repo repo :graph-id graph-id}))
+                    (p/let [_ (fetch-json (str base "/sync/" graph-id "/snapshot/import")
+                                          {:method "POST"
+                                           :headers {"content-type" "application/json"}
+                                           :body (js/JSON.stringify (clj->js body))}
+                                          {:response-schema :sync/snapshot-import})]
+                      (client-op/add-all-exists-asset-as-ops repo)
+                      {:graph-id graph-id})))
+                (let [max-addr (apply max (map first rows))
+                      rows (normalize-snapshot-rows rows)
+                      body (coerce-http-request :sync/snapshot-import {:reset first-batch?
+                                                                       :rows rows})]
+                  (if (nil? body)
+                    (p/rejected (ex-info "db-sync invalid snapshot body"
+                                         {:repo repo :graph-id graph-id}))
+                    (p/let [_ (fetch-json (str base "/sync/" graph-id "/snapshot/import")
+                                          {:method "POST"
+                                           :headers {"content-type" "application/json"}
+                                           :body (js/JSON.stringify (clj->js body))}
+                                          {:response-schema :sync/snapshot-import})]
+                      (p/recur max-addr false))))))))
         (p/rejected (ex-info "db-sync missing sqlite db"
                              {:repo repo :graph-id graph-id}))))))