|
@@ -15,7 +15,8 @@
|
|
|
[frontend.worker.rtc.ws-util :as ws-util]
|
|
|
[logseq.db :as ldb]
|
|
|
[logseq.db.frontend.schema :as db-schema]
|
|
|
- [missionary.core :as m]))
|
|
|
+ [missionary.core :as m]
|
|
|
+ [tick.core :as tick]))
|
|
|
|
|
|
(defn- new-task--register-graph-updates
|
|
|
[get-ws-create-task graph-uuid major-schema-version repo]
|
|
@@ -34,61 +35,78 @@
|
|
|
(throw (ex-info "remote graph is still creating" {:missionary/retry true} e))
|
|
|
(throw e))))))
|
|
|
|
|
|
-(defn- ensure-register-graph-updates*
|
|
|
+(def ^:private *register-graph-updates-sent
|
|
|
+ "ws -> [bool, added-inst, [graph-uuid,major-schema-version,repo]]"
|
|
|
+ (atom {}))
|
|
|
+
|
|
|
+(defn- clean-old-keys-in-sent!
|
|
|
+ []
|
|
|
+ (let [hours-ago (tick/<< (tick/instant) (tick/new-duration 3 :hours))
|
|
|
+ old-ks
|
|
|
+ (keep (fn [[k [_ added-inst]]]
|
|
|
+ (when (tick/< added-inst hours-ago)
|
|
|
+ k))
|
|
|
+ @*register-graph-updates-sent)]
|
|
|
+ (doseq [k old-ks]
|
|
|
+ (swap! *register-graph-updates-sent dissoc k))))
|
|
|
+
|
|
|
+(defn ensure-register-graph-updates--memoized
|
|
|
"Return a task: get or create a mws(missionary wrapped websocket).
|
|
|
see also `ws/get-mws-create`.
|
|
|
But ensure `register-graph-updates` and `calibrate-graph-skeleton` has been sent"
|
|
|
[get-ws-create-task graph-uuid major-schema-version repo conn
|
|
|
*last-calibrate-t *online-users *server-schema-version add-log-fn]
|
|
|
- (assert (some? graph-uuid))
|
|
|
- (let [*sent (atom {}) ;; ws->bool
|
|
|
- ]
|
|
|
- (m/sp
|
|
|
- (let [ws (m/? get-ws-create-task)]
|
|
|
- (when-not (contains? @*sent ws)
|
|
|
- (swap! *sent assoc ws false))
|
|
|
- (when (not (@*sent ws))
|
|
|
- (let [recv-flow (ws/recv-flow (m/? get-ws-create-task))]
|
|
|
- (c.m/run-task :update-online-user-when-register-graph-updates
|
|
|
- (m/sp
|
|
|
- (when-let [online-users (:online-users
|
|
|
- (m/?
|
|
|
- (m/timeout
|
|
|
- (m/reduce
|
|
|
- (fn [_ v]
|
|
|
- (when (= "online-users-updated" (:req-id v))
|
|
|
- (reduced v)))
|
|
|
- recv-flow)
|
|
|
- 10000)))]
|
|
|
- (reset! *online-users online-users)))
|
|
|
- :succ (constantly nil)))
|
|
|
- (let [{:keys [max-remote-schema-version]}
|
|
|
+ (m/sp
|
|
|
+ (let [ws (m/? get-ws-create-task)
|
|
|
+ sent-3rd-value [graph-uuid major-schema-version repo]
|
|
|
+ origin-v (@*register-graph-updates-sent ws)]
|
|
|
+ (when (or (nil? origin-v)
|
|
|
+ (not= (last origin-v) sent-3rd-value))
|
|
|
+ (swap! *register-graph-updates-sent assoc ws [false (tick/instant) sent-3rd-value])
|
|
|
+ (clean-old-keys-in-sent!))
|
|
|
+ (when (not (first (@*register-graph-updates-sent ws)))
|
|
|
+ (swap! *register-graph-updates-sent assoc-in [ws 0] true)
|
|
|
+ (let [recv-flow (ws/recv-flow (m/? get-ws-create-task))]
|
|
|
+ (c.m/run-task :update-online-user-when-register-graph-updates
|
|
|
+ (m/sp
|
|
|
+ (when-let [online-users (:online-users
|
|
|
+ (m/?
|
|
|
+ (m/timeout
|
|
|
+ (m/reduce
|
|
|
+ (fn [_ v]
|
|
|
+ (when (= "online-users-updated" (:req-id v))
|
|
|
+ (reduced v)))
|
|
|
+ recv-flow)
|
|
|
+ 10000)))]
|
|
|
+ (reset! *online-users online-users)))
|
|
|
+ :succ (constantly nil)))
|
|
|
+ (let [{:keys [max-remote-schema-version]}
|
|
|
+ (try
|
|
|
(m/?
|
|
|
(c.m/backoff
|
|
|
- {:delay-seq
|
|
|
- ;retry 5 times if remote-graph is creating (4000 8000 16000 32000 64000)
|
|
|
+ {:delay-seq ;retry 5 times if remote-graph is creating (4000 8000 16000 32000 64000)
|
|
|
(take 5 (drop 2 c.m/delays))
|
|
|
:reset-flow worker-flows/online-event-flow}
|
|
|
- (new-task--register-graph-updates get-ws-create-task graph-uuid major-schema-version repo)))]
|
|
|
- (when max-remote-schema-version
|
|
|
- (add-log-fn :rtc.log/higher-remote-schema-version-exists
|
|
|
- {:sub-type (r.branch-graph/compare-schemas
|
|
|
- max-remote-schema-version db-schema/version major-schema-version)
|
|
|
- :repo repo
|
|
|
- :graph-uuid graph-uuid
|
|
|
- :remote-schema-version max-remote-schema-version})))
|
|
|
- (let [t (client-op/get-local-tx repo)]
|
|
|
- (when (or (nil? @*last-calibrate-t)
|
|
|
- (< 500 (- t @*last-calibrate-t)))
|
|
|
- (let [{:keys [server-schema-version _server-builtin-db-idents]}
|
|
|
- (m/? (r.skeleton/new-task--calibrate-graph-skeleton
|
|
|
- get-ws-create-task graph-uuid major-schema-version @conn))]
|
|
|
- (reset! *server-schema-version server-schema-version))
|
|
|
- (reset! *last-calibrate-t t)))
|
|
|
- (swap! *sent assoc ws true))
|
|
|
- ws))))
|
|
|
-
|
|
|
-(def ensure-register-graph-updates (memoize ensure-register-graph-updates*))
|
|
|
+ (new-task--register-graph-updates get-ws-create-task graph-uuid major-schema-version repo)))
|
|
|
+ (catch :default e
|
|
|
+ (swap! *register-graph-updates-sent assoc-in [ws 0] false)
|
|
|
+ (throw e)))]
|
|
|
+ (when max-remote-schema-version
|
|
|
+ (add-log-fn :rtc.log/higher-remote-schema-version-exists
|
|
|
+ {:sub-type (r.branch-graph/compare-schemas
|
|
|
+ max-remote-schema-version db-schema/version major-schema-version)
|
|
|
+ :repo repo
|
|
|
+ :graph-uuid graph-uuid
|
|
|
+ :remote-schema-version max-remote-schema-version})))
|
|
|
+ (let [t (client-op/get-local-tx repo)]
|
|
|
+ (when (or (nil? @*last-calibrate-t)
|
|
|
+ (< 500 (- t @*last-calibrate-t)))
|
|
|
+ (let [{:keys [server-schema-version _server-builtin-db-idents]}
|
|
|
+ (m/? (r.skeleton/new-task--calibrate-graph-skeleton
|
|
|
+ get-ws-create-task graph-uuid major-schema-version @conn))]
|
|
|
+ (reset! *server-schema-version server-schema-version))
|
|
|
+ (reset! *last-calibrate-t t))))
|
|
|
+ ws)))
|
|
|
|
|
|
(defn- ->pos
|
|
|
[parent-uuid order]
|