|
|
@@ -64,26 +64,48 @@
|
|
|
(string? v) (uuid v)
|
|
|
:else (throw (ex-info "illegal value" {:data v}))))
|
|
|
|
|
|
-(defn create-local-t-flow
|
|
|
+(defn- create-local-t-flow
|
|
|
[graph-uuid]
|
|
|
(->> (m/watch *graph-uuid->local-t)
|
|
|
(m/eduction (keep (fn [m] (get m (ensure-uuid graph-uuid)))))
|
|
|
c.m/continue-flow))
|
|
|
|
|
|
-(defn create-remote-t-flow
|
|
|
+(defn- create-remote-t-flow
|
|
|
[graph-uuid]
|
|
|
- {:pre [(some? graph-uuid)]}
|
|
|
(->> (m/watch *graph-uuid->remote-t)
|
|
|
(m/eduction (keep (fn [m] (get m (ensure-uuid graph-uuid)))))
|
|
|
c.m/continue-flow))
|
|
|
|
|
|
+(defn create-local&remote-t-flow
|
|
|
+ "ensure local-t <= remote-t"
|
|
|
+ [graph-uuid]
|
|
|
+ (assert (some? graph-uuid))
|
|
|
+ (->> (m/latest vector (create-local-t-flow graph-uuid) (create-remote-t-flow graph-uuid))
|
|
|
+ (m/eduction (filter (fn [[local-t remote-t]] (>= remote-t local-t))))))
|
|
|
+
|
|
|
(defn update-local-t
|
|
|
[graph-uuid local-t]
|
|
|
- (swap! *graph-uuid->local-t assoc (ensure-uuid graph-uuid) local-t))
|
|
|
+ (let [graph-uuid (ensure-uuid graph-uuid)
|
|
|
+ current-remote-t (get @*graph-uuid->remote-t graph-uuid)
|
|
|
+ current-local-t (get @*graph-uuid->local-t graph-uuid)]
|
|
|
+ (when (and current-remote-t current-local-t)
|
|
|
+ (assert (and (>= local-t current-local-t) (<= local-t current-remote-t))
|
|
|
+ {:local-t local-t
|
|
|
+ :current-local-t current-local-t
|
|
|
+ :current-remote-t current-remote-t}))
|
|
|
+ (swap! *graph-uuid->local-t assoc graph-uuid local-t)))
|
|
|
|
|
|
(defn update-remote-t
|
|
|
[graph-uuid remote-t]
|
|
|
- (swap! *graph-uuid->remote-t assoc (ensure-uuid graph-uuid) remote-t))
|
|
|
+ (let [graph-uuid (ensure-uuid graph-uuid)
|
|
|
+ current-remote-t (get @*graph-uuid->remote-t graph-uuid)
|
|
|
+ current-local-t (get @*graph-uuid->local-t graph-uuid)]
|
|
|
+ (when (and current-remote-t current-local-t)
|
|
|
+ (assert (and (>= remote-t current-remote-t) (>= remote-t current-local-t))
|
|
|
+ {:remote-t remote-t
|
|
|
+ :current-local-t current-local-t
|
|
|
+ :current-remote-t current-remote-t}))
|
|
|
+ (swap! *graph-uuid->remote-t assoc graph-uuid remote-t)))
|
|
|
|
|
|
;;; subscribe-logs, push to frontend
|
|
|
;;; TODO: refactor by using c.m/run-background-task
|