|
|
@@ -90,6 +90,51 @@
|
|
|
(defn- ws-open? [ws]
|
|
|
(= 1 (ready-state ws)))
|
|
|
|
|
|
+(def ^:private state-start :start)
|
|
|
+(def ^:private state-hello-wait :hello-wait)
|
|
|
+(def ^:private state-hello-done :hello-done)
|
|
|
+(def ^:private state-pull-wait :pull-wait)
|
|
|
+(def ^:private state-tx-batch-wait :tx-batch-wait)
|
|
|
+(def ^:private state-tx-reject-stale :tx-reject-stale)
|
|
|
+(def ^:private state-tx-reject-cycle :tx-reject-cycle)
|
|
|
+(def ^:private state-end :end)
|
|
|
+
|
|
|
+(defn- client-state [client]
|
|
|
+ (or (some-> client :state deref) state-start))
|
|
|
+
|
|
|
+(defn- set-client-state! [client state]
|
|
|
+ (when-let [state* (:state client)]
|
|
|
+ (reset! state* state))
|
|
|
+ state)
|
|
|
+
|
|
|
+(defn- pull-active? [client]
|
|
|
+ (true? (some-> client :pull-active? deref)))
|
|
|
+
|
|
|
+(defn- set-pull-active! [client active?]
|
|
|
+ (when-let [pull* (:pull-active? client)]
|
|
|
+ (reset! pull* (true? active?)))
|
|
|
+ active?)
|
|
|
+
|
|
|
+(defn- tx-return-state [client]
|
|
|
+ (or (some-> client :tx-return deref) state-hello-done))
|
|
|
+
|
|
|
+(defn- set-tx-return-state! [client state]
|
|
|
+ (when-let [return* (:tx-return client)]
|
|
|
+ (reset! return* state))
|
|
|
+ state)
|
|
|
+
|
|
|
+(defn- clear-tx-return-state! [client]
|
|
|
+ (when-let [return* (:tx-return client)]
|
|
|
+ (reset! return* nil)))
|
|
|
+
|
|
|
+(defn- stale-inflight? [client]
|
|
|
+ (true? (some-> client :stale-inflight? deref)))
|
|
|
+
|
|
|
+(defn- set-stale-inflight! [client active?]
|
|
|
+ (when-let [stale* (:stale-inflight? client)]
|
|
|
+ (reset! stale* (true? active?)))
|
|
|
+ active?)
|
|
|
+
|
|
|
(def ^:private invalid-coerce ::invalid-coerce)
|
|
|
|
|
|
(defn- coerce
|
|
|
@@ -116,6 +161,10 @@
|
|
|
(log/error tag data)
|
|
|
(throw (ex-info (name tag) data)))
|
|
|
|
|
|
+(defn- fail-fast-state! [client tag data]
|
|
|
+ (set-client-state! client state-end)
|
|
|
+ (fail-fast tag data))
|
|
|
+
|
|
|
(defn- require-number [value context]
|
|
|
(when-not (number? value)
|
|
|
(fail-fast :db-sync/invalid-field (assoc context :value value))))
|
|
|
@@ -165,6 +214,15 @@
|
|
|
(clear-reconnect-timer! reconnect)
|
|
|
(swap! reconnect assoc :attempt 0)))
|
|
|
|
|
|
+(defn- reset-client-machine! [client]
|
|
|
+ (set-client-state! client state-start)
|
|
|
+ (set-pull-active! client false)
|
|
|
+ (clear-tx-return-state! client)
|
|
|
+ (set-stale-inflight! client false)
|
|
|
+ (when-let [inflight (:inflight client)]
|
|
|
+ (reset! inflight []))
|
|
|
+ client)
|
|
|
+
|
|
|
(defn- send! [ws message]
|
|
|
(when (ws-open? ws)
|
|
|
(if-let [coerced (coerce-ws-client-message message)]
|
|
|
@@ -367,21 +425,31 @@
|
|
|
[:db.fn/retractEntity [:db-sync/tx-id tx-id]])
|
|
|
tx-ids)))))
|
|
|
|
|
|
+(defn- send-tx-batch! [repo client tx-ids txs]
|
|
|
+ (when-let [ws (:ws client)]
|
|
|
+ (when (and (ws-open? ws) (seq txs))
|
|
|
+ (reset! (:inflight client) tx-ids)
|
|
|
+ (set-tx-return-state! client (if (= (client-state client) state-pull-wait)
|
|
|
+ state-pull-wait
|
|
|
+ state-hello-done))
|
|
|
+ (set-client-state! client state-tx-batch-wait)
|
|
|
+ (send! ws {:type "tx/batch"
|
|
|
+ :t_before (or (client-op/get-local-tx repo) 0)
|
|
|
+ :txs txs}))))
|
|
|
+
|
|
|
(defn- flush-pending!
|
|
|
[repo client]
|
|
|
- (let [inflight @(:inflight client)]
|
|
|
- (when (empty? inflight)
|
|
|
- (when-let [ws (:ws client)]
|
|
|
- (when (ws-open? ws)
|
|
|
- (let [batch (pending-txs repo 50)]
|
|
|
- (when (seq batch)
|
|
|
- (let [tx-ids (mapv :tx-id batch)
|
|
|
- txs (mapv :tx batch)]
|
|
|
- (when (seq txs)
|
|
|
- (reset! (:inflight client) tx-ids)
|
|
|
- (send! ws {:type "tx/batch"
|
|
|
- :t_before (or (client-op/get-local-tx repo) 0)
|
|
|
- :txs txs}))))))))))
|
|
|
+ (let [state (client-state client)]
|
|
|
+ (when (contains? #{state-hello-done state-pull-wait} state)
|
|
|
+ (let [inflight @(:inflight client)]
|
|
|
+ (when (empty? inflight)
|
|
|
+ (when-let [ws (:ws client)]
|
|
|
+ (when (ws-open? ws)
|
|
|
+ (let [batch (pending-txs repo 50)]
|
|
|
+ (when (seq batch)
|
|
|
+ (let [tx-ids (mapv :tx-id batch)
|
|
|
+ txs (mapv :tx batch)]
|
|
|
+ (send-tx-batch! repo client tx-ids txs)))))))))))
|
|
|
|
|
|
(defn- pending-txs-by-ids
|
|
|
[repo tx-ids]
|
|
|
@@ -443,6 +511,10 @@
|
|
|
:send-queue (atom (p/resolved nil))
|
|
|
:asset-queue (atom (p/resolved nil))
|
|
|
:inflight (atom [])
|
|
|
+ :state (atom state-start)
|
|
|
+ :pull-active? (atom false)
|
|
|
+ :tx-return (atom nil)
|
|
|
+ :stale-inflight? (atom false)
|
|
|
:reconnect (atom {:attempt 0 :timer nil})}]
|
|
|
(swap! worker-state/*db-sync-clients assoc repo client)
|
|
|
client)))
|
|
|
@@ -675,85 +747,198 @@
|
|
|
(log/error :db-sync/apply-remote-tx-failed {:error e})))
|
|
|
(fail-fast :db-sync/missing-db {:repo repo :op :apply-remote-tx})))
|
|
|
|
|
|
+(defn- send-pull! [repo client ws since]
|
|
|
+ (send! ws {:type "pull" :since since})
|
|
|
+ (set-pull-active! client true)
|
|
|
+ (set-client-state! client state-pull-wait))
|
|
|
+
|
|
|
+(defn- mark-pull-complete! [client]
|
|
|
+ (set-pull-active! client false)
|
|
|
+ (case (client-state client)
|
|
|
+ :pull-wait (set-client-state! client state-hello-done)
|
|
|
+ :tx-batch-wait (set-tx-return-state! client state-hello-done)
|
|
|
+ nil))
|
|
|
+
|
|
|
+(defn- complete-tx-batch! [client]
|
|
|
+ (let [next (tx-return-state client)]
|
|
|
+ (clear-tx-return-state! client)
|
|
|
+ (if (= next state-pull-wait)
|
|
|
+ (do
|
|
|
+ (set-client-state! client state-pull-wait)
|
|
|
+ (set-pull-active! client true))
|
|
|
+ (do
|
|
|
+ (set-client-state! client state-hello-done)
|
|
|
+ (set-pull-active! client false)))
|
|
|
+ next))
|
|
|
+
|
|
|
+(defn- handle-pull-ok! [repo client message]
|
|
|
+ (let [txs (:txs message)
|
|
|
+ remote-tx (:t message)
|
|
|
+ _ (require-non-negative remote-tx {:repo repo :type "pull/ok"})
|
|
|
+ _ (require-seq txs {:repo repo :type "pull/ok" :field :txs})
|
|
|
+ tx (mapcat (fn [data]
|
|
|
+ (parse-transit (:tx data) {:repo repo :type "pull/ok"}))
|
|
|
+ txs)]
|
|
|
+ (when tx
|
|
|
+ (apply-remote-tx! repo client tx)
|
|
|
+ (client-op/update-local-tx repo remote-tx)
|
|
|
+ (mark-pull-complete! client)
|
|
|
+ (if (stale-inflight? client)
|
|
|
+ (let [tx-ids @(:inflight client)
|
|
|
+ entries (pending-txs-by-ids repo tx-ids)
|
|
|
+ txs (mapv :tx entries)]
|
|
|
+ (set-stale-inflight! client false)
|
|
|
+ (cond
|
|
|
+ (empty? tx-ids)
|
|
|
+ (flush-pending! repo client)
|
|
|
+
|
|
|
+ (empty? txs)
|
|
|
+ (fail-fast :db-sync/missing-field
|
|
|
+ {:repo repo :type "tx/reject" :field :inflight})
|
|
|
+
|
|
|
+ :else
|
|
|
+ (send-tx-batch! repo client tx-ids txs)))
|
|
|
+ (flush-pending! repo client)))))
|
|
|
+
|
|
|
+(defn- handle-tx-batch-ok! [repo client message]
|
|
|
+ (let [remote-tx (:t message)]
|
|
|
+ (require-non-negative remote-tx {:repo repo :type "tx/batch/ok"})
|
|
|
+ (client-op/update-local-tx repo remote-tx)
|
|
|
+ (remove-pending-txs! repo @(:inflight client))
|
|
|
+ (reset! (:inflight client) [])
|
|
|
+ (set-stale-inflight! client false)
|
|
|
+ (complete-tx-batch! client)
|
|
|
+ (flush-pending! repo client)))
|
|
|
+
|
|
|
(defn- handle-message! [repo client raw]
|
|
|
(let [message (-> raw parse-message coerce-ws-server-message)]
|
|
|
(when-not (map? message)
|
|
|
- (fail-fast :db-sync/response-parse-failed {:repo repo :raw raw}))
|
|
|
- (let [local-tx (or (client-op/get-local-tx repo) 0)
|
|
|
- remote-tx (:t message)]
|
|
|
- (case (:type message)
|
|
|
- "hello" (do
|
|
|
- (require-non-negative remote-tx {:repo repo :type "hello"})
|
|
|
- (when (> remote-tx local-tx)
|
|
|
- (send! (:ws client) {:type "pull" :since local-tx}))
|
|
|
- (enqueue-asset-sync! repo client)
|
|
|
- (enqueue-asset-initial-download! repo client)
|
|
|
- (flush-pending! repo client))
|
|
|
- ;; Upload response
|
|
|
- "tx/batch/ok" (do
|
|
|
- (require-non-negative remote-tx {:repo repo :type "tx/batch/ok"})
|
|
|
- (client-op/update-local-tx repo remote-tx)
|
|
|
- (remove-pending-txs! repo @(:inflight client))
|
|
|
- (reset! (:inflight client) [])
|
|
|
- (flush-pending! repo client))
|
|
|
- ;; Download response
|
|
|
- ;; Merge batch txs to one tx, does it really work? We'll see
|
|
|
- "pull/ok" (let [txs (:txs message)
|
|
|
- _ (require-non-negative remote-tx {:repo repo :type "pull/ok"})
|
|
|
- _ (require-seq txs {:repo repo :type "pull/ok" :field :txs})
|
|
|
- tx (mapcat (fn [data]
|
|
|
- (parse-transit (:tx data) {:repo repo :type "pull/ok"}))
|
|
|
- txs)]
|
|
|
- (when tx
|
|
|
- (apply-remote-tx! repo client tx)
|
|
|
- (client-op/update-local-tx repo remote-tx)
|
|
|
- (flush-pending! repo client)))
|
|
|
- "changed" (do
|
|
|
- (require-non-negative remote-tx {:repo repo :type "changed"})
|
|
|
- (when (< local-tx remote-tx)
|
|
|
- (send! (:ws client) {:type "pull" :since local-tx})))
|
|
|
- "tx/reject" (let [reason (:reason message)]
|
|
|
- (when (nil? reason)
|
|
|
- (fail-fast :db-sync/missing-field
|
|
|
- {:repo repo :type "tx/reject" :field :reason}))
|
|
|
- (when (contains? message :t)
|
|
|
- (require-non-negative remote-tx {:repo repo :type "tx/reject"}))
|
|
|
- (case reason
|
|
|
- "stale"
|
|
|
- (send! (:ws client) {:type "pull" :since local-tx})
|
|
|
- "cycle"
|
|
|
- (do
|
|
|
- (when (nil? (:data message))
|
|
|
- (fail-fast :db-sync/missing-field
|
|
|
- {:repo repo :type "tx/reject" :field :data}))
|
|
|
- (let [{:keys [attr server_values]}
|
|
|
- (parse-transit (:data message) {:repo repo :type "tx/reject"})]
|
|
|
- (when (nil? attr)
|
|
|
- (fail-fast :db-sync/missing-field
|
|
|
- {:repo repo :type "tx/reject" :field :attr}))
|
|
|
- (when (nil? server_values)
|
|
|
- (fail-fast :db-sync/missing-field
|
|
|
- {:repo repo :type "tx/reject" :field :server_values}))
|
|
|
- ;; FIXME: fix cycle shouldn't re-trigger uploading
|
|
|
- (let [inflight-ids @(:inflight client)
|
|
|
- inflight-entries (pending-txs-by-ids repo inflight-ids)]
|
|
|
- (log/info :db-sync/tx-reject-cycle
|
|
|
- {:repo repo
|
|
|
- :attr attr
|
|
|
- :server-values (count server_values)
|
|
|
- :entity-titles (cycle-entity-titles repo server_values)
|
|
|
- :inflight-ids (count inflight-ids)
|
|
|
- :local-tx local-tx
|
|
|
- :remote-tx remote-tx})
|
|
|
- (reconcile-cycle! repo attr server_values)
|
|
|
- (remove-pending-txs! repo inflight-ids)
|
|
|
- (reset! (:inflight client) [])
|
|
|
- (requeue-non-parent-txs! repo attr server_values inflight-entries))
|
|
|
- (flush-pending! repo client)))
|
|
|
- (fail-fast :db-sync/invalid-field
|
|
|
- {:repo repo :type "tx/reject" :reason reason})))
|
|
|
- (fail-fast :db-sync/invalid-field
|
|
|
- {:repo repo :type (:type message)})))))
|
|
|
+ (fail-fast-state! client :db-sync/response-parse-failed {:repo repo :raw raw}))
|
|
|
+ (let [state (client-state client)
|
|
|
+ local-tx (or (client-op/get-local-tx repo) 0)
|
|
|
+ remote-tx (:t message)
|
|
|
+ msg-type (:type message)
|
|
|
+ ws (:ws client)]
|
|
|
+ (case state
|
|
|
+ :start
|
|
|
+ (fail-fast-state! client :db-sync/invalid-field
|
|
|
+ {:repo repo :state state :type msg-type})
|
|
|
+
|
|
|
+ :hello-wait
|
|
|
+ (case msg-type
|
|
|
+ "hello" (do
|
|
|
+ (require-non-negative remote-tx {:repo repo :type "hello"})
|
|
|
+ (set-client-state! client state-hello-done)
|
|
|
+ (when (> remote-tx local-tx)
|
|
|
+ (send-pull! repo client ws local-tx))
|
|
|
+ (enqueue-asset-sync! repo client)
|
|
|
+ (enqueue-asset-initial-download! repo client)
|
|
|
+ (flush-pending! repo client))
|
|
|
+ (fail-fast-state! client :db-sync/invalid-field
|
|
|
+ {:repo repo :state state :type msg-type}))
|
|
|
+
|
|
|
+ :hello-done
|
|
|
+ (case msg-type
|
|
|
+ "changed" (do
|
|
|
+ (require-non-negative remote-tx {:repo repo :type "changed"})
|
|
|
+ (when (< local-tx remote-tx)
|
|
|
+ (send-pull! repo client ws local-tx)))
|
|
|
+ "error" (fail-fast-state! client :db-sync/invalid-field
|
|
|
+ {:repo repo :state state :type msg-type :message message})
|
|
|
+ "pong" nil
|
|
|
+ (fail-fast-state! client :db-sync/invalid-field
|
|
|
+ {:repo repo :state state :type msg-type}))
|
|
|
+
|
|
|
+ :pull-wait
|
|
|
+ (case msg-type
|
|
|
+ "pull/ok" (handle-pull-ok! repo client message)
|
|
|
+ "changed" (do
|
|
|
+ (require-non-negative remote-tx {:repo repo :type "changed"})
|
|
|
+ (when (< local-tx remote-tx)
|
|
|
+ (send-pull! repo client ws local-tx)))
|
|
|
+ "error" (fail-fast-state! client :db-sync/invalid-field
|
|
|
+ {:repo repo :state state :type msg-type :message message})
|
|
|
+ "pong" nil
|
|
|
+ (fail-fast-state! client :db-sync/invalid-field
|
|
|
+ {:repo repo :state state :type msg-type}))
|
|
|
+
|
|
|
+ :tx-batch-wait
|
|
|
+ (case msg-type
|
|
|
+ "tx/batch/ok" (handle-tx-batch-ok! repo client message)
|
|
|
+ "tx/reject" (let [reason (:reason message)]
|
|
|
+ (when (nil? reason)
|
|
|
+ (fail-fast-state! client :db-sync/missing-field
|
|
|
+ {:repo repo :type "tx/reject" :field :reason}))
|
|
|
+ (when (contains? message :t)
|
|
|
+ (require-non-negative remote-tx {:repo repo :type "tx/reject"}))
|
|
|
+ (case reason
|
|
|
+ "stale"
|
|
|
+ (do
|
|
|
+ (set-client-state! client state-tx-reject-stale)
|
|
|
+ (clear-tx-return-state! client)
|
|
|
+ (set-stale-inflight! client true)
|
|
|
+ (send-pull! repo client ws local-tx))
|
|
|
+ "cycle"
|
|
|
+ (do
|
|
|
+ (set-client-state! client state-tx-reject-cycle)
|
|
|
+ (set-stale-inflight! client false)
|
|
|
+ (when (nil? (:data message))
|
|
|
+ (fail-fast-state! client :db-sync/missing-field
|
|
|
+ {:repo repo :type "tx/reject" :field :data}))
|
|
|
+ (let [{:keys [attr server_values]}
|
|
|
+ (parse-transit (:data message) {:repo repo :type "tx/reject"})]
|
|
|
+ (when (nil? attr)
|
|
|
+ (fail-fast-state! client :db-sync/missing-field
|
|
|
+ {:repo repo :type "tx/reject" :field :attr}))
|
|
|
+ (when (nil? server_values)
|
|
|
+ (fail-fast-state! client :db-sync/missing-field
|
|
|
+ {:repo repo :type "tx/reject" :field :server_values}))
|
|
|
+ ;; FIXME: fix cycle shouldn't re-trigger uploading
|
|
|
+ (let [inflight-ids @(:inflight client)
|
|
|
+ inflight-entries (pending-txs-by-ids repo inflight-ids)]
|
|
|
+ (log/info :db-sync/tx-reject-cycle
|
|
|
+ {:repo repo
|
|
|
+ :attr attr
|
|
|
+ :server-values (count server_values)
|
|
|
+ :entity-titles (cycle-entity-titles repo server_values)
|
|
|
+ :inflight-ids (count inflight-ids)
|
|
|
+ :local-tx local-tx
|
|
|
+ :remote-tx remote-tx})
|
|
|
+ (reconcile-cycle! repo attr server_values)
|
|
|
+ (remove-pending-txs! repo inflight-ids)
|
|
|
+ (reset! (:inflight client) [])
|
|
|
+ (requeue-non-parent-txs! repo attr server_values inflight-entries))
|
|
|
+ (set-stale-inflight! client false)
|
|
|
+ (clear-tx-return-state! client)
|
|
|
+ (if (pull-active? client)
|
|
|
+ (set-client-state! client state-pull-wait)
|
|
|
+ (set-client-state! client state-hello-done))
|
|
|
+ (flush-pending! repo client)))
|
|
|
+ (fail-fast-state! client :db-sync/invalid-field
|
|
|
+ {:repo repo :type "tx/reject" :reason reason})))
|
|
|
+ "pull/ok" (if (pull-active? client)
|
|
|
+ (handle-pull-ok! repo client message)
|
|
|
+ (fail-fast-state! client :db-sync/invalid-field
|
|
|
+ {:repo repo :state state :type msg-type}))
|
|
|
+ "error" (fail-fast-state! client :db-sync/invalid-field
|
|
|
+ {:repo repo :state state :type msg-type :message message})
|
|
|
+ (fail-fast-state! client :db-sync/invalid-field
|
|
|
+ {:repo repo :state state :type msg-type}))
|
|
|
+
|
|
|
+ :tx-reject-stale
|
|
|
+ (fail-fast-state! client :db-sync/invalid-field
|
|
|
+ {:repo repo :state state :type msg-type})
|
|
|
+
|
|
|
+ :tx-reject-cycle
|
|
|
+ (fail-fast-state! client :db-sync/invalid-field
|
|
|
+ {:repo repo :state state :type msg-type})
|
|
|
+
|
|
|
+ :end
|
|
|
+ (fail-fast-state! client :db-sync/invalid-field
|
|
|
+ {:repo repo :state state :type msg-type})
|
|
|
+
|
|
|
+ (fail-fast-state! client :db-sync/invalid-field
|
|
|
+ {:repo repo :state state :type msg-type})))))
|
|
|
|
|
|
(declare connect!)
|
|
|
|
|
|
@@ -786,12 +971,17 @@
|
|
|
(set! (.-onclose ws)
|
|
|
(fn [_]
|
|
|
(log/info :db-sync/ws-closed {:repo repo})
|
|
|
+ (set-client-state! client state-end)
|
|
|
(schedule-reconnect! repo client url :close))))
|
|
|
|
|
|
(defn- start-pull-loop! [client _ws]
|
|
|
client)
|
|
|
|
|
|
(defn- stop-client! [client]
|
|
|
+ (set-client-state! client state-end)
|
|
|
+ (set-pull-active! client false)
|
|
|
+ (clear-tx-return-state! client)
|
|
|
+ (set-stale-inflight! client false)
|
|
|
(when-let [reconnect (:reconnect client)]
|
|
|
(clear-reconnect-timer! reconnect))
|
|
|
(when-let [ws (:ws client)]
|
|
|
@@ -802,11 +992,14 @@
|
|
|
|
|
|
(defn- connect! [repo client url]
|
|
|
(let [ws (js/WebSocket. (append-token url (auth-token)))
|
|
|
- updated (assoc client :ws ws)]
|
|
|
+ updated (-> client
|
|
|
+ reset-client-machine!
|
|
|
+ (assoc :ws ws))]
|
|
|
(attach-ws-handlers! repo updated ws url)
|
|
|
(set! (.-onopen ws)
|
|
|
(fn [_]
|
|
|
(reset-reconnect! updated)
|
|
|
+ (set-client-state! updated state-hello-wait)
|
|
|
(send! ws {:type "hello" :client repo})
|
|
|
(enqueue-asset-sync! repo updated)
|
|
|
(enqueue-asset-initial-download! repo updated)))
|