|
|
@@ -22,6 +22,32 @@
|
|
|
|
|
|
(defonce *repo->latest-remote-tx (atom {}))
|
|
|
|
|
|
+(defn- current-client
|
|
|
+ [repo]
|
|
|
+ (let [client @worker-state/*db-sync-client]
|
|
|
+ (when (= repo (:repo client))
|
|
|
+ client)))
|
|
|
+
|
|
|
+(defn- client-ops-conn [repo]
|
|
|
+ (worker-state/get-client-ops-conn repo))
|
|
|
+
|
|
|
+(defn- sync-counts
|
|
|
+ [repo]
|
|
|
+ (let [pending-local (when-let [conn (client-ops-conn repo)]
|
|
|
+ (count (d/datoms @conn :avet :db-sync/created-at)))
|
|
|
+ pending-asset (client-op/get-unpushed-asset-ops-count repo)
|
|
|
+ local-tx (client-op/get-local-tx repo)
|
|
|
+ remote-tx (get @*repo->latest-remote-tx repo)
|
|
|
+ pending-server (when (and (number? local-tx) (number? remote-tx))
|
|
|
+ (max 0 (- remote-tx local-tx)))
|
|
|
+ graph-uuid (client-op/get-graph-uuid repo)]
|
|
|
+ {:pending-local pending-local
|
|
|
+ :pending-asset pending-asset
|
|
|
+ :pending-server pending-server
|
|
|
+ :local-tx local-tx
|
|
|
+ :remote-tx remote-tx
|
|
|
+ :graph-uuid graph-uuid}))
|
|
|
+
|
|
|
(defn- normalize-online-users
|
|
|
[users]
|
|
|
(->> users
|
|
|
@@ -36,13 +62,21 @@
|
|
|
(defn- broadcast-rtc-state!
|
|
|
[client]
|
|
|
(when client
|
|
|
- (let [ws-state @(:ws-state client)
|
|
|
- online-users @(:online-users client)]
|
|
|
+ (let [repo (:repo client)
|
|
|
+ ws-state @(:ws-state client)
|
|
|
+ online-users @(:online-users client)
|
|
|
+ {:keys [pending-local pending-asset pending-server local-tx remote-tx graph-uuid]} (sync-counts repo)]
|
|
|
(shared-service/broadcast-to-clients!
|
|
|
:rtc-sync-state
|
|
|
{:rtc-state {:ws-state ws-state}
|
|
|
:rtc-lock (= :open ws-state)
|
|
|
- :online-users (or online-users [])}))))
|
|
|
+ :online-users (or online-users [])
|
|
|
+ :unpushed-block-update-count (or pending-local 0)
|
|
|
+ :pending-asset-ops-count (or pending-asset 0)
|
|
|
+ :pending-server-ops-count (or pending-server 0)
|
|
|
+ :local-tx local-tx
|
|
|
+ :remote-tx remote-tx
|
|
|
+ :graph-uuid graph-uuid}))))
|
|
|
|
|
|
(defn- set-ws-state!
|
|
|
[client ws-state]
|
|
|
@@ -277,9 +311,6 @@
|
|
|
(:block/uuid ent)))))
|
|
|
(distinct)))
|
|
|
|
|
|
-(defn- client-ops-conn [repo]
|
|
|
- (worker-state/get-client-ops-conn repo))
|
|
|
-
|
|
|
(defn- persist-local-tx! [repo normalized-tx-data reversed-datoms _tx-meta]
|
|
|
(when-let [conn (client-ops-conn repo)]
|
|
|
(let [tx-id (random-uuid)
|
|
|
@@ -288,6 +319,8 @@
|
|
|
:db-sync/normalized-tx-data normalized-tx-data
|
|
|
:db-sync/reversed-tx-data reversed-datoms
|
|
|
:db-sync/created-at now}])
|
|
|
+ (when-let [client (current-client repo)]
|
|
|
+ (broadcast-rtc-state! client))
|
|
|
tx-id)))
|
|
|
|
|
|
(defn- pending-txs
|
|
|
@@ -313,7 +346,9 @@
|
|
|
(ldb/transact! conn
|
|
|
(mapv (fn [tx-id]
|
|
|
[:db/retractEntity [:db-sync/tx-id tx-id]])
|
|
|
- tx-ids)))))
|
|
|
+ tx-ids))
|
|
|
+ (when-let [client (current-client repo)]
|
|
|
+ (broadcast-rtc-state! client)))))
|
|
|
|
|
|
(defn get-lookup-id
|
|
|
[x]
|
|
|
@@ -448,6 +483,8 @@
|
|
|
:asset-uuid asset-uuid
|
|
|
:size size})
|
|
|
(client-op/remove-asset-op repo asset-uuid)
|
|
|
+ (when-let [client (current-client repo)]
|
|
|
+ (broadcast-rtc-state! client))
|
|
|
(p/resolved nil))
|
|
|
|
|
|
:else
|
|
|
@@ -459,11 +496,16 @@
|
|
|
[{:block/uuid asset-uuid
|
|
|
:logseq.property.asset/remote-metadata {:checksum checksum :type asset-type}}]
|
|
|
{:persist-op? false}))
|
|
|
- (client-op/remove-asset-op repo asset-uuid)))
|
|
|
+ (client-op/remove-asset-op repo asset-uuid)
|
|
|
+ (when-let [client (current-client repo)]
|
|
|
+ (broadcast-rtc-state! client))))
|
|
|
(p/catch (fn [e]
|
|
|
(case (:type (ex-data e))
|
|
|
:rtc.exception/read-asset-failed
|
|
|
- (client-op/remove-asset-op repo asset-uuid)
|
|
|
+ (do
|
|
|
+ (client-op/remove-asset-op repo asset-uuid)
|
|
|
+ (when-let [client (current-client repo)]
|
|
|
+ (broadcast-rtc-state! client)))
|
|
|
|
|
|
:rtc.exception/upload-asset-failed
|
|
|
nil
|
|
|
@@ -476,6 +518,9 @@
|
|
|
|
|
|
(contains? asset-op :remove-asset)
|
|
|
(-> (client-op/remove-asset-op repo asset-uuid)
|
|
|
+ (p/then (fn [_]
|
|
|
+ (when-let [client (current-client repo)]
|
|
|
+ (broadcast-rtc-state! client))))
|
|
|
(p/catch (fn [e]
|
|
|
(log/error :db-sync/asset-delete-failed
|
|
|
{:repo repo
|
|
|
@@ -741,6 +786,7 @@
|
|
|
(case (:type message)
|
|
|
"hello" (do
|
|
|
(require-non-negative remote-tx {:repo repo :type "hello"})
|
|
|
+ (broadcast-rtc-state! client)
|
|
|
(when (> remote-tx local-tx)
|
|
|
(send! (:ws client) {:type "pull" :since local-tx}))
|
|
|
(enqueue-asset-sync! repo client)
|
|
|
@@ -755,6 +801,7 @@
|
|
|
"tx/batch/ok" (do
|
|
|
(require-non-negative remote-tx {:repo repo :type "tx/batch/ok"})
|
|
|
(client-op/update-local-tx repo remote-tx)
|
|
|
+ (broadcast-rtc-state! client)
|
|
|
(remove-pending-txs! repo @(:inflight client))
|
|
|
(reset! (:inflight client) [])
|
|
|
(flush-pending! repo client))
|
|
|
@@ -773,9 +820,11 @@
|
|
|
:local-tx local-tx
|
|
|
:remote-tx remote-tx)
|
|
|
(client-op/update-local-tx repo remote-tx)
|
|
|
+ (broadcast-rtc-state! client)
|
|
|
(flush-pending! repo client))))
|
|
|
"changed" (do
|
|
|
(require-non-negative remote-tx {:repo repo :type "changed"})
|
|
|
+ (broadcast-rtc-state! client)
|
|
|
(when (< local-tx remote-tx)
|
|
|
(send! (:ws client) {:type "pull" :since local-tx})))
|
|
|
"tx/reject" (let [reason (:reason message)]
|