|
|
@@ -1,6 +1,8 @@
|
|
|
(ns frontend.worker.rtc.core
|
|
|
"Main(use missionary) ns for rtc related fns"
|
|
|
- (:require [frontend.common.missionary-util :as c.m]
|
|
|
+ (:require [clojure.data :as data]
|
|
|
+ [datascript.core :as d]
|
|
|
+ [frontend.common.missionary-util :as c.m]
|
|
|
[frontend.worker.device :as worker-device]
|
|
|
[frontend.worker.rtc.asset :as r.asset]
|
|
|
[frontend.worker.rtc.client :as r.client]
|
|
|
@@ -80,14 +82,43 @@
|
|
|
(m/?< clock-flow)
|
|
|
(catch Cancelled _ (m/amb))))))))
|
|
|
|
|
|
+(defn create-inject-users-info-flow
|
|
|
+ "Return a flow: emit event if need to notify the server to inject users-info to graph."
|
|
|
+ [repo online-users-updated-flow]
|
|
|
+ (m/ap
|
|
|
+ (if-let [conn (worker-state/get-datascript-conn repo)]
|
|
|
+ (if-let [online-users (seq (m/?> online-users-updated-flow))]
|
|
|
+ (let [user-uuid->user (into {} (map (juxt :user/uuid identity) online-users))
|
|
|
+ user-blocks (keep (fn [user-uuid] (d/entity @conn [:block/uuid user-uuid])) (keys user-uuid->user))]
|
|
|
+ (if (or (not= (count user-blocks) (count user-uuid->user))
|
|
|
+ (some
|
|
|
+ ;; check if some attrs not equal among user-blocks and online-users
|
|
|
+ (fn [user-block]
|
|
|
+ (let [user (user-uuid->user (:block/uuid user-block))
|
|
|
+ [diff-r1 diff-r2]
|
|
|
+ (data/diff
|
|
|
+ (select-keys user-block [:logseq.user/name :logseq.user/email :logseq.user/avatar])
|
|
|
+ (update-keys
|
|
|
+ (select-keys user [:user/name :user/email :user/avatar])
|
|
|
+ (fn [k] (keyword "logseq.user" (name k)))))]
|
|
|
+ (or (some? diff-r1) (some? diff-r2))))
|
|
|
+ user-blocks))
|
|
|
+ (m/amb {:type :inject-users-info}
|
|
|
+ ;; then trigger a pull-remote-updates to update local-graph
|
|
|
+ {:type :pull-remote-updates :from :x})
|
|
|
+ (m/amb)))
|
|
|
+ (m/amb))
|
|
|
+ (m/amb))))
|
|
|
+
|
|
|
(defn- create-mixed-flow
|
|
|
"Return a flow that emits all kinds of events:
|
|
|
`:remote-update`: remote-updates data from server
|
|
|
`:remote-asset-update`: remote asset-updates from server
|
|
|
`:local-update-check`: event to notify to check if there're some new local-updates, then push to remote.
|
|
|
`:online-users-updated`: online users info updated
|
|
|
- `:pull-remote-updates`: pull remote updates"
|
|
|
- [repo get-ws-create-task *auto-push?]
|
|
|
+ `:pull-remote-updates`: pull remote updates
|
|
|
+ `:inject-users-info`: notify server to inject users-info into the graph"
|
|
|
+ [repo get-ws-create-task *auto-push? *online-users]
|
|
|
(let [remote-updates-flow (m/eduction
|
|
|
(map (fn [data]
|
|
|
(case (:req-id data)
|
|
|
@@ -98,7 +129,8 @@
|
|
|
local-updates-check-flow (m/eduction
|
|
|
(map (fn [data] {:type :local-update-check :value data}))
|
|
|
(create-local-updates-check-flow repo *auto-push? 2000))
|
|
|
- mix-flow (m/stream (c.m/mix remote-updates-flow local-updates-check-flow))]
|
|
|
+ inject-user-info-flow (create-inject-users-info-flow repo (m/watch *online-users))
|
|
|
+ mix-flow (c.m/mix remote-updates-flow local-updates-check-flow inject-user-info-flow)]
|
|
|
(c.m/mix mix-flow (create-pull-remote-updates-flow 60000 mix-flow))))
|
|
|
|
|
|
(defn- create-ws-state-flow
|
|
|
@@ -140,6 +172,7 @@
|
|
|
(finally
|
|
|
(reset! *rtc-lock nil)))))
|
|
|
|
|
|
+(declare new-task--inject-users-info)
|
|
|
(defn- create-rtc-loop
|
|
|
"Return a map with [:rtc-state-flow :rtc-loop-task :*rtc-auto-push? :onstarted-task]
|
|
|
TODO: auto refresh token if needed"
|
|
|
@@ -160,7 +193,7 @@
|
|
|
get-ws-create-task graph-uuid repo conn *last-calibrate-t *online-users)
|
|
|
{:keys [assets-sync-loop-task]}
|
|
|
(r.asset/create-assets-sync-loop repo get-ws-create-task graph-uuid conn *auto-push?)
|
|
|
- mixed-flow (create-mixed-flow repo get-ws-create-task *auto-push?)]
|
|
|
+ mixed-flow (create-mixed-flow repo get-ws-create-task *auto-push? *online-users)]
|
|
|
(assert (some? *current-ws))
|
|
|
{:rtc-state-flow (create-rtc-state-flow (create-ws-state-flow *current-ws))
|
|
|
:*rtc-auto-push? *auto-push?
|
|
|
@@ -199,7 +232,10 @@
|
|
|
|
|
|
:pull-remote-updates
|
|
|
(m/? (r.client/new-task--pull-remote-data
|
|
|
- repo conn graph-uuid date-formatter get-ws-create-task add-log-fn))))
|
|
|
+ repo conn graph-uuid date-formatter get-ws-create-task add-log-fn))
|
|
|
+
|
|
|
+ :inject-users-info
|
|
|
+ (m/? (new-task--inject-users-info token graph-uuid))))
|
|
|
(m/ap)
|
|
|
(m/reduce {} nil)
|
|
|
(m/?))
|
|
|
@@ -284,7 +320,7 @@
|
|
|
(when ex-data (prn ::delete-graph-failed graph-uuid ex-data))
|
|
|
(boolean (nil? ex-data))))))
|
|
|
|
|
|
-(defn new-task--get-user-info
|
|
|
+(defn new-task--get-users-info
|
|
|
"Return a task that return users-info about the graph."
|
|
|
[token graph-uuid]
|
|
|
(let [{:keys [get-ws-create-task]} (gen-get-ws-create-map--memoized (ws-util/get-ws-url token))]
|
|
|
@@ -292,6 +328,12 @@
|
|
|
(ws-util/send&recv get-ws-create-task
|
|
|
{:action "get-users-info" :graph-uuid graph-uuid}))))
|
|
|
|
|
|
+(defn new-task--inject-users-info
|
|
|
+ [token graph-uuid]
|
|
|
+ (let [{:keys [get-ws-create-task]} (gen-get-ws-create-map--memoized (ws-util/get-ws-url token))]
|
|
|
+ (ws-util/send&recv get-ws-create-task
|
|
|
+ {:action "inject-users-info" :graph-uuid graph-uuid})))
|
|
|
+
|
|
|
(defn new-task--grant-access-to-others
|
|
|
[token graph-uuid & {:keys [target-user-uuids target-user-emails]}]
|
|
|
(let [{:keys [get-ws-create-task]} (gen-get-ws-create-map--memoized (ws-util/get-ws-url token))]
|
|
|
@@ -393,7 +435,6 @@
|
|
|
(fn [_ v] (worker-util/post-message :rtc-sync-state v))
|
|
|
create-get-state-flow))
|
|
|
|
|
|
-
|
|
|
(comment
|
|
|
(do
|
|
|
(def user-uuid "7f41990d-2c8f-4f79-b231-88e9f652e072")
|
|
|
@@ -426,4 +467,8 @@
|
|
|
((->> (m/sample vector
|
|
|
(m/latest identity (m/reductions {} 0 (sleep-emit [1000 1 2])))
|
|
|
(sleep-emit [2000 3000 1000]))
|
|
|
- (m/reduce (fn [_ v] (prn :v v)))) prn prn)))
|
|
|
+ (m/reduce (fn [_ v] (prn :v v)))) prn prn))
|
|
|
+
|
|
|
+ (let [f (m/stream (m/ap (m/amb 1 2 3 4)))]
|
|
|
+ ((m/reduce (fn [r v] (conj r v)) (m/reductions {} :xxx f)) prn prn)
|
|
|
+ ((m/reduce (fn [r v] (conj r v)) f) prn prn)))
|