core.cljs 18 KB


  1. (ns frontend.worker.rtc.core
  2. "Main(use missionary) ns for rtc related fns"
  3. (:require [frontend.common.missionary-util :as c.m]
  4. [frontend.worker.device :as worker-device]
  5. [frontend.worker.rtc.asset :as r.asset]
  6. [frontend.worker.rtc.client :as r.client]
  7. [frontend.worker.rtc.client-op :as client-op]
  8. [frontend.worker.rtc.exception :as r.ex]
  9. [frontend.worker.rtc.full-upload-download-graph :as r.upload-download]
  10. [frontend.worker.rtc.log-and-state :as rtc-log-and-state]
  11. [frontend.worker.rtc.remote-update :as r.remote-update]
  12. [frontend.worker.rtc.skeleton]
  13. [frontend.worker.rtc.ws :as ws]
  14. [frontend.worker.rtc.ws-util :as ws-util :refer [gen-get-ws-create-map--memoized]]
  15. [frontend.worker.state :as worker-state]
  16. [frontend.worker.util :as worker-util]
  17. [logseq.common.config :as common-config]
  18. [logseq.db :as ldb]
  19. [malli.core :as ma]
  20. [missionary.core :as m])
  21. (:import [missionary Cancelled]))
  ;; Malli schema of the rtc-state map: `:ws-state` is optional and, when present,
  ;; must be one of the four enumerated keywords.
  (def ^:private rtc-state-schema
    [:map
     [:ws-state {:optional true}
      [:enum :connecting :open :closing :closed]]])

  ;; Predicate built once from `rtc-state-schema`.
  (def ^:private rtc-state-validator
    (ma/validator rtc-state-schema))

  ;; Unique JS object; compared with `identical?` to signal "no value".
  (def ^:private sentinel
    (js-obj))
  (defn- get-remote-updates
    "Return a flow of messages received from the ws, keeping only those whose
    :req-id is one of:
    - `push-updates`
    - `online-users-updated`
    - `push-asset-upload-updates`
    When receiving throws a js/CloseEvent, a ws is requested again from
    GET-WS-CREATE-TASK and receiving resumes."
    [get-ws-create-task]
    (let [pushed-req-ids #{"online-users-updated"
                           "push-updates"
                           "push-asset-upload-updates"}
          pushed-msg? (fn [msg] (contains? pushed-req-ids (:req-id msg)))]
      (m/ap
        (loop []
          (let [ws (m/? get-ws-create-task)
                msg (try
                      (m/?> (m/eduction (filter pushed-msg?) (ws/recv-flow ws)))
                      ;; the sentinel tells the loop below to start over
                      (catch js/CloseEvent _
                        sentinel))]
            (if-not (identical? msg sentinel)
              msg
              (recur)))))))
  (defn- create-local-updates-check-flow
    "Return a flow that emits the clock value whenever local updates should be pushed:
    auto-push is on (truthy value in *AUTO-PUSH?) and REPO has a positive count of
    unpushed block ops. The clock is created with INTERVAL-MS."
    [repo *auto-push? interval-ms]
    (let [keep-when-unpushed-ops (fn [tick]
                                   (when (pos? (client-op/get-unpushed-block-ops-count repo))
                                     tick))
          ;; input items are [auto-push? tick] pairs
          xf (comp (filter first)
                   (map second)
                   (filter keep-when-unpushed-ops))]
      (m/eduction xf
                  (m/latest vector
                            (m/watch *auto-push?)
                            (c.m/clock interval-ms :clock)))))
  (defn- create-pull-remote-updates-flow
    "Return a flow of `{:type :pull-remote-updates}` events.
    One event is emitted right away; after that one is emitted every INTERVAL-MS,
    and the timer is rescheduled each time FLOW emits a value."
    [interval-ms flow]
    (let [pull-event {:type :pull-remote-updates}
          ticker (m/ap
                   (loop []
                     (m/amb
                      (m/? (m/sleep interval-ms pull-event))
                      (recur))))]
      (m/ap
        (m/amb
         pull-event
         (do
           ;; seeded with nil so the ticker runs even before FLOW emits;
           ;; every later emission switches to a fresh ticker
           (m/?< (m/latest identity (m/reductions {} nil flow)))
           (try
             (m/?< ticker)
             (catch Cancelled _ (m/amb))))))))
  (defn- create-mixed-flow
    "Return a flow that emits every kind of event handled by the rtc-loop:
    `:remote-update`: remote-updates data from server
    `:remote-asset-update`: remote asset-updates from server
    `:local-update-check`: notification to check for new local-updates and push them to remote
    `:online-users-updated`: online users info updated
    `:pull-remote-updates`: pull remote updates"
    [repo get-ws-create-task *auto-push?]
    (let [->remote-event (fn [msg]
                           {:type (case (:req-id msg)
                                    "push-updates" :remote-update
                                    "online-users-updated" :online-users-updated
                                    "push-asset-upload-updates" :remote-asset-update)
                            :value msg})
          ->local-check-event (fn [tick]
                                {:type :local-update-check :value tick})
          remote-events (m/eduction (map ->remote-event)
                                    (get-remote-updates get-ws-create-task))
          local-check-events (m/eduction (map ->local-check-event)
                                         (create-local-updates-check-flow repo *auto-push? 2000))
          ;; wrapped in m/stream because it is consumed twice below
          shared-events (m/stream (c.m/mix remote-events local-check-events))]
      (c.m/mix shared-events
               (create-pull-remote-updates-flow 60000 shared-events))))
  (defn- create-ws-state-flow
    "Return a relieved flow of ws states: follows the ws currently held in *CURRENT-WS
    and emits the values of `ws/create-mws-state-flow` for it. Emits nothing while
    there is no ws."
    [*current-ws]
    (m/relieve
     (m/ap
       (let [current-ws (m/?< (m/watch *current-ws))]
         (try
           (if-not current-ws
             (m/amb)
             (m/?< (ws/create-mws-state-flow current-ws)))
           ;; the branch for a replaced ws is cancelled; emit nothing for it
           (catch Cancelled _
             (m/amb)))))))
  (defn- create-rtc-state-flow
    "Return a flow of rtc-state maps built from WS-STATE-FLOW.
    The first map is `{}` (the ws-state is seeded with nil); later maps carry
    `:ws-state` when the ws-state is truthy. Each map is checked by `rtc-state-validator`."
    [ws-state-flow]
    (let [->rtc-state (fn [ws-state]
                        {:post [(rtc-state-validator %)]}
                        (if ws-state
                          {:ws-state ws-state}
                          {}))]
      (m/latest ->rtc-state (m/reductions {} nil ws-state-flow))))
  (defonce ^:private *rtc-lock (atom nil))

  (defn- holding-rtc-lock
    "Wrap TASK so that only one rtc-loop runs at a time.
    rtc-loop-task is stateless, but conn is not: no two concurrent rtc-loop-tasks
    may modify `conn` at the same time.
    When the lock is already taken, the lock-failed exception is both assigned to
    STARTED-DFV and thrown. Otherwise TASK runs and the lock is released when it ends."
    [started-dfv task]
    (m/sp
      (if (compare-and-set! *rtc-lock nil true)
        (try
          (m/? task)
          (finally
            (reset! *rtc-lock nil)))
        (let [lock-failed (ex-info "Must not run multiple rtc-loops, try later"
                                   {:type :rtc.exception/lock-failed
                                    :missionary/retry true})]
          (started-dfv lock-failed)
          (throw lock-failed)))))
  (defn- create-rtc-loop
    "Return a map with
    [:rtc-state-flow :*rtc-auto-push? :*online-users :onstarted-task :rtc-loop-task].

    - `:onstarted-task` is a dataflow variable. It receives `true` once the initial ws
      is open, the exception if the loop fails, or `:final` if the loop ends before
      either happened. Only the first assigned value is kept, so awaiting it never hangs.
    - `:rtc-loop-task` is guarded by `holding-rtc-lock`; it handles every event of the
      mixed flow until it fails or is cancelled.

    Options: `:auto-push?` (default true) and `:debug-ws-url` (used instead of the url
    derived from TOKEN).
    TODO: auto refresh token if needed"
    [graph-uuid repo conn date-formatter token
     & {:keys [auto-push? debug-ws-url] :or {auto-push? true}}]
    (let [ws-url (or debug-ws-url (ws-util/get-ws-url token))
          *auto-push? (atom auto-push?)
          *last-calibrate-t (atom nil)
          *online-users (atom nil)
          *assets-sync-loop-canceler (atom nil)
          started-dfv (m/dfv)
          add-log-fn (fn [type message]
                       (assert (map? message) message)
                       (rtc-log-and-state/rtc-log type (assoc message :graph-uuid graph-uuid)))
          {:keys [*current-ws get-ws-create-task]}
          (gen-get-ws-create-map--memoized ws-url)
          get-ws-create-task (r.client/ensure-register-graph-updates
                              get-ws-create-task graph-uuid repo conn *last-calibrate-t *online-users)
          {:keys [assets-sync-loop-task]}
          (r.asset/create-assets-sync-loop repo get-ws-create-task graph-uuid conn *auto-push?)
          mixed-flow (create-mixed-flow repo get-ws-create-task *auto-push?)]
      (assert (some? *current-ws))
      {:rtc-state-flow (create-rtc-state-flow (create-ws-state-flow *current-ws))
       :*rtc-auto-push? *auto-push?
       :*online-users *online-users
       :onstarted-task started-dfv
       :rtc-loop-task
       (holding-rtc-lock
        started-dfv
        (m/sp
          (try
            ;; init run to open a ws
            (m/? get-ws-create-task)
            (started-dfv true)
            (reset! *assets-sync-loop-canceler
                    (c.m/run-task assets-sync-loop-task :assets-sync-loop-task))
            (->>
             (let [event (m/?> mixed-flow)]
               (case (:type event)
                 :remote-update
                 ;; errors other than need-pull-remote-data are deliberately ignored here
                 (try (r.remote-update/apply-remote-update graph-uuid repo conn date-formatter event add-log-fn)
                      (catch :default e
                        (when (= ::r.remote-update/need-pull-remote-data (:type (ex-data e)))
                          (m/? (r.client/new-task--pull-remote-data
                                repo conn graph-uuid date-formatter get-ws-create-task add-log-fn)))))
                 :remote-asset-update
                 (m/? (r.asset/new-task--emit-remote-asset-updates-from-push-asset-upload-updates
                       repo @conn (:value event)))
                 :local-update-check
                 (m/? (r.client/new-task--push-local-ops
                       repo conn graph-uuid date-formatter
                       get-ws-create-task add-log-fn))
                 :online-users-updated
                 (reset! *online-users (:online-users (:value event)))
                 :pull-remote-updates
                 (m/? (r.client/new-task--pull-remote-data
                       repo conn graph-uuid date-formatter get-ws-create-task add-log-fn))))
             (m/ap)
             (m/reduce {} nil)
             (m/?))
            (catch Cancelled e
              (add-log-fn :rtc.log/cancelled {})
              (throw e))
            (catch :default e
              ;; hand the failure to whoever awaits :onstarted-task
              ;; (ignored when the loop had already started), then propagate it
              (started-dfv e)
              (throw e))
            (finally
              ;; make sure :onstarted-task always resolves, e.g. when cancelled before
              ;; the ws opened; ignored if a value was assigned earlier
              (started-dfv :final)
              (when @*assets-sync-loop-canceler (@*assets-sync-loop-canceler))))))}))
  ;; Metadata of "no rtc-loop running". The keys mirror the map stored by
  ;; `new-task--rtc-start`, including `:repo`.
  (def ^:private empty-rtc-loop-metadata
    {:repo nil
     :graph-uuid nil
     :user-uuid nil
     :rtc-state-flow nil
     :*rtc-auto-push? nil
     :*online-users nil
     :*rtc-lock nil
     :canceler nil})

  ;; Metadata of the current rtc-loop; reset to `empty-rtc-loop-metadata` by `rtc-stop`.
  (defonce ^:private *rtc-loop-metadata (atom empty-rtc-loop-metadata))
  210. ;;; ================ API ================
  (defn new-task--rtc-start
    "Return a task that starts the rtc-loop for REPO.

    The task resolves to nil once the loop is started and its metadata is stored in
    `*rtc-loop-metadata`. Otherwise it resolves to the `r.ex/->map` form of the failure:
    - the loop reported an ExceptionInfo on start (e.g. the rtc-lock is already held)
    - REPO's graph has no rtc graph-uuid
    - no db-conn is found for REPO"
    [repo token]
    (m/sp
      ;; ensure device metadata existing first
      (m/? (worker-device/new-task--ensure-device-metadata! token))
      (if-let [conn (worker-state/get-datascript-conn repo)]
        (if-let [graph-uuid (ldb/get-graph-rtc-uuid @conn)]
          (let [user-uuid (:sub (worker-util/parse-jwt token))
                config (worker-state/get-config repo)
                date-formatter (common-config/get-date-formatter config)
                {:keys [rtc-state-flow *rtc-auto-push? rtc-loop-task *online-users onstarted-task]}
                (create-rtc-loop graph-uuid repo conn date-formatter token)
                canceler (c.m/run-task rtc-loop-task :rtc-loop-task)
                start-result (m/? onstarted-task)]
            ;; onstarted-task yields the exception object itself on failure.
            ;; A keyword lookup on an ExceptionInfo is always nil, so test the type.
            (if (instance? ExceptionInfo start-result)
              (r.ex/->map start-result)
              (do (reset! *rtc-loop-metadata {:repo repo
                                              :graph-uuid graph-uuid
                                              :user-uuid user-uuid
                                              :rtc-state-flow rtc-state-flow
                                              :*rtc-auto-push? *rtc-auto-push?
                                              :*online-users *online-users
                                              :*rtc-lock *rtc-lock
                                              :canceler canceler})
                  nil)))
          (r.ex/->map r.ex/ex-local-not-rtc-graph))
        (r.ex/->map (ex-info "Not found db-conn" {:type :rtc.exception/not-found-db-conn
                                                  :repo repo})))))
  (defn rtc-stop
    "Cancel the running rtc-loop, if any, and reset `*rtc-loop-metadata` to its empty value.
    Does nothing when no canceler is stored."
    []
    (let [cancel-loop (:canceler @*rtc-loop-metadata)]
      (when cancel-loop
        (cancel-loop)
        (reset! *rtc-loop-metadata empty-rtc-loop-metadata))))
  (defn rtc-toggle-auto-push
    "Flip the auto-push flag of the current rtc-loop and return the new value.
    Returns nil when no flag atom is stored in `*rtc-loop-metadata`."
    []
    (let [*flag (:*rtc-auto-push? @*rtc-loop-metadata)]
      (when *flag
        (swap! *flag not))))
  (defn new-task--get-graphs
    "Return a task that sends a \"list-graphs\" request and yields the `:graphs`
    value of the response."
    [token]
    (let [ws-task (-> token
                      ws-util/get-ws-url
                      gen-get-ws-create-map--memoized
                      :get-ws-create-task)
          request {:action "list-graphs"}]
      (m/join :graphs (ws-util/send&recv ws-task request))))
  (defn new-task--delete-graph
    "Return a task that yields true when the \"delete-graph\" request succeeds,
    i.e. when the response carries no `:ex-data`. A failure is printed."
    [token graph-uuid]
    (let [ws-task (-> token
                      ws-util/get-ws-url
                      gen-get-ws-create-map--memoized
                      :get-ws-create-task)
          request {:action "delete-graph" :graph-uuid graph-uuid}]
      (m/sp
        (let [response (m/? (ws-util/send&recv ws-task request))
              failure (:ex-data response)]
          (when failure
            (prn ::delete-graph-failed graph-uuid failure))
          (nil? failure)))))
  (defn new-task--get-user-info
    "Return a task that sends a \"get-users-info\" request for GRAPH-UUID and yields
    the `:users` value of the response."
    [token graph-uuid]
    (let [ws-task (-> token
                      ws-util/get-ws-url
                      gen-get-ws-create-map--memoized
                      :get-ws-create-task)
          request {:action "get-users-info" :graph-uuid graph-uuid}]
      (m/join :users (ws-util/send&recv ws-task request))))
  (defn new-task--grant-access-to-others
    "Return a task that sends a \"grant-access\" request for GRAPH-UUID.
    `:target-user-uuids` and `:target-user-emails` are added to the request only
    when they are truthy."
    [token graph-uuid & {:keys [target-user-uuids target-user-emails]}]
    (let [ws-task (-> token
                      ws-util/get-ws-url
                      gen-get-ws-create-map--memoized
                      :get-ws-create-task)
          optional-targets (cond-> {}
                             target-user-uuids (assoc :target-user-uuids target-user-uuids)
                             target-user-emails (assoc :target-user-emails target-user-emails))
          request (merge {:action "grant-access"
                          :graph-uuid graph-uuid}
                         optional-targets)]
      (ws-util/send&recv ws-task request)))
  (defn new-task--get-block-content-versions
    "Return a task that sends a \"query-block-content-versions\" request for
    BLOCK-UUID in GRAPH-UUID and yields the `:versions` value of the response."
    [token graph-uuid block-uuid]
    (let [ws-task (-> token
                      ws-util/get-ws-url
                      gen-get-ws-create-map--memoized
                      :get-ws-create-task)
          request {:action "query-block-content-versions"
                   :block-uuids [block-uuid]
                   :graph-uuid graph-uuid}]
      (m/join :versions (ws-util/send&recv ws-task request))))
  ;; Flow of rtc state maps derived from `*rtc-loop-metadata`.
  ;; While the metadata has repo, rtc-state-flow, auto-push atom and lock, it emits the
  ;; latest combination of rtc-state, auto-push?, lock, online users, pending ops count,
  ;; local-tx and remote-tx. Otherwise it emits nil.
  (def ^:private create-get-state-flow*
    (let [rtc-loop-metadata-flow (m/watch *rtc-loop-metadata)]
      (m/ap
        (let [{rtc-lock :*rtc-lock :keys [repo graph-uuid user-uuid rtc-state-flow *rtc-auto-push? *online-users]}
              (m/?< rtc-loop-metadata-flow)]
          (try
            (when (and repo rtc-state-flow *rtc-auto-push? rtc-lock)
              (m/?<
               (m/latest
                (fn [rtc-state rtc-auto-push? rtc-lock online-users pending-local-ops-count local-tx remote-tx]
                  {:graph-uuid graph-uuid
                   :user-uuid user-uuid
                   :unpushed-block-update-count pending-local-ops-count
                   :local-tx local-tx
                   :remote-tx remote-tx
                   :rtc-state rtc-state
                   :rtc-lock rtc-lock
                   :auto-push? rtc-auto-push?
                   :online-users online-users})
                rtc-state-flow (m/watch *rtc-auto-push?) (m/watch rtc-lock) (m/watch *online-users)
                (client-op/create-pending-block-ops-count-flow repo)
                (rtc-log-and-state/create-local-t-flow graph-uuid)
                (rtc-log-and-state/create-remote-t-flow graph-uuid))))
            ;; The branch is cancelled when the metadata changes. Emit nothing instead
            ;; of a spurious nil, as the other flows in this ns do.
            (catch Cancelled _ (m/amb)))))))

  ;; Throttled (300) version of the state flow, used by the subscribers.
  (def ^:private create-get-state-flow (c.m/throttle 300 create-get-state-flow*))
  (defn new-task--get-debug-state
    "Return a task that yields the first value emitted by `create-get-state-flow`."
    []
    (->> create-get-state-flow
         (m/eduction (take 1))
         ;; `{}` as reducing fn returns its second argument, i.e. the latest value
         (m/reduce {} nil)))
  (defn new-task--snapshot-graph
    "Return a task that sends a \"snapshot-graph\" request for GRAPH-UUID and yields
    the response restricted to `:snapshot-uuid` and `:graph-uuid`."
    [token graph-uuid]
    (let [ws-task (-> token
                      ws-util/get-ws-url
                      gen-get-ws-create-map--memoized
                      :get-ws-create-task)
          request {:action "snapshot-graph"
                   :graph-uuid graph-uuid}
          pick-ids (fn [response]
                     (select-keys response [:snapshot-uuid :graph-uuid]))]
      (m/join pick-ids (ws-util/send&recv ws-task request))))
  (defn new-task--snapshot-list
    "Return a task that sends a \"snapshot-list\" request for GRAPH-UUID and yields
    the `:snapshot-list` value of the response."
    [token graph-uuid]
    (let [ws-task (-> token
                      ws-util/get-ws-url
                      gen-get-ws-create-map--memoized
                      :get-ws-create-task)
          request {:action "snapshot-list"
                   :graph-uuid graph-uuid}]
      (m/join :snapshot-list (ws-util/send&recv ws-task request))))
  (defn new-task--upload-graph
    "Return a task that uploads REPO as REMOTE-GRAPH-NAME via
    `r.upload-download/new-task--upload-graph`.
    Yields the not-found-db-conn exception map when REPO has no db-conn."
    [token repo remote-graph-name]
    (m/sp
      (let [conn (worker-state/get-datascript-conn repo)]
        (if-not conn
          (r.ex/->map (ex-info "Not found db-conn" {:type :rtc.exception/not-found-db-conn
                                                    :repo repo}))
          (let [ws-task (-> token
                            ws-util/get-ws-url
                            gen-get-ws-create-map--memoized
                            :get-ws-create-task)]
            (m/? (r.upload-download/new-task--upload-graph ws-task repo conn remote-graph-name)))))))
  (defn new-task--request-download-graph
    "Delegate to `r.upload-download/new-task--request-download-graph` for GRAPH-UUID,
    using the ws-create task derived from TOKEN."
    [token graph-uuid]
    (-> token
        ws-util/get-ws-url
        gen-get-ws-create-map--memoized
        :get-ws-create-task
        (r.upload-download/new-task--request-download-graph graph-uuid)))
  (defn new-task--download-info-list
    "Delegate to `r.upload-download/new-task--download-info-list` for GRAPH-UUID,
    using the ws-create task derived from TOKEN."
    [token graph-uuid]
    (-> token
        ws-util/get-ws-url
        gen-get-ws-create-map--memoized
        :get-ws-create-task
        (r.upload-download/new-task--download-info-list graph-uuid)))
  (defn new-task--wait-download-info-ready
    "Delegate to `r.upload-download/new-task--wait-download-info-ready` with
    DOWNLOAD-INFO-UUID, GRAPH-UUID and TIMEOUT-MS, using the ws-create task derived
    from TOKEN."
    [token download-info-uuid graph-uuid timeout-ms]
    (-> token
        ws-util/get-ws-url
        gen-get-ws-create-map--memoized
        :get-ws-create-task
        (r.upload-download/new-task--wait-download-info-ready
         download-info-uuid graph-uuid timeout-ms)))
  ;; Alias: exposes the upload-download implementation under this namespace.
  (def new-task--download-graph-from-s3
    r.upload-download/new-task--download-graph-from-s3)
  348. ;;; ================ API (ends) ================
  349. ;;; subscribe state ;;;
  (defonce ^:private *last-subscribe-canceler (atom nil))

  (defn- subscribe-state
    "Post every value of `create-get-state-flow` as a `:rtc-sync-state` message.
    A previous subscription, if any, is cancelled first. Returns nil."
    []
    (let [previous-cancel @*last-subscribe-canceler]
      (when previous-cancel
        (previous-cancel)
        (reset! *last-subscribe-canceler nil)))
    (let [post-state (fn [_ state]
                       (worker-util/post-message :rtc-sync-state state))]
      (->> (c.m/run-task (m/reduce post-state create-get-state-flow)
                         :subscribe-state)
           (reset! *last-subscribe-canceler)))
    nil)

  ;; start the subscription when this namespace is loaded
  (subscribe-state)
  ;; REPL scratch, never evaluated at load time.
  (comment
    ;; start an rtc-loop against a debug ws url
    (do
      (def user-uuid "7f41990d-2c8f-4f79-b231-88e9f652e072")
      (def graph-uuid "ff7186c1-5903-4bc8-b4e9-ca23525b9983")
      (def repo "logseq_db_4-23")
      (def conn (worker-state/get-datascript-conn repo))
      (def date-formatter "MMM do, yyyy")
      (def debug-ws-url "wss://ws-dev.logseq.com/rtc-sync?token=???")
      (let [{:keys [rtc-state-flow *rtc-auto-push? rtc-loop-task]}
            ;; create-rtc-loop takes [graph-uuid repo conn date-formatter token & opts];
            ;; the token is nil because :debug-ws-url is used instead
            (create-rtc-loop graph-uuid repo conn date-formatter nil {:debug-ws-url debug-ws-url})
            c (c.m/run-task rtc-loop-task :rtc-loop-task)]
        (def cancel c)
        (def rtc-state-flow rtc-state-flow)
        (def *rtc-auto-push? *rtc-auto-push?)))
    (cancel)

    ;; observe create-pull-remote-updates-flow driven by an atom
    (do
      (def a (atom 1))
      (def f1 (m/watch a))
      (def f2 (create-pull-remote-updates-flow 5000 f1))
      (def cancel (c.m/run-task (m/reduce (fn [_ v] (prn :v v)) f2) :xxx)))

    ;; emit each delay value after sleeping for that many ms
    (defn sleep-emit [delays]
      (m/ap (let [n (m/?> (m/seed delays))
                  r (m/? (m/sleep n n))]
              ;; js/Date. is used because no time library is required by this ns
              (prn :xxx r (js/Date.))
              r)))

    (def cancel
      ((->> (m/sample vector
                      (m/latest identity (m/reductions {} 0 (sleep-emit [1000 1 2])))
                      (sleep-emit [2000 3000 1000]))
            (m/reduce (fn [_ v] (prn :v v)))) prn prn)))