Tienson Qin 1 месяц назад
Родитель
Сommit
d181b1bfa9

+ 8 - 3
deps/db-sync/src/logseq/db_sync/index.cljs

@@ -123,8 +123,11 @@
 
 (defn <graph-members-list [db graph-id]
   (p/let [result (common/<d1-all db
-                                 (str "select user_id, graph_id, role, invited_by, created_at "
-                                      "from graph_members where graph_id = ? order by created_at asc")
+                                 (str "select m.user_id, m.graph_id, m.role, m.invited_by, m.created_at, "
+                                      "u.email, u.username "
+                                      "from graph_members m "
+                                      "left join users u on m.user_id = u.id "
+                                      "where m.graph_id = ? order by m.created_at asc")
                                  graph-id)
           rows (common/get-sql-rows result)]
     (mapv (fn [row]
@@ -132,7 +135,9 @@
              :graph_id (aget row "graph_id")
              :role (aget row "role")
              :invited_by (aget row "invited_by")
-             :created_at (aget row "created_at")})
+             :created_at (aget row "created_at")
+             :email (aget row "email")
+             :username (aget row "username")})
           rows)))
 
 (defn <graph-member-update-role! [db graph-id user-id role]

+ 16 - 1
deps/db-sync/src/logseq/db_sync/malli_schema.cljs

@@ -36,6 +36,18 @@
    [:t {:optional true} :int]
    [:data {:optional true} :string]])
 
+(def user-presence-schema
+  [:map
+   [:user_id :string]
+   [:email {:optional true} [:maybe :string]]
+   [:username {:optional true} [:maybe :string]]
+   [:name {:optional true} [:maybe :string]]])
+
+(def online-users-schema
+  [:map
+   [:type [:= "online-users"]]
+   [:online-users [:sequential user-presence-schema]]])
+
 (def pull-ok-schema
   [:map
    [:type [:= "pull/ok"]]
@@ -53,6 +65,7 @@
     [:map
      [:type [:= "hello"]]
      [:t :int]]]
+   ["online-users" online-users-schema]
    ["pull/ok" pull-ok-schema]
    ["tx/batch/ok" tx-batch-ok-schema]
    ["changed"
@@ -93,7 +106,9 @@
    [:graph_id :string]
    [:role graph-member-role-schema]
    [:invited_by {:optional true} [:maybe :string]]
-   [:created_at :int]])
+   [:created_at :int]
+   [:email {:optional true} [:maybe :string]]
+   [:username {:optional true} [:maybe :string]]])
 
 (def graph-members-list-response-schema
   [:map

+ 65 - 19
deps/db-sync/src/logseq/db_sync/worker.cljs

@@ -91,6 +91,56 @@
       (when-not (= coerced invalid-coerce)
         coerced))))
 
+(defn- send! [ws msg]
+  (when (ws-open? ws)
+    (if-let [coerced (coerce-ws-server-message msg)]
+      (.send ws (protocol/encode-message coerced))
+      (do
+        (log/error :db-sync/ws-response-invalid {:message msg})
+        (.send ws (protocol/encode-message {:type "error" :message "server error"}))))))
+
+(defn- broadcast! [^js self sender msg]
+  (let [clients (.getWebSockets (.-state self))]
+    (doseq [ws clients]
+      (when (and (not= ws sender) (ws-open? ws))
+        (send! ws msg)))))
+
+(defn- claims->user
+  [claims]
+  (when claims
+    (let [user-id (aget claims "sub")
+          email (aget claims "email")
+          username (or (aget claims "preferred_username")
+                       (aget claims "cognito:username")
+                       (aget claims "username"))
+          name (aget claims "name")]
+      (when (string? user-id)
+        (cond-> {:user_id user-id}
+          (string? email) (assoc :email email)
+          (string? username) (assoc :username username)
+          (string? name) (assoc :name name))))))
+
+(defn- presence*
+  [^js self]
+  (or (.-presence self)
+      (set! (.-presence self) (atom {}))))
+
+(defn- online-users
+  [^js self]
+  (vec (distinct (vals @(presence* self)))))
+
+(defn- broadcast-online-users!
+  [^js self]
+  (broadcast! self nil {:type "online-users" :online-users (online-users self)}))
+
+(defn- add-presence!
+  [^js self ^js ws user]
+  (swap! (presence* self) assoc ws user))
+
+(defn- remove-presence!
+  [^js self ^js ws]
+  (swap! (presence* self) dissoc ws))
+
 (defn- fail-fast [tag data]
   (log/error tag data)
   (throw (ex-info (name tag) data)))
@@ -102,14 +152,6 @@
         coerced))
     body))
 
-(defn- send! [ws msg]
-  (when (ws-open? ws)
-    (if-let [coerced (coerce-ws-server-message msg)]
-      (.send ws (protocol/encode-message coerced))
-      (do
-        (log/error :db-sync/ws-response-invalid {:message msg})
-        (.send ws (protocol/encode-message {:type "error" :message "server error"}))))))
-
 (defn- json-response
   ([schema-key data] (json-response schema-key data 200))
   ([schema-key data status]
@@ -135,12 +177,6 @@
 (defn- not-found []
   (error-response "not found" 404))
 
-(defn- broadcast! [^js self sender msg]
-  (let [clients (.getWebSockets (.-state self))]
-    (doseq [ws clients]
-      (when (and (not= ws sender) (ws-open? ws))
-        (send! ws msg)))))
-
 (defn- parse-int [value]
   (when (some? value)
     (let [n (js/parseInt value 10)]
@@ -382,9 +418,15 @@
 (defn- handle-ws [^js self request]
   (let [pair (js/WebSocketPair.)
         client (aget pair 0)
-        server (aget pair 1)]
-    (.acceptWebSocket (.-state self) server)
-    (js/Response. nil #js {:status 101 :webSocket client})))
+        server (aget pair 1)
+        env (.-env self)]
+    (p/let [claims (auth-claims request env)
+            user (claims->user claims)]
+      (.acceptWebSocket (.-state self) server)
+      (when user
+        (add-presence! self server user))
+      (broadcast-online-users! self)
+      (js/Response. nil #js {:status 101 :webSocket client}))))
 
 (defn- strip-sync-prefix [path]
   (if (string/starts-with? path "/sync/")
@@ -508,9 +550,13 @@
                         (log/error :db-sync/ws-error e)
                         (js/console.error e)
                         (send! ws {:type "error" :message "server error"}))))
-  (webSocketClose [_this _ws _code _reason]
+  (webSocketClose [this ws _code _reason]
+                  (remove-presence! this ws)
+                  (broadcast-online-users! this)
                   (log/info :db-sync/ws-closed true))
-  (webSocketError [_this _ws error]
+  (webSocketError [this ws error]
+                  (remove-presence! this ws)
+                  (broadcast-online-users! this)
                   (log/error :db-sync/ws-error {:error error})))
 
 (defn- index-db [^js self]

+ 4 - 0
docs/agent-guide/db-sync/protocol.md

@@ -19,6 +19,8 @@
 ## Server -> Client
 - `{"type":"hello","t":<t>}`
   - Server hello with current t.
+- `{"type":"online-users","online-users":[{"user_id":"...","email":"...","username":"...","name":"..."}...]}`
+  - Presence update with currently online users (fields may be omitted).
 - `{"type":"pull/ok","t":<t>,"txs":[{"t":<t>,"tx":"<tx-transit>"}...]}`
   - Pull response with txs.
 - `{"type":"tx/batch/ok","t":<t>}`
@@ -52,6 +54,8 @@
   - Create graph. Body: `{"graph_name":"...","schema_version":"<major>"}` (schema_version optional). Response: `{"graph_id":"..."}`.
 - `GET /graphs/:graph-id/access`
   - Access check. Response: `{"ok":true}`, `401` (unauthorized), `403` (forbidden), or `404` (not found).
+- `GET /graphs/:graph-id/members`
+  - Graph members list. Response: `{"members":[{user_id, graph_id, role, invited_by, created_at, email?, username?}...]}`.
 - `DELETE /graphs/:graph-id`
   - Delete graph and reset data. Response: `{"graph_id":"...","deleted":true}` or `400` (missing graph id).
 

+ 0 - 14
src/main/frontend/components/header.cljs

@@ -66,23 +66,9 @@
                    state)}
   [state]
   (let [rtc-graph-id (ldb/get-graph-rtc-uuid (db/get-db))
-        repo (state/get-current-repo)
         online-users @(::online-users state)]
     (when rtc-graph-id
       [:div.rtc-collaborators.flex.gap-1.text-sm.bg-gray-01.items-center
-       (when config/db-sync-enabled?
-         (shui/button-ghost-icon
-          :terminal
-          {:title "DB Sync state"
-           :on-click (fn [e]
-                       (p/let [client-state (state/<invoke-db-worker :thread-api/db-sync-state repo)]
-                         (let [state-label (if client-state (name client-state) "unknown")]
-                           (shui/popup-show!
-                            (.-target e)
-                            [:div.p-2.text-sm
-                             [:div.font-medium "DB Sync"]
-                             [:div.mt-1 (str "State: " state-label)]]
-                            {:align :start}))))}))
        (shui/button-ghost-icon :user-plus
                                {:on-click #(shui/dialog-open!
                                             (fn []

+ 19 - 1
src/main/frontend/handler/db_based/db_sync.cljs

@@ -104,7 +104,25 @@
 
 (defn <rtc-get-users-info
   []
-  (p/resolved nil))
+  (when-let [graph-uuid (ldb/get-graph-rtc-uuid (db/get-db))]
+    (let [base (http-base)
+          repo (state/get-current-repo)]
+      (if base
+        (p/let [_ (js/Promise. user-handler/task--ensure-id&access-token)
+                resp (fetch-json (str base "/graphs/" graph-uuid "/members")
+                                 {:method "GET"}
+                                 {:response-schema :graph-members/list})
+                members (:members resp)
+                users (mapv (fn [{:keys [user_id role email username]}]
+                              (let [name (or username email user_id)
+                                    user-type (some-> role keyword)]
+                                (cond-> {:user/uuid user_id
+                                         :user/name name
+                                         :graph<->user/user-type user-type}
+                                  (string? email) (assoc :user/email email))))
+                            members)]
+          (state/set-state! :rtc/users-info {repo users}))
+        (p/resolved nil)))))
 
 (defn <rtc-create-graph!
   [repo]

+ 48 - 9
src/main/frontend/worker/db_sync.cljs

@@ -5,6 +5,7 @@
             [datascript.core :as d]
             [frontend.worker.handler.page :as worker-page]
             [frontend.worker.rtc.client-op :as client-op]
+            [frontend.worker.shared-service :as shared-service]
             [frontend.worker.state :as worker-state]
             [lambdaisland.glogi :as log]
             [logseq.common.path :as path]
@@ -21,6 +22,40 @@
 
 (defonce *repo->latest-remote-tx (atom {}))
 
+(defn- normalize-online-users
+  [users]
+  (->> users
+       (keep (fn [{:keys [user_id email username name]}]
+               (when (string? user_id)
+                 (let [display-name (or username name user_id)]
+                   (cond-> {:user/uuid user_id
+                            :user/name display-name}
+                     (string? email) (assoc :user/email email))))))
+       (vec)))
+
+(defn- broadcast-rtc-state!
+  [client]
+  (when client
+    (let [ws-state @(:ws-state client)
+          online-users @(:online-users client)]
+      (shared-service/broadcast-to-clients!
+       :rtc-sync-state
+       {:rtc-state {:ws-state ws-state}
+        :rtc-lock (= :open ws-state)
+        :online-users (or online-users [])}))))
+
+(defn- set-ws-state!
+  [client ws-state]
+  (when-let [*ws-state (:ws-state client)]
+    (reset! *ws-state ws-state)
+    (broadcast-rtc-state! client)))
+
+(defn- update-online-users!
+  [client users]
+  (when-let [*online-users (:online-users client)]
+    (reset! *online-users (normalize-online-users users))
+    (broadcast-rtc-state! client)))
+
 (defn- enabled?
   []
   (true? (:enabled? @worker-state/*db-sync-config)))
@@ -341,7 +376,9 @@
                 :send-queue (atom (p/resolved nil))
                 :asset-queue (atom (p/resolved nil))
                 :inflight (atom [])
-                :reconnect (atom {:attempt 0 :timer nil})}]
+                :reconnect (atom {:attempt 0 :timer nil})
+                :online-users (atom [])
+                :ws-state (atom :closed)}]
     (reset! worker-state/*db-sync-client client)
     client))
 
@@ -709,6 +746,11 @@
                   (enqueue-asset-sync! repo client)
                   (enqueue-asset-initial-download! repo client)
                   (flush-pending! repo client))
+        "online-users" (let [users (:online-users message)]
+                         (when (and (some? users) (not (sequential? users)))
+                           (fail-fast :db-sync/invalid-field
+                                      {:repo repo :type "online-users" :field :online-users}))
+                         (update-online-users! client (or users [])))
         ;; Upload response
         "tx/batch/ok" (do
                         (require-non-negative remote-tx {:repo repo :type "tx/batch/ok"})
@@ -783,6 +825,8 @@
   (set! (.-onclose ws)
         (fn [_]
           (log/info :db-sync/ws-closed {:repo repo})
+          (update-online-users! client [])
+          (set-ws-state! client :closed)
           (schedule-reconnect! repo client url :close))))
 
 (defn- detach-ws-handlers! [ws]
@@ -799,6 +843,8 @@
     (clear-reconnect-timer! reconnect))
   (when-let [ws (:ws client)]
     (detach-ws-handlers! ws)
+    (update-online-users! client [])
+    (set-ws-state! client :closed)
     (try
       (.close ws)
       (catch :default _
@@ -813,6 +859,7 @@
     (set! (.-onopen ws)
           (fn [_]
             (reset-reconnect! updated)
+            (set-ws-state! updated :open)
             (send! ws {:type "hello" :client repo})
             (enqueue-asset-sync! repo updated)
             (enqueue-asset-initial-download! repo updated)))
@@ -930,11 +977,3 @@
                       (p/recur max-addr false))))))))
         (p/rejected (ex-info "db-sync missing sqlite db"
                              {:repo repo :graph-id graph-id}))))))
-
-(defn get-client-state
-  [repo]
-  (if-let [client @worker-state/*db-sync-client]
-    (if (= repo (:repo client))
-      (ws-open? (:ws client))
-      :not-started)
-    :not-started))

+ 1 - 5
src/main/frontend/worker/db_worker.cljs

@@ -16,6 +16,7 @@
             [frontend.worker-common.util :as worker-util]
             [frontend.worker.db-listener :as db-listener]
             [frontend.worker.db-metadata :as worker-db-metadata]
+            [frontend.worker.db-sync :as db-sync]
             [frontend.worker.db.fix :as db-fix]
             [frontend.worker.db.migrate :as db-migrate]
             [frontend.worker.db.validate :as worker-db-validate]
@@ -34,7 +35,6 @@
             [frontend.worker.shared-service :as shared-service]
             [frontend.worker.state :as worker-state]
             [frontend.worker.thread-atom]
-            [frontend.worker.db-sync :as db-sync]
             [goog.object :as gobj]
             [lambdaisland.glogi :as log]
             [lambdaisland.glogi.console :as glogi-console]
@@ -429,10 +429,6 @@
   [repo]
   (db-sync/upload-graph! repo))
 
-(def-thread-api :thread-api/db-sync-state
-  [repo]
-  (db-sync/get-client-state repo))
-
 (def-thread-api :thread-api/set-infer-worker-proxy
   [infer-worker-proxy]
   (reset! worker-state/*infer-worker infer-worker-proxy)