|  | @@ -64,26 +64,48 @@
 | 
	
		
			
				|  |  |      (string? v) (uuid v)
 | 
	
		
			
				|  |  |      :else       (throw (ex-info "illegal value" {:data v}))))
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -(defn create-local-t-flow
 | 
	
		
			
				|  |  | +(defn- create-local-t-flow
 | 
	
		
			
				|  |  |    [graph-uuid]
 | 
	
		
			
				|  |  |    (->> (m/watch *graph-uuid->local-t)
 | 
	
		
			
				|  |  |         (m/eduction (keep (fn [m] (get m (ensure-uuid graph-uuid)))))
 | 
	
		
			
				|  |  |         c.m/continue-flow))
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | -(defn create-remote-t-flow
 | 
	
		
			
				|  |  | +(defn- create-remote-t-flow
 | 
	
		
			
				|  |  |    [graph-uuid]
 | 
	
		
			
				|  |  | -  {:pre [(some? graph-uuid)]}
 | 
	
		
			
				|  |  |    (->> (m/watch *graph-uuid->remote-t)
 | 
	
		
			
				|  |  |         (m/eduction (keep (fn [m] (get m (ensure-uuid graph-uuid)))))
 | 
	
		
			
				|  |  |         c.m/continue-flow))
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  | +(defn create-local&remote-t-flow
 | 
	
		
			
				|  |  | +  "ensure local-t <= remote-t"
 | 
	
		
			
				|  |  | +  [graph-uuid]
 | 
	
		
			
				|  |  | +  (assert (some? graph-uuid))
 | 
	
		
			
				|  |  | +  (->> (m/latest vector (create-local-t-flow graph-uuid) (create-remote-t-flow graph-uuid))
 | 
	
		
			
				|  |  | +       (m/eduction (filter (fn [[local-t remote-t]] (>= remote-t local-t))))))
 | 
	
		
			
				|  |  | +
 | 
	
		
			
				|  |  |  (defn update-local-t
 | 
	
		
			
				|  |  |    [graph-uuid local-t]
 | 
	
		
			
				|  |  | -  (swap! *graph-uuid->local-t assoc (ensure-uuid graph-uuid) local-t))
 | 
	
		
			
				|  |  | +  (let [graph-uuid (ensure-uuid graph-uuid)
 | 
	
		
			
				|  |  | +        current-remote-t (get @*graph-uuid->remote-t graph-uuid)
 | 
	
		
			
				|  |  | +        current-local-t (get @*graph-uuid->local-t graph-uuid)]
 | 
	
		
			
				|  |  | +    (when (and current-remote-t current-local-t)
 | 
	
		
			
				|  |  | +      (assert (and (>= local-t current-local-t) (<= local-t current-remote-t))
 | 
	
		
			
				|  |  | +              {:local-t local-t
 | 
	
		
			
				|  |  | +               :current-local-t current-local-t
 | 
	
		
			
				|  |  | +               :current-remote-t current-remote-t}))
 | 
	
		
			
				|  |  | +    (swap! *graph-uuid->local-t assoc graph-uuid local-t)))
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  (defn update-remote-t
 | 
	
		
			
				|  |  |    [graph-uuid remote-t]
 | 
	
		
			
				|  |  | -  (swap! *graph-uuid->remote-t assoc (ensure-uuid graph-uuid) remote-t))
 | 
	
		
			
				|  |  | +  (let [graph-uuid (ensure-uuid graph-uuid)
 | 
	
		
			
				|  |  | +        current-remote-t (get @*graph-uuid->remote-t graph-uuid)
 | 
	
		
			
				|  |  | +        current-local-t (get @*graph-uuid->local-t graph-uuid)]
 | 
	
		
			
				|  |  | +    (when (and current-remote-t current-local-t)
 | 
	
		
			
				|  |  | +      (assert (and (>= remote-t current-remote-t) (>= remote-t current-local-t))
 | 
	
		
			
				|  |  | +              {:remote-t remote-t
 | 
	
		
			
				|  |  | +               :current-local-t current-local-t
 | 
	
		
			
				|  |  | +               :current-remote-t current-remote-t}))
 | 
	
		
			
				|  |  | +    (swap! *graph-uuid->remote-t assoc graph-uuid remote-t)))
 | 
	
		
			
				|  |  |  
 | 
	
		
			
				|  |  |  ;;; subscribe-logs, push to frontend
 | 
	
		
			
				|  |  |  ;;; TODO: refactor by using c.m/run-background-task
 |