core.cljs 30 KB


  1. (ns frontend.worker.rtc.core
  2. "Main(use missionary) ns for rtc related fns"
  3. (:require [clojure.data :as data]
  4. [datascript.core :as d]
  5. [frontend.common.missionary :as c.m]
  6. [frontend.common.thread-api :refer [def-thread-api]]
  7. [frontend.worker-common.util :as worker-util]
  8. [frontend.worker.device :as worker-device]
  9. [frontend.worker.rtc.asset :as r.asset]
  10. [frontend.worker.rtc.branch-graph :as r.branch-graph]
  11. [frontend.worker.rtc.client :as r.client]
  12. [frontend.worker.rtc.client-op :as client-op]
  13. [frontend.worker.rtc.db :as rtc-db]
  14. [frontend.worker.rtc.exception :as r.ex]
  15. [frontend.worker.rtc.full-upload-download-graph :as r.upload-download]
  16. [frontend.worker.rtc.log-and-state :as rtc-log-and-state]
  17. [frontend.worker.rtc.remote-update :as r.remote-update]
  18. [frontend.worker.rtc.skeleton]
  19. [frontend.worker.rtc.ws :as ws]
  20. [frontend.worker.rtc.ws-util :as ws-util :refer [gen-get-ws-create-map--memoized]]
  21. [frontend.worker.shared-service :as shared-service]
  22. [frontend.worker.state :as worker-state]
  23. [lambdaisland.glogi :as log]
  24. [logseq.common.config :as common-config]
  25. [logseq.db :as ldb]
  26. [logseq.db.frontend.schema :as db-schema]
  27. [logseq.db.sqlite.util :as sqlite-util]
  28. [malli.core :as ma]
  29. [missionary.core :as m])
  30. (:import [missionary Cancelled]))
  31. (def ^:private rtc-state-schema
  32. [:map
  33. [:ws-state {:optional true} [:enum :connecting :open :closing :closed]]])
  34. (def ^:private rtc-state-validator (ma/validator rtc-state-schema))
  35. (def ^:private sentinel (js-obj))
  36. (defn- get-remote-updates
  37. "Return a flow: receive messages from ws,
  38. and filter messages with :req-id=
  39. - `push-updates`
  40. - `online-users-updated`.
  41. - `push-asset-block-updates`"
  42. [get-ws-create-task]
  43. (m/ap
  44. (loop []
  45. (let [ws (m/? get-ws-create-task)
  46. x (try
  47. (m/?> (m/eduction
  48. (filter (fn [data]
  49. (contains?
  50. #{"online-users-updated"
  51. "push-updates"
  52. "push-asset-block-updates"}
  53. (:req-id data))))
  54. (ws/recv-flow ws)))
  55. (catch js/CloseEvent _
  56. sentinel))]
  57. (if (identical? x sentinel)
  58. (recur)
  59. x)))))
  60. (defn- create-local-updates-check-flow
  61. "Return a flow: emit if need to push local-updates"
  62. [repo *auto-push? interval-ms]
  63. (let [auto-push-flow (m/watch *auto-push?)
  64. clock-flow (c.m/clock interval-ms :clock)
  65. merge-flow (m/latest vector auto-push-flow clock-flow)]
  66. (m/eduction (filter first)
  67. (map second)
  68. (filter (fn [v] (when (pos? (client-op/get-unpushed-ops-count repo)) v)))
  69. merge-flow)))
(defn- create-pull-remote-updates-flow
  "Return a flow that emits `{:type :pull-remote-updates}` events.
  One event is emitted immediately; afterwards the next emit is rescheduled
  (INTERVAL-MS later) every time RESCHEDULE-FLOW emits a value.
  The optional third argument is accepted but currently ignored.
  TODO: add immediate-emit-flow arg,
  e.g. when mobile-app becomes active, trigger one pull-remote-updates"
  [interval-ms reschedule-flow & [_immediate-emit-flow]]
  (let [v {:type :pull-remote-updates}
        ;; endless clock: sleep INTERVAL-MS, emit `v`, repeat
        clock-flow (m/ap
                     (loop []
                       (m/amb
                         (m/? (m/sleep interval-ms v))
                         (recur))))]
    (m/ap
      (m/amb
        ;; first branch: the immediate event
        v
        ;; second branch: `m/?<` switches to a fresh run of clock-flow
        ;; for every value coming out of reschedule-flow
        ;; NOTE(review): exact semantics of `c.m/continue-flow` are not visible here
        (let [_ (m/?< (c.m/continue-flow reschedule-flow))]
          (try
            (m/?< clock-flow)
            ;; a superseded clock run is cancelled; emit nothing for it
            (catch Cancelled _ (m/amb))))))))
  89. (defn create-inject-users-info-flow
  90. "Return a flow: emit event if need to notify the server to inject users-info to graph."
  91. [repo online-users-updated-flow]
  92. (m/ap
  93. (if-let [conn (worker-state/get-datascript-conn repo)]
  94. (if-let [online-users (seq (m/?> online-users-updated-flow))]
  95. (let [user-uuid->user (into {} (map (juxt :user/uuid identity) online-users))
  96. user-blocks (keep (fn [user-uuid] (d/entity @conn [:block/uuid user-uuid])) (keys user-uuid->user))]
  97. (if (or (not= (count user-blocks) (count user-uuid->user))
  98. (some
  99. ;; check if some attrs not equal among user-blocks and online-users
  100. (fn [user-block]
  101. (let [user (user-uuid->user (:block/uuid user-block))
  102. [diff-r1 diff-r2]
  103. (data/diff
  104. (select-keys user-block [:logseq.property.user/name :logseq.property.user/email :logseq.property.user/avatar])
  105. (update-keys
  106. (select-keys user [:user/name :user/email :user/avatar])
  107. (fn [k] (keyword "logseq.property.user" (name k)))))]
  108. (or (some? diff-r1) (some? diff-r2))))
  109. user-blocks))
  110. (m/amb {:type :inject-users-info}
  111. ;; then trigger a pull-remote-updates to update local-graph
  112. {:type :pull-remote-updates :from :x})
  113. (m/amb)))
  114. (m/amb))
  115. (m/amb))))
  116. (defn- create-mixed-flow
  117. "Return a flow that emits all kinds of events:
  118. `:remote-update`: remote-updates data from server
  119. `:remote-asset-block-update`: remote asset-updates from server
  120. `:local-update-check`: event to notify to check if there're some new local-updates, then push to remote.
  121. `:online-users-updated`: online users info updated
  122. `:pull-remote-updates`: pull remote updates
  123. `:inject-users-info`: notify server to inject users-info into the graph"
  124. [repo get-ws-create-task *auto-push? *online-users]
  125. (let [remote-updates-flow (m/eduction
  126. (map (fn [data]
  127. (case (:req-id data)
  128. "push-updates" {:type :remote-update :value data}
  129. "online-users-updated" {:type :online-users-updated :value data}
  130. "push-asset-block-updates" {:type :remote-asset-block-update :value data})))
  131. (get-remote-updates get-ws-create-task))
  132. local-updates-check-flow (m/eduction
  133. (map (fn [data] {:type :local-update-check :value data}))
  134. (create-local-updates-check-flow repo *auto-push? 2000))
  135. inject-user-info-flow (create-inject-users-info-flow repo (m/watch *online-users))
  136. mix-flow (c.m/mix remote-updates-flow local-updates-check-flow inject-user-info-flow)]
  137. (c.m/mix mix-flow (create-pull-remote-updates-flow 60000 mix-flow))))
(defn- create-ws-state-flow
  "Return a flow of websocket states for the ws currently held by the atom *CURRENT-WS.
  Whenever *CURRENT-WS changes, the flow switches to the state flow of the
  new ws; while *CURRENT-WS holds a falsy value nothing is emitted."
  [*current-ws]
  ;; NOTE(review): `m/relieve` is called with a single argument here — confirm
  ;; this arity against the missionary version in use.
  (m/relieve
    (m/ap
      (let [ws (m/?< (m/watch *current-ws))]
        (try
          (if ws
            (m/?< (ws/create-mws-state-flow ws))
            (m/amb))
          ;; the branch of a replaced ws is cancelled; emit nothing for it
          (catch Cancelled _
            (m/amb)))))))
  149. (defn- create-rtc-state-flow
  150. [ws-state-flow]
  151. (m/latest
  152. (fn [ws-state]
  153. {:post [(rtc-state-validator %)]}
  154. (cond-> {}
  155. ws-state (assoc :ws-state ws-state)))
  156. (m/reductions {} nil ws-state-flow)))
  157. (defn- update-remote-schema-version!
  158. [conn server-schema-version]
  159. (when server-schema-version
  160. (d/transact! conn [(ldb/kv :logseq.kv/remote-schema-version server-schema-version)]
  161. {:gen-undo-ops? false
  162. :persist-op? false})))
  163. (defonce ^:private *rtc-lock (atom nil))
  164. (defn- holding-rtc-lock
  165. "Use this fn to prevent multiple rtc-loops at same time.
  166. rtc-loop-task is stateless, but conn is not.
  167. we need to ensure that no two concurrent rtc-loop-tasks are modifying `conn` at the same time"
  168. [started-dfv task]
  169. (m/sp
  170. (when-not (compare-and-set! *rtc-lock nil true)
  171. (let [e (ex-info "Must not run multiple rtc-loops, try later"
  172. {:type :rtc.exception/lock-failed
  173. :missionary/retry true})]
  174. (started-dfv e)
  175. (throw e)))
  176. (try
  177. (m/? task)
  178. (finally
  179. (reset! *rtc-lock nil)))))
  180. (def ^:private *graph-uuid->*online-users (atom {}))
  181. (defn- get-or-create-*online-users
  182. [graph-uuid]
  183. (assert (uuid? graph-uuid) graph-uuid)
  184. (if-let [*online-users (get @*graph-uuid->*online-users graph-uuid)]
  185. *online-users
  186. (let [*online-users (atom nil)]
  187. (swap! *graph-uuid->*online-users assoc graph-uuid *online-users)
  188. *online-users)))
;; defined further down in this file, used by the :inject-users-info event
(declare new-task--inject-users-info)

(defn- create-rtc-loop
  "Return a map with keys
  [:rtc-state-flow :*rtc-auto-push? :*rtc-remote-profile? :*online-users
   :onstarted-task :rtc-loop-task].
  - :rtc-loop-task  task that opens the ws and then handles every event of
                    the mixed flow; it runs under `holding-rtc-lock`
  - :onstarted-task dataflow variable; receives `true` once the ws is opened,
                    the lock-failed exception, or `:final` when the loop ends
                    before either of them
  Options: `:auto-push?` (default true), `:debug-ws-url` (overrides the url
  derived from TOKEN).
  TODO: auto refresh token if needed"
  [graph-uuid schema-version repo conn date-formatter token
   & {:keys [auto-push? debug-ws-url] :or {auto-push? true}}]
  (let [major-schema-version (db-schema/major-version schema-version)
        ws-url (or debug-ws-url (ws-util/get-ws-url token))
        *auto-push? (atom auto-push?)
        *remote-profile? (atom false)
        *last-calibrate-t (atom nil)
        *online-users (get-or-create-*online-users graph-uuid)
        *assets-sync-loop-canceler (atom nil)
        ;; read after the first ws creation, see `update-remote-schema-version!` below
        ;; NOTE(review): presumably filled while registering graph updates — confirm in r.client
        *server-schema-version (atom nil)
        started-dfv (m/dfv)
        ;; every log entry gets the graph-uuid attached
        add-log-fn (fn [type message]
                     (assert (map? message) message)
                     (rtc-log-and-state/rtc-log type (assoc message :graph-uuid graph-uuid)))
        {:keys [*current-ws get-ws-create-task]}
        (gen-get-ws-create-map--memoized ws-url)
        ;; shadow the plain task with the one wrapped by r.client
        get-ws-create-task (r.client/ensure-register-graph-updates--memoized
                            get-ws-create-task graph-uuid major-schema-version
                            repo conn *last-calibrate-t *online-users *server-schema-version add-log-fn)
        {:keys [assets-sync-loop-task]}
        (r.asset/create-assets-sync-loop repo get-ws-create-task graph-uuid major-schema-version conn *auto-push?)
        mixed-flow (create-mixed-flow repo get-ws-create-task *auto-push? *online-users)]
    (assert (some? *current-ws))
    {:rtc-state-flow (create-rtc-state-flow (create-ws-state-flow *current-ws))
     :*rtc-auto-push? *auto-push?
     :*rtc-remote-profile? *remote-profile?
     :*online-users *online-users
     :onstarted-task started-dfv
     :rtc-loop-task
     (holding-rtc-lock
      started-dfv
      (m/sp
        (try
          (log/info :rtc :loop-starting)
          ;; init run to open a ws
          (m/? get-ws-create-task)
          (started-dfv true)
          (update-remote-schema-version! conn @*server-schema-version)
          ;; the assets-sync loop runs as a separate task; keep its canceler for `finally`
          (reset! *assets-sync-loop-canceler
                  (c.m/run-task :assets-sync-loop-task
                    assets-sync-loop-task))
          ;; main loop: handle the events of mixed-flow one by one until the flow ends or fails
          (->>
           (let [event (m/?> mixed-flow)]
             (case (:type event)
               (:remote-update :remote-asset-block-update)
               (try (r.remote-update/apply-remote-update graph-uuid repo conn date-formatter event add-log-fn)
                    (catch :default e
                      ;; only the need-pull-remote-data failure is handled (by pulling);
                      ;; any other exception is swallowed here
                      (when (= ::r.remote-update/need-pull-remote-data (:type (ex-data e)))
                        (m/? (r.client/new-task--pull-remote-data
                              repo conn graph-uuid major-schema-version date-formatter get-ws-create-task add-log-fn)))))
               :local-update-check
               (m/? (r.client/new-task--push-local-ops
                     repo conn graph-uuid major-schema-version date-formatter
                     get-ws-create-task *remote-profile? add-log-fn))
               :online-users-updated
               (reset! *online-users (:online-users (:value event)))
               :pull-remote-updates
               (m/? (r.client/new-task--pull-remote-data
                     repo conn graph-uuid major-schema-version date-formatter get-ws-create-task add-log-fn))
               :inject-users-info
               (m/? (new-task--inject-users-info token graph-uuid major-schema-version))))
           (m/ap)
           (m/reduce {} nil)
           (m/?))
          (catch Cancelled e
            (add-log-fn :rtc.log/cancelled {})
            (throw e))
          ;; NOTE(review): non-cancel failures are also logged as :rtc.log/cancelled,
          ;; only with the exception message and data attached
          (catch :default e
            (add-log-fn :rtc.log/cancelled {:ex-message (ex-message e) :ex-data (ex-data e)})
            (throw e))
          (finally
            (started-dfv :final) ;; ensure started-dfv can recv a value(values except the first one will be disregarded)
            (when @*assets-sync-loop-canceler (@*assets-sync-loop-canceler))))))}))
  266. (def ^:private empty-rtc-loop-metadata
  267. {:repo nil
  268. :graph-uuid nil
  269. :local-graph-schema-version nil
  270. :remote-graph-schema-version nil
  271. :user-uuid nil
  272. :rtc-state-flow nil
  273. :*rtc-auto-push? nil
  274. :*rtc-remote-profile? nil
  275. :*online-users nil
  276. :*rtc-lock nil
  277. :canceler nil
  278. :*last-stop-exception nil})
  279. (def ^:private rtc-loop-metadata-keys (set (keys empty-rtc-loop-metadata)))
  280. (defonce ^:private *rtc-loop-metadata (atom empty-rtc-loop-metadata
  281. :validator
  282. (fn [v] (= rtc-loop-metadata-keys (set (keys v))))))
  283. (defn- validate-rtc-start-conditions
  284. "Return exception if validation failed"
  285. [repo token]
  286. (if-let [conn (worker-state/get-datascript-conn repo)]
  287. (let [user-uuid (:sub (worker-util/parse-jwt token))
  288. graph-uuid (ldb/get-graph-rtc-uuid @conn)
  289. schema-version (ldb/get-graph-schema-version @conn)
  290. remote-schema-version (ldb/get-graph-remote-schema-version @conn)
  291. app-schema-version db-schema/version]
  292. (cond
  293. (not user-uuid)
  294. (ex-info "Invalid token" {:type :rtc.exception/invalid-token})
  295. (not graph-uuid)
  296. r.ex/ex-local-not-rtc-graph
  297. (not schema-version)
  298. (ex-info "Not found schema-version" {:type :rtc.exception/not-found-schema-version})
  299. (not remote-schema-version)
  300. (ex-info "Not found remote-schema-version" {:type :rtc.exception/not-found-remote-schema-version})
  301. (apply not= (map db-schema/major-version [app-schema-version remote-schema-version schema-version]))
  302. (ex-info "major schema version mismatch" {:type :rtc.exception/major-schema-version-mismatched
  303. :sub-type
  304. (r.branch-graph/compare-schemas
  305. remote-schema-version app-schema-version schema-version)
  306. :app app-schema-version
  307. :local schema-version
  308. :remote remote-schema-version})
  309. :else
  310. {:conn conn
  311. :user-uuid user-uuid
  312. :graph-uuid graph-uuid
  313. :schema-version schema-version
  314. :remote-schema-version remote-schema-version
  315. :date-formatter (common-config/get-date-formatter (worker-state/get-config repo))}))
  316. (ex-info "Not found db-conn" {:type :rtc.exception/not-found-db-conn
  317. :repo repo})))
  318. ;;; ================ API ================
  319. (defn- new-task--rtc-start*
  320. [repo token]
  321. (m/sp
  322. ;; ensure device metadata existing first
  323. (m/? (worker-device/new-task--ensure-device-metadata! token))
  324. (let [{:keys [conn user-uuid graph-uuid schema-version remote-schema-version date-formatter] :as r}
  325. (validate-rtc-start-conditions repo token)]
  326. (if (instance? ExceptionInfo r)
  327. r
  328. (let [{:keys [rtc-state-flow *rtc-auto-push? *rtc-remote-profile? rtc-loop-task *online-users onstarted-task]}
  329. (create-rtc-loop graph-uuid schema-version repo conn date-formatter token)
  330. *last-stop-exception (atom nil)
  331. canceler (c.m/run-task :rtc-loop-task
  332. rtc-loop-task
  333. :fail (fn [e]
  334. (reset! *last-stop-exception e)
  335. (log/info :rtc-loop-task e)
  336. (when (= :rtc.exception/ws-timeout (some-> e ex-data :type))
  337. ;; if fail reason is websocket-timeout, try to restart rtc
  338. (worker-state/<invoke-main-thread :thread-api/rtc-start-request repo))))
  339. start-ex (m/? onstarted-task)]
  340. (if (instance? ExceptionInfo start-ex)
  341. (do
  342. (canceler)
  343. start-ex)
  344. (do (reset! *rtc-loop-metadata {:repo repo
  345. :graph-uuid graph-uuid
  346. :local-graph-schema-version schema-version
  347. :remote-graph-schema-version remote-schema-version
  348. :user-uuid user-uuid
  349. :rtc-state-flow rtc-state-flow
  350. :*rtc-auto-push? *rtc-auto-push?
  351. :*rtc-remote-profile? *rtc-remote-profile?
  352. :*online-users *online-users
  353. :*rtc-lock *rtc-lock
  354. :canceler canceler
  355. :*last-stop-exception *last-stop-exception})
  356. nil)))))))
  357. (declare rtc-stop)
  358. (defn new-task--rtc-start
  359. [stop-before-start?]
  360. (m/sp
  361. (let [repo (worker-state/get-current-repo)
  362. token (worker-state/get-id-token)
  363. conn (worker-state/get-datascript-conn repo)]
  364. (when (and repo
  365. (sqlite-util/db-based-graph? repo)
  366. token conn)
  367. (when stop-before-start? (rtc-stop))
  368. (let [ex (m/? (new-task--rtc-start* repo token))]
  369. (when-let [ex-data* (ex-data ex)]
  370. (case (:type ex-data*)
  371. (:rtc.exception/not-rtc-graph
  372. :rtc.exception/major-schema-version-mismatched
  373. :rtc.exception/lock-failed)
  374. (log/info :rtc-start-failed ex)
  375. :rtc.exception/not-found-db-conn
  376. (log/error :rtc-start-failed ex)
  377. (log/error :BUG-unknown-error ex))
  378. (r.ex/->map ex)))))))
  379. (defn rtc-stop
  380. []
  381. (when-let [canceler (:canceler @*rtc-loop-metadata)]
  382. (canceler)
  383. (reset! *rtc-loop-metadata empty-rtc-loop-metadata)))
  384. (defn rtc-toggle-auto-push
  385. []
  386. (when-let [*auto-push? (:*rtc-auto-push? @*rtc-loop-metadata)]
  387. (swap! *auto-push? not)))
  388. (defn rtc-toggle-remote-profile
  389. []
  390. (when-let [*rtc-remote-profile? (:*rtc-remote-profile? @*rtc-loop-metadata)]
  391. (swap! *rtc-remote-profile? not)))
  392. (defn new-task--get-graphs
  393. [token]
  394. (let [{:keys [get-ws-create-task]} (gen-get-ws-create-map--memoized (ws-util/get-ws-url token))]
  395. (m/join :graphs
  396. (ws-util/send&recv get-ws-create-task {:action "list-graphs"}))))
  397. (defn new-task--delete-graph
  398. "Return a task that return true if succeed"
  399. [token graph-uuid schema-version]
  400. (let [{:keys [get-ws-create-task]} (gen-get-ws-create-map--memoized (ws-util/get-ws-url token))]
  401. (m/sp
  402. (let [{:keys [ex-data]}
  403. (m/? (ws-util/send&recv get-ws-create-task
  404. {:action "delete-graph"
  405. :graph-uuid graph-uuid
  406. :schema-version (str schema-version)}))]
  407. (if ex-data
  408. (log/info ::delete-graph-failed {:graph-uuid graph-uuid :ex-data ex-data})
  409. ;; Clean up rtc data in existing dbs so that the graph can be uploaded again
  410. (when-let [repo (worker-state/get-current-repo)]
  411. (when-let [conn (worker-state/get-datascript-conn repo)]
  412. (let [graph-id (ldb/get-graph-rtc-uuid @conn)]
  413. (when (= (str graph-id) (str graph-uuid))
  414. (rtc-db/remove-rtc-data-in-conn! repo))))))
  415. (boolean (nil? ex-data))))))
  416. (defn new-task--get-users-info
  417. "Return a task that return users-info about the graph."
  418. [token graph-uuid]
  419. (let [{:keys [get-ws-create-task]} (gen-get-ws-create-map--memoized (ws-util/get-ws-url token))]
  420. (m/join :users
  421. (ws-util/send&recv get-ws-create-task
  422. {:action "get-users-info" :graph-uuid graph-uuid}))))
  423. (defn new-task--inject-users-info
  424. [token graph-uuid major-schema-version]
  425. (let [{:keys [get-ws-create-task]} (gen-get-ws-create-map--memoized (ws-util/get-ws-url token))]
  426. (ws-util/send&recv get-ws-create-task
  427. {:action "inject-users-info"
  428. :graph-uuid graph-uuid
  429. :schema-version (str major-schema-version)})))
  430. (defn new-task--grant-access-to-others
  431. [token graph-uuid & {:keys [target-user-uuids target-user-emails]}]
  432. (let [{:keys [get-ws-create-task]} (gen-get-ws-create-map--memoized (ws-util/get-ws-url token))]
  433. (ws-util/send&recv get-ws-create-task
  434. (cond-> {:action "grant-access"
  435. :graph-uuid graph-uuid}
  436. target-user-uuids (assoc :target-user-uuids target-user-uuids)
  437. target-user-emails (assoc :target-user-emails target-user-emails)))))
  438. (defn new-task--get-block-content-versions
  439. "Return a task that return map [:ex-data :ex-message :versions]"
  440. [token graph-uuid block-uuid]
  441. (let [{:keys [get-ws-create-task]} (gen-get-ws-create-map--memoized (ws-util/get-ws-url token))]
  442. (m/join :versions (ws-util/send&recv get-ws-create-task
  443. {:action "query-block-content-versions"
  444. :block-uuids [block-uuid]
  445. :graph-uuid graph-uuid}))))
;; Flow of rtc state snapshots (despite the `create-` prefix this is a flow value,
;; not a function). It follows `*rtc-loop-metadata`: for every metadata value that
;; describes a running loop it switches to a flow combining the loop's state, flags,
;; lock, online users, pending op counts and local/remote tx into one map.
;; For metadata without a running loop the `when` yields nil.
(def ^:private create-get-state-flow*
  (let [rtc-loop-metadata-flow (m/watch *rtc-loop-metadata)]
    (m/ap
      (let [{*rtc-lock' :*rtc-lock
             :keys [repo graph-uuid local-graph-schema-version remote-graph-schema-version
                    user-uuid rtc-state-flow *rtc-auto-push? *rtc-remote-profile?
                    *online-users *last-stop-exception]}
            ;; `m/?<`: a new metadata value cancels the branch of the previous one
            (m/?< rtc-loop-metadata-flow)]
        (try
          (when (and repo rtc-state-flow *rtc-auto-push? *rtc-lock')
            (m/?<
             (m/latest
              ;; argument order matches the order of the flows listed below
              (fn [rtc-state rtc-auto-push? rtc-remote-profile?
                   rtc-lock online-users pending-local-ops-count pending-asset-ops-count [local-tx remote-tx]]
                {:graph-uuid graph-uuid
                 :local-graph-schema-version (db-schema/schema-version->string local-graph-schema-version)
                 :remote-graph-schema-version (db-schema/schema-version->string remote-graph-schema-version)
                 :user-uuid user-uuid
                 :unpushed-block-update-count pending-local-ops-count
                 :pending-asset-ops-count pending-asset-ops-count
                 :local-tx local-tx
                 :remote-tx remote-tx
                 :rtc-state rtc-state
                 :rtc-lock rtc-lock
                 :auto-push? rtc-auto-push?
                 :remote-profile? rtc-remote-profile?
                 :online-users online-users
                 ;; read at emit time; the atom itself is not watched
                 :last-stop-exception-ex-data (some-> *last-stop-exception deref ex-data)})
              rtc-state-flow
              (m/watch *rtc-auto-push?) (m/watch *rtc-remote-profile?)
              (m/watch *rtc-lock') (m/watch *online-users)
              (client-op/create-pending-block-ops-count-flow repo)
              (client-op/create-pending-asset-ops-count-flow repo)
              (rtc-log-and-state/create-local&remote-t-flow graph-uuid))))
          ;; cancellation of a superseded branch is not an error
          (catch Cancelled _))))))
  481. (def ^:private create-get-state-flow (c.m/throttle 300 create-get-state-flow*))
  482. (defn new-task--get-debug-state
  483. []
  484. (c.m/snapshot-of-flow create-get-state-flow))
  485. (defn new-task--upload-graph
  486. [token repo remote-graph-name]
  487. (let [{:keys [conn schema-version] :as r}
  488. (if-let [conn (worker-state/get-datascript-conn repo)]
  489. (if-let [schema-version (ldb/get-graph-schema-version @conn)]
  490. {:conn conn :schema-version schema-version}
  491. (ex-info "Not found schema-version" {:type :rtc.exception/not-found-schema-version}))
  492. (ex-info "Not found db-conn" {:type :rtc.exception/not-found-db-conn :repo repo}))]
  493. (m/sp
  494. (if (instance? ExceptionInfo r)
  495. (r.ex/->map r)
  496. (let [major-schema-version (db-schema/major-version schema-version)
  497. {:keys [get-ws-create-task]} (gen-get-ws-create-map--memoized (ws-util/get-ws-url token))]
  498. (m/? (r.upload-download/new-task--upload-graph
  499. get-ws-create-task repo conn remote-graph-name major-schema-version)))))))
  500. (defn new-task--branch-graph
  501. [token repo]
  502. (let [{:keys [conn graph-uuid schema-version] :as r}
  503. (if-let [conn (worker-state/get-datascript-conn repo)]
  504. (if-let [graph-uuid (ldb/get-graph-rtc-uuid @conn)]
  505. (if-let [schema-version (ldb/get-graph-schema-version @conn)]
  506. {:conn conn :graph-uuid graph-uuid :schema-version schema-version}
  507. (ex-info "Not found schema-version" {:type :rtc.exception/not-found-schema-version}))
  508. r.ex/ex-local-not-rtc-graph)
  509. (ex-info "Not found db-conn" {:type :rtc.exception/not-found-db-conn :repo repo}))]
  510. (m/sp
  511. (if (instance? ExceptionInfo r)
  512. (r.ex/->map r)
  513. (let [major-schema-version (db-schema/major-version schema-version)
  514. {:keys [get-ws-create-task]} (gen-get-ws-create-map--memoized (ws-util/get-ws-url token))]
  515. (m/? (r.upload-download/new-task--branch-graph
  516. get-ws-create-task repo conn graph-uuid major-schema-version)))))))
  517. (defn new-task--request-download-graph
  518. [token graph-uuid schema-version]
  519. (let [{:keys [get-ws-create-task]} (gen-get-ws-create-map--memoized (ws-util/get-ws-url token))]
  520. (r.upload-download/new-task--request-download-graph get-ws-create-task graph-uuid schema-version)))
  521. (comment
  522. (defn new-task--download-info-list
  523. [token graph-uuid schema-version]
  524. (let [{:keys [get-ws-create-task]} (gen-get-ws-create-map--memoized (ws-util/get-ws-url token))]
  525. (r.upload-download/new-task--download-info-list get-ws-create-task graph-uuid schema-version))))
  526. (defn new-task--wait-download-info-ready
  527. [token download-info-uuid graph-uuid schema-version timeout-ms]
  528. (let [{:keys [get-ws-create-task]} (gen-get-ws-create-map--memoized (ws-util/get-ws-url token))]
  529. (r.upload-download/new-task--wait-download-info-ready
  530. get-ws-create-task download-info-uuid graph-uuid schema-version timeout-ms)))
  531. (def new-task--download-graph-from-s3 r.upload-download/new-task--download-graph-from-s3)
;; Thread-api entries: each form binds a `:thread-api/...` keyword to a body that
;; simply delegates to a function of this namespace.
;; NOTE(review): `def-thread-api` comes from frontend.common.thread-api; how the
;; entries are registered and invoked is not visible in this file.

(def-thread-api :thread-api/rtc-start
  [stop-before-start?]
  (new-task--rtc-start stop-before-start?))

(def-thread-api :thread-api/rtc-stop
  []
  (rtc-stop))

(def-thread-api :thread-api/rtc-toggle-auto-push
  []
  (rtc-toggle-auto-push))

(def-thread-api :thread-api/rtc-toggle-remote-profile
  []
  (rtc-toggle-remote-profile))

;; positional args are forwarded as keyword options
(def-thread-api :thread-api/rtc-grant-graph-access
  [token graph-uuid target-user-uuids target-user-emails]
  (new-task--grant-access-to-others token graph-uuid
                                    :target-user-uuids target-user-uuids
                                    :target-user-emails target-user-emails))

(def-thread-api :thread-api/rtc-get-graphs
  [token]
  (new-task--get-graphs token))

(def-thread-api :thread-api/rtc-delete-graph
  [token graph-uuid schema-version]
  (new-task--delete-graph token graph-uuid schema-version))

(def-thread-api :thread-api/rtc-get-users-info
  [token graph-uuid]
  (new-task--get-users-info token graph-uuid))

(def-thread-api :thread-api/rtc-get-block-content-versions
  [token graph-uuid block-uuid]
  (new-task--get-block-content-versions token graph-uuid block-uuid))

(def-thread-api :thread-api/rtc-get-debug-state
  []
  (new-task--get-debug-state))

;; note the argument order: the api takes [repo token ...],
;; the delegated fn takes [token repo ...]
(def-thread-api :thread-api/rtc-async-upload-graph
  [repo token remote-graph-name]
  (new-task--upload-graph token repo remote-graph-name))

;; same swap of repo/token as in rtc-async-upload-graph
(def-thread-api :thread-api/rtc-async-branch-graph
  [repo token]
  (new-task--branch-graph token repo))

(def-thread-api :thread-api/rtc-request-download-graph
  [token graph-uuid schema-version]
  (new-task--request-download-graph token graph-uuid schema-version))

(def-thread-api :thread-api/rtc-wait-download-graph-info-ready
  [token download-info-uuid graph-uuid schema-version timeout-ms]
  (new-task--wait-download-info-ready token download-info-uuid graph-uuid schema-version timeout-ms))

(def-thread-api :thread-api/rtc-download-graph-from-s3
  [graph-uuid graph-name s3-url]
  (new-task--download-graph-from-s3 graph-uuid graph-name s3-url))

;; disabled together with `new-task--download-info-list`
(comment
  (def-thread-api :thread-api/rtc-download-info-list
    [token graph-uuid schema-version]
    (new-task--download-info-list token graph-uuid schema-version)))
  583. ;;; ================ API (ends) ================
  584. ;;; subscribe state ;;;
;; Runs at namespace load time unless `common-config/PUBLISHING` is truthy:
;; starts a background task that consumes `create-get-state-flow` and passes
;; every emitted state to `shared-service/broadcast-to-clients!` under
;; `:rtc-sync-state`.
(when-not common-config/PUBLISHING
  (c.m/run-background-task
   ::subscribe-state
   (m/reduce
    ;; the accumulator is unused; the reduce only drives the side effect
    (fn [_ v]
      (shared-service/broadcast-to-clients! :rtc-sync-state v))
    create-get-state-flow)))
  592. (comment
  593. (do
  594. (def user-uuid "7f41990d-2c8f-4f79-b231-88e9f652e072")
  595. (def graph-uuid "ff7186c1-5903-4bc8-b4e9-ca23525b9983")
  596. (def repo "logseq_db_4-23")
  597. (def conn (worker-state/get-datascript-conn repo))
  598. (def date-formatter "MMM do, yyyy")
  599. (def debug-ws-url "wss://ws-dev.logseq.com/rtc-sync?token=???")
  600. (let [{:keys [rtc-state-flow *rtc-auto-push? rtc-loop-task]}
  601. (create-rtc-loop user-uuid graph-uuid repo conn date-formatter nil {:debug-ws-url debug-ws-url})
  602. c (c.m/run-task rtc-loop-task :rtc-loop-task)]
  603. (def cancel c)
  604. (def rtc-state-flow rtc-state-flow)
  605. (def *rtc-auto-push? *rtc-auto-push?)))
  606. (cancel)
  607. (do
  608. (def a (atom 1))
  609. (def f1 (m/watch a))
  610. (def f2 (create-pull-remote-updates-flow 5000 f1))
  611. (def cancel (c.m/run-task (m/reduce (fn [_ v] (prn :v v)) f2) :xxx)))
  612. (defn sleep-emit [delays]
  613. (m/ap (let [n (m/?> (m/seed delays))
  614. r (m/? (m/sleep n n))]
  615. (prn :xxx r (t/now))
  616. r)))
  617. (def cancel
  618. ((->> (m/sample vector
  619. (m/latest identity (m/reductions {} 0 (sleep-emit [1000 1 2])))
  620. (sleep-emit [2000 3000 1000]))
  621. (m/reduce (fn [_ v] (prn :v v)))) prn prn))
  622. (let [f (m/stream (m/ap (m/amb 1 2 3 4)))]
  623. ((m/reduce (fn [r v] (conj r v)) (m/reductions {} :xxx f)) prn prn)
  624. ((m/reduce (fn [r v] (conj r v)) f) prn prn)))