Tienson Qin 1 неделя назад
Родитель
Сommit
dd30969902
1 измененных файлов с 128 добавлено и 371 удалено
  1. 128 371
      src/main/frontend/worker/db_sync.cljs

+ 128 - 371
src/main/frontend/worker/db_sync.cljs

@@ -91,67 +91,6 @@
 (defn- ws-open? [ws]
   (= 1 (ready-state ws)))
 
-(defonce ^:private *db-sync-ws-registry (atom {}))
-
-(defn- registered-ws [repo]
-  (get @*db-sync-ws-registry repo))
-
-(defn- register-ws! [repo ws]
-  (swap! *db-sync-ws-registry assoc repo ws))
-
-(defn- unregister-ws! [repo ws]
-  (when (identical? ws (registered-ws repo))
-    (swap! *db-sync-ws-registry dissoc repo)))
-
-(defn- registered-ws-open? [repo]
-  (when-let [ws (registered-ws repo)]
-    (ws-open? ws)))
-
-(def ^:private state-start :start)
-(def ^:private state-hello-wait :hello-wait)
-(def ^:private state-hello-done :hello-done)
-(def ^:private state-pull-wait :pull-wait)
-(def ^:private state-tx-batch-wait :tx-batch-wait)
-(def ^:private state-tx-reject-stale :tx-reject-stale)
-(def ^:private state-tx-reject-cycle :tx-reject-cycle)
-(def ^:private state-end :end)
-
-(defn- client-state [client]
-  (or (some-> client :state deref) state-start))
-
-(defn- set-client-state! [client state]
-  (when-let [state* (:state client)]
-    (reset! state* state))
-  state)
-
-(defn- pull-active? [client]
-  (true? (some-> client :pull-active? deref)))
-
-(defn- set-pull-active! [client active?]
-  (when-let [pull* (:pull-active? client)]
-    (reset! pull* (true? active?)))
-  active?)
-
-(defn- tx-return-state [client]
-  (or (some-> client :tx-return deref) state-hello-done))
-
-(defn- set-tx-return-state! [client state]
-  (when-let [return* (:tx-return client)]
-    (reset! return* state))
-  state)
-
-(defn- clear-tx-return-state! [client]
-  (when-let [return* (:tx-return client)]
-    (reset! return* nil)))
-
-(defn- stale-inflight? [client]
-  (true? (some-> client :stale-inflight? deref)))
-
-(defn- set-stale-inflight! [client active?]
-  (when-let [stale* (:stale-inflight? client)]
-    (reset! stale* (true? active?)))
-  active?)
-
 (def ^:private invalid-coerce ::invalid-coerce)
 
 (defn- coerce
@@ -178,10 +117,6 @@
   (log/error tag data)
   (throw (ex-info (name tag) data)))
 
-(defn- fail-fast-state! [client tag data]
-  (set-client-state! client state-end)
-  (fail-fast tag data))
-
 (defn- require-number [value context]
   (when-not (number? value)
     (fail-fast :db-sync/invalid-field (assoc context :value value))))
@@ -231,18 +166,6 @@
     (clear-reconnect-timer! reconnect)
     (swap! reconnect assoc :attempt 0)))
 
-(defn- reconnect-pending? [client]
-  (boolean (some-> client :reconnect deref :timer)))
-
-(defn- reset-client-machine! [client]
-  (set-client-state! client state-start)
-  (set-pull-active! client false)
-  (clear-tx-return-state! client)
-  (set-stale-inflight! client false)
-  (when-let [inflight (:inflight client)]
-    (reset! inflight []))
-  client)
-
 (defn- send! [ws message]
   (when (ws-open? ws)
     (if-let [coerced (coerce-ws-client-message message)]
@@ -347,16 +270,6 @@
          tx-data))
       tx-data)))
 
-(defn- entity-from-ref
-  [db entity-ref]
-  (cond
-    (nil? entity-ref) nil
-    (vector? entity-ref) (d/entity db entity-ref)
-    (uuid? entity-ref) (d/entity db [:block/uuid entity-ref])
-    (keyword? entity-ref) (d/entity db [:db/ident entity-ref])
-    (number? entity-ref) (d/entity db entity-ref)
-    :else nil))
-
 (defn- reparent-cycle-block!
   [conn block]
   (when-let [page (:block/page block)]
@@ -369,14 +282,16 @@
 
 (defn- fix-cycle-after-remote-tx!
   [conn db tx-data]
+  ;; FIXME: replace `entity` with `eid`
   (when-let [{:keys [attr entity]} (and (seq tx-data)
                                         (db-sync-cycle/detect-cycle db tx-data))]
-    (log/info :db-sync/remote-cycle-detected
-              {:attr attr
-               :entity entity})
-    (when (= attr :block/parent)
-      (when-let [block (entity-from-ref db entity)]
-        (reparent-cycle-block! conn block)))))
+    (let [eid entity]
+      (log/info :db-sync/remote-cycle-detected
+                {:attr attr
+                 :eid eid})
+      (when (= attr :block/parent)
+        (when-let [block (d/entity db eid)]
+          (reparent-cycle-block! conn block))))))
 
 (defn- reconcile-cycle! [repo attr server_values]
   (if-let [conn (worker-state/get-datascript-conn repo)]
@@ -487,31 +402,21 @@
                              [:db.fn/retractEntity [:db-sync/tx-id tx-id]])
                            tx-ids)))))
 
-(defn- send-tx-batch! [repo client tx-ids txs]
-  (when-let [ws (:ws client)]
-    (when (and (ws-open? ws) (seq txs))
-      (reset! (:inflight client) tx-ids)
-      (set-tx-return-state! client (if (= (client-state client) state-pull-wait)
-                                     state-pull-wait
-                                     state-hello-done))
-      (set-client-state! client state-tx-batch-wait)
-      (send! ws {:type "tx/batch"
-                 :t_before (or (client-op/get-local-tx repo) 0)
-                 :txs txs}))))
-
 (defn- flush-pending!
   [repo client]
-  (let [state (client-state client)]
-    (when (contains? #{state-hello-done state-pull-wait} state)
-      (let [inflight @(:inflight client)]
-        (when (empty? inflight)
-          (when-let [ws (:ws client)]
-            (when (ws-open? ws)
-              (let [batch (pending-txs repo 50)]
-                (when (seq batch)
-                  (let [tx-ids (mapv :tx-id batch)
-                        txs (mapv :tx batch)]
-                    (send-tx-batch! repo client tx-ids txs)))))))))))
+  (let [inflight @(:inflight client)]
+    (when (empty? inflight)
+      (when-let [ws (:ws client)]
+        (when (ws-open? ws)
+          (let [batch (pending-txs repo 50)]
+            (when (seq batch)
+              (let [tx-ids (mapv :tx-id batch)
+                    txs (mapv :tx batch)]
+                (when (seq txs)
+                  (reset! (:inflight client) tx-ids)
+                  (send! ws {:type "tx/batch"
+                             :t_before (or (client-op/get-local-tx repo) 0)
+                             :txs txs}))))))))))
 
 (defn- pending-txs-by-ids
   [repo tx-ids]
@@ -573,10 +478,6 @@
                     :send-queue (atom (p/resolved nil))
                     :asset-queue (atom (p/resolved nil))
                     :inflight (atom [])
-                    :state (atom state-start)
-                    :pull-active? (atom false)
-                    :tx-return (atom nil)
-                    :stale-inflight? (atom false)
                     :reconnect (atom {:attempt 0 :timer nil})}]
         (swap! worker-state/*db-sync-clients assoc repo client)
         client)))
@@ -811,206 +712,85 @@
         (log/error :db-sync/apply-remote-tx-failed {:error e})))
     (fail-fast :db-sync/missing-db {:repo repo :op :apply-remote-tx})))
 
-(defn- send-pull! [client ws since]
-  (send! ws {:type "pull" :since since})
-  (set-pull-active! client true)
-  (set-client-state! client state-pull-wait))
-
-(defn- mark-pull-complete! [client]
-  (set-pull-active! client false)
-  (case (client-state client)
-    :pull-wait (set-client-state! client state-hello-done)
-    :tx-batch-wait (set-tx-return-state! client state-hello-done)
-    nil))
-
-(defn- complete-tx-batch! [client]
-  (let [next (tx-return-state client)]
-    (clear-tx-return-state! client)
-    (if (= next state-pull-wait)
-      (do
-        (set-client-state! client state-pull-wait)
-        (set-pull-active! client true))
-      (do
-        (set-client-state! client state-hello-done)
-        (set-pull-active! client false)))
-    next))
-
-(defn- handle-pull-ok! [repo client message]
-  (let [txs (:txs message)
-        remote-tx (:t message)
-        _ (require-non-negative remote-tx {:repo repo :type "pull/ok"})
-        _ (require-seq txs {:repo repo :type "pull/ok" :field :txs})
-        tx (mapcat (fn [data]
-                     (parse-transit (:tx data) {:repo repo :type "pull/ok"}))
-                   txs)]
-    (when tx
-      (apply-remote-tx! repo client tx)
-      (client-op/update-local-tx repo remote-tx)
-      (mark-pull-complete! client)
-      (if (stale-inflight? client)
-        (let [tx-ids @(:inflight client)
-              entries (pending-txs-by-ids repo tx-ids)
-              txs (mapv :tx entries)]
-          (set-stale-inflight! client false)
-          (cond
-            (empty? tx-ids)
-            (flush-pending! repo client)
-
-            (empty? txs)
-            (fail-fast :db-sync/missing-field
-                       {:repo repo :type "tx/reject" :field :inflight})
-
-            :else
-            (send-tx-batch! repo client tx-ids txs)))
-        (flush-pending! repo client)))))
-
-(defn- handle-tx-batch-ok! [repo client message]
-  (let [remote-tx (:t message)]
-    (require-non-negative remote-tx {:repo repo :type "tx/batch/ok"})
-    (client-op/update-local-tx repo remote-tx)
-    (remove-pending-txs! repo @(:inflight client))
-    (reset! (:inflight client) [])
-    (set-stale-inflight! client false)
-    (complete-tx-batch! client)
-    (when (and (= (client-state client) state-pull-wait)
-               (pull-active? client))
-      (send-pull! client (:ws client) (or (client-op/get-local-tx repo) 0)))
-    (flush-pending! repo client)))
-
 (defn- handle-message! [repo client raw]
   (let [message (-> raw parse-message coerce-ws-server-message)]
     (when-not (map? message)
-      (fail-fast-state! client :db-sync/response-parse-failed {:repo repo :raw raw}))
-    (let [state (client-state client)
-          local-tx (or (client-op/get-local-tx repo) 0)
-          remote-tx (:t message)
-          msg-type (:type message)
-          ws (:ws client)]
-      (case state
-        :start
-        (fail-fast-state! client :db-sync/invalid-field
-                          {:repo repo :state state :type msg-type})
-
-        :hello-wait
-        (case msg-type
-          "hello" (do
-                    (require-non-negative remote-tx {:repo repo :type "hello"})
-                    (set-client-state! client state-hello-done)
-                    (when (> remote-tx local-tx)
-                      (send-pull! client ws local-tx))
-                    (enqueue-asset-sync! repo client)
-                    (enqueue-asset-initial-download! repo client)
-                    (flush-pending! repo client))
-          (fail-fast-state! client :db-sync/invalid-field
-                            {:repo repo :state state :type msg-type}))
-
-        :hello-done
-        (case msg-type
-          "changed" (do
-                      (require-non-negative remote-tx {:repo repo :type "changed"})
-                      (when (< local-tx remote-tx)
-                        (send-pull! client ws local-tx)))
-          "error" (fail-fast-state! client :db-sync/invalid-field
-                                    {:repo repo :state state :type msg-type :message message})
-          "pong" nil
-          (fail-fast-state! client :db-sync/invalid-field
-                            {:repo repo :state state :type msg-type}))
-
-        :pull-wait
-        (case msg-type
-          "pull/ok" (handle-pull-ok! repo client message)
-          "changed" (do
-                      (require-non-negative remote-tx {:repo repo :type "changed"})
-                      (when (< local-tx remote-tx)
-                        (send-pull! client ws local-tx)))
-          "error" (fail-fast-state! client :db-sync/invalid-field
-                                    {:repo repo :state state :type msg-type :message message})
-          "pong" nil
-          (fail-fast-state! client :db-sync/invalid-field
-                            {:repo repo :state state :type msg-type}))
-
-        :tx-batch-wait
-        (case msg-type
-          "tx/batch/ok" (handle-tx-batch-ok! repo client message)
-          "changed" (do
-                      (require-non-negative remote-tx {:repo repo :type "changed"})
-                      (when (< local-tx remote-tx)
-                        (set-pull-active! client true)
-                        (set-tx-return-state! client state-pull-wait)))
-          "tx/reject" (let [reason (:reason message)]
-                        (when (nil? reason)
-                          (fail-fast-state! client :db-sync/missing-field
-                                            {:repo repo :type "tx/reject" :field :reason}))
-                        (when (contains? message :t)
-                          (require-non-negative remote-tx {:repo repo :type "tx/reject"}))
-                        (case reason
-                          "stale"
-                          (do
-                            (set-client-state! client state-tx-reject-stale)
-                            (clear-tx-return-state! client)
-                            (set-stale-inflight! client true)
-                            (send-pull! client ws local-tx))
-                          "cycle"
-                          (do
-                            (set-client-state! client state-tx-reject-cycle)
-                            (set-stale-inflight! client false)
-                            (when (nil? (:data message))
-                              (fail-fast-state! client :db-sync/missing-field
-                                                {:repo repo :type "tx/reject" :field :data}))
-                            (let [{:keys [attr server_values]}
-                                  (parse-transit (:data message) {:repo repo :type "tx/reject"})]
-                              (when (nil? attr)
-                                (fail-fast-state! client :db-sync/missing-field
-                                                  {:repo repo :type "tx/reject" :field :attr}))
-                              (when (nil? server_values)
-                                (fail-fast-state! client :db-sync/missing-field
-                                                  {:repo repo :type "tx/reject" :field :server_values}))
-                              ;; FIXME: fix cycle shouldn't re-trigger uploading
-                              (let [inflight-ids @(:inflight client)
-                                    inflight-entries (pending-txs-by-ids repo inflight-ids)]
-                                (log/info :db-sync/tx-reject-cycle
-                                          {:repo repo
-                                           :attr attr
-                                           :server-values (count server_values)
-                                           :entity-titles (cycle-entity-titles repo server_values)
-                                           :inflight-ids (count inflight-ids)
-                                           :local-tx local-tx
-                                           :remote-tx remote-tx})
-                                (reconcile-cycle! repo attr server_values)
-                                (remove-pending-txs! repo inflight-ids)
-                                (reset! (:inflight client) [])
-                                (requeue-non-parent-txs! repo attr server_values inflight-entries))
-                              (set-stale-inflight! client false)
-                              (clear-tx-return-state! client)
-                              (if (pull-active? client)
-                                (set-client-state! client state-pull-wait)
-                                (set-client-state! client state-hello-done))
-                              (flush-pending! repo client)))
-                          (fail-fast-state! client :db-sync/invalid-field
-                                            {:repo repo :type "tx/reject" :reason reason})))
-          "pull/ok" (if (pull-active? client)
-                      (handle-pull-ok! repo client message)
-                      (fail-fast-state! client :db-sync/invalid-field
-                                        {:repo repo :state state :type msg-type}))
-          "error" (fail-fast-state! client :db-sync/invalid-field
-                                    {:repo repo :state state :type msg-type :message message})
-          (fail-fast-state! client :db-sync/invalid-field
-                            {:repo repo :state state :type msg-type}))
-
-        :tx-reject-stale
-        (fail-fast-state! client :db-sync/invalid-field
-                          {:repo repo :state state :type msg-type})
-
-        :tx-reject-cycle
-        (fail-fast-state! client :db-sync/invalid-field
-                          {:repo repo :state state :type msg-type})
-
-        :end
-        (fail-fast-state! client :db-sync/invalid-field
-                          {:repo repo :state state :type msg-type})
-
-        (fail-fast-state! client :db-sync/invalid-field
-                          {:repo repo :state state :type msg-type})))))
+      (fail-fast :db-sync/response-parse-failed {:repo repo :raw raw}))
+    (let [local-tx (or (client-op/get-local-tx repo) 0)
+          remote-tx (:t message)]
+      (case (:type message)
+        "hello" (do
+                  (require-non-negative remote-tx {:repo repo :type "hello"})
+                  (when (> remote-tx local-tx)
+                    (send! (:ws client) {:type "pull" :since local-tx}))
+                  (enqueue-asset-sync! repo client)
+                  (enqueue-asset-initial-download! repo client)
+                  (flush-pending! repo client))
+        ;; Upload response
+        "tx/batch/ok" (do
+                        (require-non-negative remote-tx {:repo repo :type "tx/batch/ok"})
+                        (client-op/update-local-tx repo remote-tx)
+                        (remove-pending-txs! repo @(:inflight client))
+                        (reset! (:inflight client) [])
+                        (flush-pending! repo client))
+        ;; Download response
+        ;; Merge batch txs to one tx, does it really work? We'll see
+        "pull/ok" (let [txs (:txs message)
+                        _ (require-non-negative remote-tx {:repo repo :type "pull/ok"})
+                        _ (require-seq txs {:repo repo :type "pull/ok" :field :txs})
+                        tx (mapcat (fn [data]
+                                     (parse-transit (:tx data) {:repo repo :type "pull/ok"}))
+                                   txs)]
+                    (when tx
+                      (apply-remote-tx! repo client tx)
+                      (client-op/update-local-tx repo remote-tx)
+                      (flush-pending! repo client)))
+        "changed" (do
+                    (require-non-negative remote-tx {:repo repo :type "changed"})
+                    (when (< local-tx remote-tx)
+                      (send! (:ws client) {:type "pull" :since local-tx})))
+        "tx/reject" (let [reason (:reason message)]
+                      (when (nil? reason)
+                        (fail-fast :db-sync/missing-field
+                                   {:repo repo :type "tx/reject" :field :reason}))
+                      (when (contains? message :t)
+                        (require-non-negative remote-tx {:repo repo :type "tx/reject"}))
+                      (case reason
+                        "stale"
+                        (send! (:ws client) {:type "pull" :since local-tx})
+                        "cycle"
+                        (do
+                          (when (nil? (:data message))
+                            (fail-fast :db-sync/missing-field
+                                       {:repo repo :type "tx/reject" :field :data}))
+                          (let [{:keys [attr server_values]}
+                                (parse-transit (:data message) {:repo repo :type "tx/reject"})]
+                            (when (nil? attr)
+                              (fail-fast :db-sync/missing-field
+                                         {:repo repo :type "tx/reject" :field :attr}))
+                            (when (nil? server_values)
+                              (fail-fast :db-sync/missing-field
+                                         {:repo repo :type "tx/reject" :field :server_values}))
+                            ;; FIXME: fix cycle shouldn't re-trigger uploading
+                            (let [inflight-ids @(:inflight client)
+                                  inflight-entries (pending-txs-by-ids repo inflight-ids)]
+                              (log/info :db-sync/tx-reject-cycle
+                                        {:repo repo
+                                         :attr attr
+                                         :server-values (count server_values)
+                                         :entity-titles (cycle-entity-titles repo server_values)
+                                         :inflight-ids (count inflight-ids)
+                                         :local-tx local-tx
+                                         :remote-tx remote-tx})
+                              (reconcile-cycle! repo attr server_values)
+                              (remove-pending-txs! repo inflight-ids)
+                              (reset! (:inflight client) [])
+                              (requeue-non-parent-txs! repo attr server_values inflight-entries))
+                            (flush-pending! repo client)))
+                        (fail-fast :db-sync/invalid-field
+                                   {:repo repo :type "tx/reject" :reason reason})))
+        (fail-fast :db-sync/invalid-field
+                   {:repo repo :type (:type message)})))))
 
 (declare connect!)
 
@@ -1042,19 +822,13 @@
           (log/error :db-sync/ws-error {:repo repo :error event})))
   (set! (.-onclose ws)
         (fn [_]
-          (unregister-ws! repo ws)
           (log/info :db-sync/ws-closed {:repo repo})
-          (set-client-state! client state-end)
           (schedule-reconnect! repo client url :close))))
 
 (defn- start-pull-loop! [client _ws]
   client)
 
 (defn- stop-client! [client]
-  (set-client-state! client state-end)
-  (set-pull-active! client false)
-  (clear-tx-return-state! client)
-  (set-stale-inflight! client false)
   (when-let [reconnect (:reconnect client)]
     (clear-reconnect-timer! reconnect))
   (when-let [ws (:ws client)]
@@ -1064,24 +838,16 @@
         nil))))
 
 (defn- connect! [repo client url]
-  (if (registered-ws-open? repo)
-    (do
-      (log/info :db-sync/ws-connect-skipped {:repo repo :reason :existing-open})
-      (assoc client :ws (registered-ws repo)))
-    (let [ws (js/WebSocket. (append-token url (auth-token)))
-          updated (-> client
-                      reset-client-machine!
-                      (assoc :ws ws))]
-      (attach-ws-handlers! repo updated ws url)
-      (set! (.-onopen ws)
-            (fn [_]
-              (register-ws! repo ws)
-              (reset-reconnect! updated)
-              (set-client-state! updated state-hello-wait)
-              (send! ws {:type "hello" :client repo})
-              (enqueue-asset-sync! repo updated)
-              (enqueue-asset-initial-download! repo updated)))
-      (start-pull-loop! updated ws))))
+  (let [ws (js/WebSocket. (append-token url (auth-token)))
+        updated (assoc client :ws ws)]
+    (attach-ws-handlers! repo updated ws url)
+    (set! (.-onopen ws)
+          (fn [_]
+            (reset-reconnect! updated)
+            (send! ws {:type "hello" :client repo})
+            (enqueue-asset-sync! repo updated)
+            (enqueue-asset-initial-download! repo updated)))
+    (start-pull-loop! updated ws)))
 
 (defn stop!
   ([]
@@ -1099,36 +865,21 @@
   [repo]
   (if-not (enabled?)
     (p/resolved nil)
-    (let [client (get @worker-state/*db-sync-clients repo)]
-      (cond
-        (registered-ws-open? repo)
-        (p/resolved nil)
-
-        (and client (reconnect-pending? client))
-        (p/resolved nil)
-
-        :else
-        (p/do!
-         (stop!)
-         (let [base (ws-base-url)
-               graph-id (get-graph-id repo)]
-           (if (and (string? base) (seq base) (seq graph-id))
-             (let [client (ensure-client-state! repo)
-                   url (format-ws-url base graph-id)
-                   _ (ensure-client-graph-uuid! repo graph-id)
-                   connected (assoc client :graph-id graph-id)
-                   connected (connect! repo connected url)]
-               (swap! worker-state/*db-sync-clients assoc repo connected)
-               (p/resolved nil))
-             (do
-               (log/info :db-sync/start-skipped {:repo repo :graph-id graph-id :base base})
-               (p/resolved nil)))))))))
-
-(defn get-client-state
-  [repo]
-  (if-let [client (get @worker-state/*db-sync-clients repo)]
-    (client-state client)
-    :not-started))
+    (p/do!
+     (stop!)
+     (let [base (ws-base-url)
+           graph-id (get-graph-id repo)]
+       (if (and (string? base) (seq base) (seq graph-id))
+         (let [client (ensure-client-state! repo)
+               url (format-ws-url base graph-id)
+               _ (ensure-client-graph-uuid! repo graph-id)
+               connected (assoc client :graph-id graph-id)
+               connected (connect! repo connected url)]
+           (swap! worker-state/*db-sync-clients assoc repo connected)
+           (p/resolved nil))
+         (do
+           (log/info :db-sync/start-skipped {:repo repo :graph-id graph-id :base base})
+           (p/resolved nil)))))))
 
 (defn enqueue-local-tx!
   [repo {:keys [tx-data db-after db-before]}]
@@ -1206,3 +957,9 @@
                       (p/recur max-addr false))))))))
         (p/rejected (ex-info "db-sync missing sqlite db"
                              {:repo repo :graph-id graph-id}))))))
+
+(defn get-client-state
+  [repo]
+  (if-let [client (get @worker-state/*db-sync-clients repo)]
+    (ws-open? client)
+    :not-started))