core.cljs 28 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648
  1. (ns frontend.worker.rtc.core
  2. "Main(use missionary) ns for rtc related fns"
  3. (:require [clojure.data :as data]
  4. [datascript.core :as d]
  5. [frontend.common.missionary :as c.m]
  6. [frontend.common.thread-api :refer [def-thread-api]]
  7. [frontend.worker.device :as worker-device]
  8. [frontend.worker.rtc.asset :as r.asset]
  9. [frontend.worker.rtc.branch-graph :as r.branch-graph]
  10. [frontend.worker.rtc.client :as r.client]
  11. [frontend.worker.rtc.client-op :as client-op]
  12. [frontend.worker.rtc.exception :as r.ex]
  13. [frontend.worker.rtc.full-upload-download-graph :as r.upload-download]
  14. [frontend.worker.rtc.log-and-state :as rtc-log-and-state]
  15. [frontend.worker.rtc.migrate :as r.migrate]
  16. [frontend.worker.rtc.remote-update :as r.remote-update]
  17. [frontend.worker.rtc.skeleton]
  18. [frontend.worker.rtc.ws :as ws]
  19. [frontend.worker.rtc.ws-util :as ws-util :refer [gen-get-ws-create-map--memoized]]
  20. [frontend.worker.state :as worker-state]
  21. [frontend.worker.util :as worker-util]
  22. [lambdaisland.glogi :as log]
  23. [logseq.common.config :as common-config]
  24. [logseq.db :as ldb]
  25. [logseq.db.frontend.schema :as db-schema]
  26. [malli.core :as ma]
  27. [missionary.core :as m])
  28. (:import [missionary Cancelled]))
  29. (def ^:private rtc-state-schema
  30. [:map
  31. [:ws-state {:optional true} [:enum :connecting :open :closing :closed]]])
  32. (def ^:private rtc-state-validator (ma/validator rtc-state-schema))
  33. (def ^:private sentinel (js-obj))
  34. (defn- get-remote-updates
  35. "Return a flow: receive messages from ws,
  36. and filter messages with :req-id=
  37. - `push-updates`
  38. - `online-users-updated`.
  39. - `push-asset-upload-updates`"
  40. [get-ws-create-task]
  41. (m/ap
  42. (loop []
  43. (let [ws (m/? get-ws-create-task)
  44. x (try
  45. (m/?> (m/eduction
  46. (filter (fn [data]
  47. (contains?
  48. #{"online-users-updated"
  49. "push-updates"
  50. "push-asset-upload-updates"}
  51. (:req-id data))))
  52. (ws/recv-flow ws)))
  53. (catch js/CloseEvent _
  54. sentinel))]
  55. (if (identical? x sentinel)
  56. (recur)
  57. x)))))
  58. (defn- create-local-updates-check-flow
  59. "Return a flow: emit if need to push local-updates"
  60. [repo *auto-push? interval-ms]
  61. (let [auto-push-flow (m/watch *auto-push?)
  62. clock-flow (c.m/clock interval-ms :clock)
  63. merge-flow (m/latest vector auto-push-flow clock-flow)]
  64. (m/eduction (filter first)
  65. (map second)
  66. (filter (fn [v] (when (pos? (client-op/get-unpushed-block-ops-count repo)) v)))
  67. merge-flow)))
  68. (defn- create-pull-remote-updates-flow
  69. "Return a flow: emit to pull remote-updates.
  70. reschedule next emit(INTERVAL-MS later) every time FLOW emit a value."
  71. [interval-ms flow]
  72. (let [v {:type :pull-remote-updates}
  73. clock-flow (m/ap
  74. (loop []
  75. (m/amb
  76. (m/? (m/sleep interval-ms v))
  77. (recur))))]
  78. (m/ap
  79. (m/amb
  80. v
  81. (let [_ (m/?< (c.m/continue-flow flow))]
  82. (try
  83. (m/?< clock-flow)
  84. (catch Cancelled _ (m/amb))))))))
  85. (defn create-inject-users-info-flow
  86. "Return a flow: emit event if need to notify the server to inject users-info to graph."
  87. [repo online-users-updated-flow]
  88. (m/ap
  89. (if-let [conn (worker-state/get-datascript-conn repo)]
  90. (if-let [online-users (seq (m/?> online-users-updated-flow))]
  91. (let [user-uuid->user (into {} (map (juxt :user/uuid identity) online-users))
  92. user-blocks (keep (fn [user-uuid] (d/entity @conn [:block/uuid user-uuid])) (keys user-uuid->user))]
  93. (if (or (not= (count user-blocks) (count user-uuid->user))
  94. (some
  95. ;; check if some attrs not equal among user-blocks and online-users
  96. (fn [user-block]
  97. (let [user (user-uuid->user (:block/uuid user-block))
  98. [diff-r1 diff-r2]
  99. (data/diff
  100. (select-keys user-block [:logseq.property.user/name :logseq.property.user/email :logseq.property.user/avatar])
  101. (update-keys
  102. (select-keys user [:user/name :user/email :user/avatar])
  103. (fn [k] (keyword "logseq.property.user" (name k)))))]
  104. (or (some? diff-r1) (some? diff-r2))))
  105. user-blocks))
  106. (m/amb {:type :inject-users-info}
  107. ;; then trigger a pull-remote-updates to update local-graph
  108. {:type :pull-remote-updates :from :x})
  109. (m/amb)))
  110. (m/amb))
  111. (m/amb))))
  112. (defn- create-mixed-flow
  113. "Return a flow that emits all kinds of events:
  114. `:remote-update`: remote-updates data from server
  115. `:remote-asset-update`: remote asset-updates from server
  116. `:local-update-check`: event to notify to check if there're some new local-updates, then push to remote.
  117. `:online-users-updated`: online users info updated
  118. `:pull-remote-updates`: pull remote updates
  119. `:inject-users-info`: notify server to inject users-info into the graph"
  120. [repo get-ws-create-task *auto-push? *online-users]
  121. (let [remote-updates-flow (m/eduction
  122. (map (fn [data]
  123. (case (:req-id data)
  124. "push-updates" {:type :remote-update :value data}
  125. "online-users-updated" {:type :online-users-updated :value data}
  126. "push-asset-upload-updates" {:type :remote-asset-update :value data})))
  127. (get-remote-updates get-ws-create-task))
  128. local-updates-check-flow (m/eduction
  129. (map (fn [data] {:type :local-update-check :value data}))
  130. (create-local-updates-check-flow repo *auto-push? 2000))
  131. inject-user-info-flow (create-inject-users-info-flow repo (m/watch *online-users))
  132. mix-flow (c.m/mix remote-updates-flow local-updates-check-flow inject-user-info-flow)]
  133. (c.m/mix mix-flow (create-pull-remote-updates-flow 60000 mix-flow))))
  134. (defn- create-ws-state-flow
  135. [*current-ws]
  136. (m/relieve
  137. (m/ap
  138. (let [ws (m/?< (m/watch *current-ws))]
  139. (try
  140. (if ws
  141. (m/?< (ws/create-mws-state-flow ws))
  142. (m/amb))
  143. (catch Cancelled _
  144. (m/amb)))))))
  145. (defn- create-rtc-state-flow
  146. [ws-state-flow]
  147. (m/latest
  148. (fn [ws-state]
  149. {:post [(rtc-state-validator %)]}
  150. (cond-> {}
  151. ws-state (assoc :ws-state ws-state)))
  152. (m/reductions {} nil ws-state-flow)))
  153. (defn- add-migration-client-ops!
  154. [repo db server-schema-version]
  155. (when server-schema-version
  156. (let [client-schema-version (ldb/get-graph-schema-version db)
  157. added-ops (r.migrate/add-migration-client-ops! repo db server-schema-version client-schema-version)]
  158. (when (seq added-ops)
  159. (log/info :add-migration-client-ops
  160. {:repo repo
  161. :server-schema-version server-schema-version
  162. :client-schema-version client-schema-version})))))
  163. (defn- update-remote-schema-version!
  164. [conn server-schema-version]
  165. (when server-schema-version
  166. (d/transact! conn [(ldb/kv :logseq.kv/remote-schema-version server-schema-version)]
  167. {:gen-undo-ops? false
  168. :persist-op? false})))
  169. (defonce ^:private *rtc-lock (atom nil))
  170. (defn- holding-rtc-lock
  171. "Use this fn to prevent multiple rtc-loops at same time.
  172. rtc-loop-task is stateless, but conn is not.
  173. we need to ensure that no two concurrent rtc-loop-tasks are modifying `conn` at the same time"
  174. [started-dfv task]
  175. (m/sp
  176. (when-not (compare-and-set! *rtc-lock nil true)
  177. (let [e (ex-info "Must not run multiple rtc-loops, try later"
  178. {:type :rtc.exception/lock-failed
  179. :missionary/retry true})]
  180. (started-dfv e)
  181. (throw e)))
  182. (try
  183. (m/? task)
  184. (finally
  185. (reset! *rtc-lock nil)))))
  186. (declare new-task--inject-users-info)
  187. (defn- create-rtc-loop
  188. "Return a map with [:rtc-state-flow :rtc-loop-task :*rtc-auto-push? :onstarted-task]
  189. TODO: auto refresh token if needed"
  190. [graph-uuid schema-version repo conn date-formatter token
  191. & {:keys [auto-push? debug-ws-url] :or {auto-push? true}}]
  192. (let [major-schema-version (db-schema/major-version schema-version)
  193. ws-url (or debug-ws-url (ws-util/get-ws-url token))
  194. *auto-push? (atom auto-push?)
  195. *remote-profile? (atom false)
  196. *last-calibrate-t (atom nil)
  197. *online-users (atom nil)
  198. *assets-sync-loop-canceler (atom nil)
  199. *server-schema-version (atom nil)
  200. started-dfv (m/dfv)
  201. add-log-fn (fn [type message]
  202. (assert (map? message) message)
  203. (rtc-log-and-state/rtc-log type (assoc message :graph-uuid graph-uuid)))
  204. {:keys [*current-ws get-ws-create-task]}
  205. (gen-get-ws-create-map--memoized ws-url)
  206. get-ws-create-task (r.client/ensure-register-graph-updates
  207. get-ws-create-task graph-uuid major-schema-version
  208. repo conn *last-calibrate-t *online-users *server-schema-version add-log-fn)
  209. {:keys [assets-sync-loop-task]}
  210. (r.asset/create-assets-sync-loop repo get-ws-create-task graph-uuid major-schema-version conn *auto-push?)
  211. mixed-flow (create-mixed-flow repo get-ws-create-task *auto-push? *online-users)]
  212. (assert (some? *current-ws))
  213. {:rtc-state-flow (create-rtc-state-flow (create-ws-state-flow *current-ws))
  214. :*rtc-auto-push? *auto-push?
  215. :*rtc-remote-profile? *remote-profile?
  216. :*online-users *online-users
  217. :onstarted-task started-dfv
  218. :rtc-loop-task
  219. (holding-rtc-lock
  220. started-dfv
  221. (m/sp
  222. (try
  223. ;; init run to open a ws
  224. (m/? get-ws-create-task)
  225. (started-dfv true)
  226. (update-remote-schema-version! conn @*server-schema-version)
  227. (add-migration-client-ops! repo @conn @*server-schema-version)
  228. (reset! *assets-sync-loop-canceler
  229. (c.m/run-task assets-sync-loop-task :assets-sync-loop-task))
  230. (->>
  231. (let [event (m/?> mixed-flow)]
  232. (case (:type event)
  233. :remote-update
  234. (try (r.remote-update/apply-remote-update graph-uuid repo conn date-formatter event add-log-fn)
  235. (catch :default e
  236. (when (= ::r.remote-update/need-pull-remote-data (:type (ex-data e)))
  237. (m/? (r.client/new-task--pull-remote-data
  238. repo conn graph-uuid major-schema-version date-formatter get-ws-create-task add-log-fn)))))
  239. :remote-asset-update
  240. (m/? (r.asset/new-task--emit-remote-asset-updates-from-push-asset-upload-updates
  241. repo @conn (:value event)))
  242. :local-update-check
  243. (m/? (r.client/new-task--push-local-ops
  244. repo conn graph-uuid major-schema-version date-formatter
  245. get-ws-create-task *remote-profile? add-log-fn))
  246. :online-users-updated
  247. (reset! *online-users (:online-users (:value event)))
  248. :pull-remote-updates
  249. (m/? (r.client/new-task--pull-remote-data
  250. repo conn graph-uuid major-schema-version date-formatter get-ws-create-task add-log-fn))
  251. :inject-users-info
  252. (m/? (new-task--inject-users-info token graph-uuid major-schema-version))))
  253. (m/ap)
  254. (m/reduce {} nil)
  255. (m/?))
  256. (catch Cancelled e
  257. (add-log-fn :rtc.log/cancelled {})
  258. (throw e))
  259. (finally
  260. (started-dfv :final) ;; ensure started-dfv can recv a value(values except the first one will be disregarded)
  261. (when @*assets-sync-loop-canceler (@*assets-sync-loop-canceler))))))}))
  262. (def ^:private empty-rtc-loop-metadata
  263. {:repo nil
  264. :graph-uuid nil
  265. :local-graph-schema-version nil
  266. :remote-graph-schema-version nil
  267. :user-uuid nil
  268. :rtc-state-flow nil
  269. :*rtc-auto-push? nil
  270. :*rtc-remote-profile? nil
  271. :*online-users nil
  272. :*rtc-lock nil
  273. :canceler nil
  274. :*last-stop-exception nil})
  275. (defonce ^:private *rtc-loop-metadata (atom empty-rtc-loop-metadata
  276. :validator
  277. (fn [v] (= (set (keys empty-rtc-loop-metadata))
  278. (set (keys v))))))
  279. (defn- validate-rtc-start-conditions
  280. "Return exception if validation failed"
  281. [repo token]
  282. (if-let [conn (worker-state/get-datascript-conn repo)]
  283. (let [user-uuid (:sub (worker-util/parse-jwt token))
  284. graph-uuid (ldb/get-graph-rtc-uuid @conn)
  285. schema-version (ldb/get-graph-schema-version @conn)
  286. remote-schema-version (ldb/get-graph-remote-schema-version @conn)
  287. app-schema-version db-schema/version]
  288. (cond
  289. (not user-uuid)
  290. (ex-info "Invalid token" {:type :rtc.exception/invalid-token})
  291. (not graph-uuid)
  292. r.ex/ex-local-not-rtc-graph
  293. (not schema-version)
  294. (ex-info "Not found schema-version" {:type :rtc.exception/not-found-schema-version})
  295. (not remote-schema-version)
  296. (ex-info "Not found remote-schema-version" {:type :rtc.exception/not-found-remote-schema-version})
  297. (apply not= (map db-schema/major-version [app-schema-version remote-schema-version schema-version]))
  298. (ex-info "major schema version mismatch" {:type :rtc.exception/major-schema-version-mismatched
  299. :sub-type
  300. (r.branch-graph/compare-schemas
  301. remote-schema-version app-schema-version schema-version)
  302. :app app-schema-version
  303. :local schema-version
  304. :remote remote-schema-version})
  305. :else
  306. {:conn conn
  307. :user-uuid user-uuid
  308. :graph-uuid graph-uuid
  309. :schema-version schema-version
  310. :remote-schema-version remote-schema-version
  311. :date-formatter (common-config/get-date-formatter (worker-state/get-config repo))}))
  312. (ex-info "Not found db-conn" {:type :rtc.exception/not-found-db-conn
  313. :repo repo})))
  314. ;;; ================ API ================
  315. (defn new-task--rtc-start
  316. [repo token]
  317. (m/sp
  318. ;; ensure device metadata existing first
  319. (m/? (worker-device/new-task--ensure-device-metadata! token))
  320. (let [{:keys [conn user-uuid graph-uuid schema-version remote-schema-version date-formatter] :as r}
  321. (validate-rtc-start-conditions repo token)]
  322. (if (instance? ExceptionInfo r)
  323. (do (log/info :e r) (r.ex/->map r))
  324. (let [{:keys [rtc-state-flow *rtc-auto-push? *rtc-remote-profile? rtc-loop-task *online-users onstarted-task]}
  325. (create-rtc-loop graph-uuid schema-version repo conn date-formatter token)
  326. *last-stop-exception (atom nil)
  327. canceler (c.m/run-task rtc-loop-task :rtc-loop-task
  328. :fail (fn [e]
  329. (reset! *last-stop-exception e)
  330. (log/info :rtc-loop-task e)))
  331. start-ex (m/? onstarted-task)]
  332. (if-let [start-ex (:ex-data start-ex)]
  333. (do (log/info :start-ex start-ex) (r.ex/->map start-ex))
  334. (do (reset! *rtc-loop-metadata {:repo repo
  335. :graph-uuid graph-uuid
  336. :local-graph-schema-version schema-version
  337. :remote-graph-schema-version remote-schema-version
  338. :user-uuid user-uuid
  339. :rtc-state-flow rtc-state-flow
  340. :*rtc-auto-push? *rtc-auto-push?
  341. :*rtc-remote-profile? *rtc-remote-profile?
  342. :*online-users *online-users
  343. :*rtc-lock *rtc-lock
  344. :canceler canceler
  345. :*last-stop-exception *last-stop-exception})
  346. nil)))))))
  347. (defn rtc-stop
  348. []
  349. (when-let [canceler (:canceler @*rtc-loop-metadata)]
  350. (canceler)
  351. (reset! *rtc-loop-metadata empty-rtc-loop-metadata)))
  352. (defn rtc-toggle-auto-push
  353. []
  354. (when-let [*auto-push? (:*rtc-auto-push? @*rtc-loop-metadata)]
  355. (swap! *auto-push? not)))
  356. (defn rtc-toggle-remote-profile
  357. []
  358. (when-let [*rtc-remote-profile? (:*rtc-remote-profile? @*rtc-loop-metadata)]
  359. (swap! *rtc-remote-profile? not)))
  360. (defn new-task--get-graphs
  361. [token]
  362. (let [{:keys [get-ws-create-task]} (gen-get-ws-create-map--memoized (ws-util/get-ws-url token))]
  363. (m/join :graphs
  364. (ws-util/send&recv get-ws-create-task {:action "list-graphs"}))))
  365. (defn new-task--delete-graph
  366. "Return a task that return true if succeed"
  367. [token graph-uuid schema-version]
  368. (let [{:keys [get-ws-create-task]} (gen-get-ws-create-map--memoized (ws-util/get-ws-url token))]
  369. (m/sp
  370. (let [{:keys [ex-data]}
  371. (m/? (ws-util/send&recv get-ws-create-task
  372. {:action "delete-graph"
  373. :graph-uuid graph-uuid
  374. :schema-version (str schema-version)}))]
  375. (when ex-data (log/info ::delete-graph-failed {:graph-uuid graph-uuid :ex-data ex-data}))
  376. (boolean (nil? ex-data))))))
  377. (defn new-task--get-users-info
  378. "Return a task that return users-info about the graph."
  379. [token graph-uuid]
  380. (let [{:keys [get-ws-create-task]} (gen-get-ws-create-map--memoized (ws-util/get-ws-url token))]
  381. (m/join :users
  382. (ws-util/send&recv get-ws-create-task
  383. {:action "get-users-info" :graph-uuid graph-uuid}))))
  384. (defn new-task--inject-users-info
  385. [token graph-uuid major-schema-version]
  386. (let [{:keys [get-ws-create-task]} (gen-get-ws-create-map--memoized (ws-util/get-ws-url token))]
  387. (ws-util/send&recv get-ws-create-task
  388. {:action "inject-users-info"
  389. :graph-uuid graph-uuid
  390. :schema-version (str major-schema-version)})))
  391. (defn new-task--grant-access-to-others
  392. [token graph-uuid & {:keys [target-user-uuids target-user-emails]}]
  393. (let [{:keys [get-ws-create-task]} (gen-get-ws-create-map--memoized (ws-util/get-ws-url token))]
  394. (ws-util/send&recv get-ws-create-task
  395. (cond-> {:action "grant-access"
  396. :graph-uuid graph-uuid}
  397. target-user-uuids (assoc :target-user-uuids target-user-uuids)
  398. target-user-emails (assoc :target-user-emails target-user-emails)))))
  399. (defn new-task--get-block-content-versions
  400. "Return a task that return map [:ex-data :ex-message :versions]"
  401. [token graph-uuid block-uuid]
  402. (let [{:keys [get-ws-create-task]} (gen-get-ws-create-map--memoized (ws-util/get-ws-url token))]
  403. (m/join :versions (ws-util/send&recv get-ws-create-task
  404. {:action "query-block-content-versions"
  405. :block-uuids [block-uuid]
  406. :graph-uuid graph-uuid}))))
  407. (def ^:private create-get-state-flow*
  408. (let [rtc-loop-metadata-flow (m/watch *rtc-loop-metadata)]
  409. (m/ap
  410. (let [{rtc-lock :*rtc-lock
  411. :keys [repo graph-uuid local-graph-schema-version remote-graph-schema-version
  412. user-uuid rtc-state-flow *rtc-auto-push? *rtc-remote-profile?
  413. *online-users *last-stop-exception]}
  414. (m/?< rtc-loop-metadata-flow)]
  415. (try
  416. (when (and repo rtc-state-flow *rtc-auto-push? rtc-lock)
  417. (m/?<
  418. (m/latest
  419. (fn [rtc-state rtc-auto-push? rtc-remote-profile?
  420. rtc-lock online-users pending-local-ops-count local-tx remote-tx]
  421. {:graph-uuid graph-uuid
  422. :local-graph-schema-version (db-schema/schema-version->string local-graph-schema-version)
  423. :remote-graph-schema-version (db-schema/schema-version->string remote-graph-schema-version)
  424. :user-uuid user-uuid
  425. :unpushed-block-update-count pending-local-ops-count
  426. :local-tx local-tx
  427. :remote-tx remote-tx
  428. :rtc-state rtc-state
  429. :rtc-lock rtc-lock
  430. :auto-push? rtc-auto-push?
  431. :remote-profile? rtc-remote-profile?
  432. :online-users online-users
  433. :last-stop-exception-ex-data (some-> *last-stop-exception deref ex-data)})
  434. rtc-state-flow
  435. (m/watch *rtc-auto-push?) (m/watch *rtc-remote-profile?)
  436. (m/watch rtc-lock) (m/watch *online-users)
  437. (client-op/create-pending-block-ops-count-flow repo)
  438. (rtc-log-and-state/create-local-t-flow graph-uuid)
  439. (rtc-log-and-state/create-remote-t-flow graph-uuid))))
  440. (catch Cancelled _))))))
  441. (def ^:private create-get-state-flow (c.m/throttle 300 create-get-state-flow*))
  442. (defn new-task--get-debug-state
  443. []
  444. (m/reduce {} nil (m/eduction (take 1) create-get-state-flow)))
  445. (defn new-task--upload-graph
  446. [token repo remote-graph-name]
  447. (let [{:keys [conn schema-version] :as r}
  448. (if-let [conn (worker-state/get-datascript-conn repo)]
  449. (if-let [schema-version (ldb/get-graph-schema-version @conn)]
  450. {:conn conn :schema-version schema-version}
  451. (ex-info "Not found schema-version" {:type :rtc.exception/not-found-schema-version}))
  452. (ex-info "Not found db-conn" {:type :rtc.exception/not-found-db-conn :repo repo}))]
  453. (m/sp
  454. (if (instance? ExceptionInfo r)
  455. (r.ex/->map r)
  456. (let [major-schema-version (db-schema/major-version schema-version)
  457. {:keys [get-ws-create-task]} (gen-get-ws-create-map--memoized (ws-util/get-ws-url token))]
  458. (m/? (r.upload-download/new-task--upload-graph
  459. get-ws-create-task repo conn remote-graph-name major-schema-version)))))))
  460. (defn new-task--branch-graph
  461. [token repo]
  462. (let [{:keys [conn graph-uuid schema-version] :as r}
  463. (if-let [conn (worker-state/get-datascript-conn repo)]
  464. (if-let [graph-uuid (ldb/get-graph-rtc-uuid @conn)]
  465. (if-let [schema-version (ldb/get-graph-schema-version @conn)]
  466. {:conn conn :graph-uuid graph-uuid :schema-version schema-version}
  467. (ex-info "Not found schema-version" {:type :rtc.exception/not-found-schema-version}))
  468. r.ex/ex-local-not-rtc-graph)
  469. (ex-info "Not found db-conn" {:type :rtc.exception/not-found-db-conn :repo repo}))]
  470. (m/sp
  471. (if (instance? ExceptionInfo r)
  472. (r.ex/->map r)
  473. (let [major-schema-version (db-schema/major-version schema-version)
  474. {:keys [get-ws-create-task]} (gen-get-ws-create-map--memoized (ws-util/get-ws-url token))]
  475. (m/? (r.upload-download/new-task--branch-graph
  476. get-ws-create-task repo conn graph-uuid major-schema-version)))))))
  477. (defn new-task--request-download-graph
  478. [token graph-uuid schema-version]
  479. (let [{:keys [get-ws-create-task]} (gen-get-ws-create-map--memoized (ws-util/get-ws-url token))]
  480. (r.upload-download/new-task--request-download-graph get-ws-create-task graph-uuid schema-version)))
  481. (defn new-task--download-info-list
  482. [token graph-uuid schema-version]
  483. (let [{:keys [get-ws-create-task]} (gen-get-ws-create-map--memoized (ws-util/get-ws-url token))]
  484. (r.upload-download/new-task--download-info-list get-ws-create-task graph-uuid schema-version)))
  485. (defn new-task--wait-download-info-ready
  486. [token download-info-uuid graph-uuid schema-version timeout-ms]
  487. (let [{:keys [get-ws-create-task]} (gen-get-ws-create-map--memoized (ws-util/get-ws-url token))]
  488. (r.upload-download/new-task--wait-download-info-ready
  489. get-ws-create-task download-info-uuid graph-uuid schema-version timeout-ms)))
  490. (def new-task--download-graph-from-s3 r.upload-download/new-task--download-graph-from-s3)
  491. (def-thread-api :rtc/start
  492. [repo token]
  493. (new-task--rtc-start repo token))
  494. (def-thread-api :rtc/stop
  495. []
  496. (rtc-stop))
  497. (def-thread-api :rtc/toggle-auto-push
  498. []
  499. (rtc-toggle-auto-push))
  500. (def-thread-api :rtc/toggle-remote-profile
  501. []
  502. (rtc-toggle-remote-profile))
  503. (def-thread-api :rtc/grant-graph-access
  504. [token graph-uuid target-user-uuids-str target-user-emails-str]
  505. (let [target-user-uuids (ldb/read-transit-str target-user-uuids-str)
  506. target-user-emails (ldb/read-transit-str target-user-emails-str)]
  507. (new-task--grant-access-to-others token graph-uuid
  508. :target-user-uuids target-user-uuids
  509. :target-user-emails target-user-emails)))
  510. (def-thread-api :rtc/get-graphs
  511. [token]
  512. (new-task--get-graphs token))
  513. (def-thread-api :rtc/delete-graph
  514. [token graph-uuid schema-version]
  515. (new-task--delete-graph token graph-uuid schema-version))
  516. (def-thread-api :rtc/get-users-info
  517. [token graph-uuid]
  518. (new-task--get-users-info token graph-uuid))
  519. (def-thread-api :rtc/get-block-content-versions
  520. [token graph-uuid block-uuid]
  521. (new-task--get-block-content-versions token graph-uuid block-uuid))
  522. (def-thread-api :rtc/get-debug-state
  523. []
  524. (new-task--get-debug-state))
  525. (def-thread-api :rtc/async-upload-graph
  526. [repo token remote-graph-name]
  527. (new-task--upload-graph token repo remote-graph-name))
  528. (def-thread-api :rtc/async-branch-graph
  529. [repo token]
  530. (new-task--branch-graph token repo))
  531. (def-thread-api :rtc/request-download-graph
  532. [token graph-uuid schema-version]
  533. (new-task--request-download-graph token graph-uuid schema-version))
  534. (def-thread-api :rtc/wait-download-graph-info-ready
  535. [token download-info-uuid graph-uuid schema-version timeout-ms]
  536. (new-task--wait-download-info-ready token download-info-uuid graph-uuid schema-version timeout-ms))
  537. (def-thread-api :rtc/download-graph-from-s3
  538. [graph-uuid graph-name s3-url]
  539. (new-task--download-graph-from-s3 graph-uuid graph-name s3-url))
  540. (def-thread-api :rtc/download-info-list
  541. [token graph-uuid schema-version]
  542. (new-task--download-info-list token graph-uuid schema-version))
  543. ;;; ================ API (ends) ================
  544. ;;; subscribe state ;;;
  545. (when-not common-config/PUBLISHING
  546. (c.m/run-background-task
  547. ::subscribe-state
  548. (m/reduce
  549. (fn [_ v] (worker-util/post-message :rtc-sync-state v))
  550. create-get-state-flow)))
  551. (comment
  552. (do
  553. (def user-uuid "7f41990d-2c8f-4f79-b231-88e9f652e072")
  554. (def graph-uuid "ff7186c1-5903-4bc8-b4e9-ca23525b9983")
  555. (def repo "logseq_db_4-23")
  556. (def conn (worker-state/get-datascript-conn repo))
  557. (def date-formatter "MMM do, yyyy")
  558. (def debug-ws-url "wss://ws-dev.logseq.com/rtc-sync?token=???")
  559. (let [{:keys [rtc-state-flow *rtc-auto-push? rtc-loop-task]}
  560. (create-rtc-loop user-uuid graph-uuid repo conn date-formatter nil {:debug-ws-url debug-ws-url})
  561. c (c.m/run-task rtc-loop-task :rtc-loop-task)]
  562. (def cancel c)
  563. (def rtc-state-flow rtc-state-flow)
  564. (def *rtc-auto-push? *rtc-auto-push?)))
  565. (cancel)
  566. (do
  567. (def a (atom 1))
  568. (def f1 (m/watch a))
  569. (def f2 (create-pull-remote-updates-flow 5000 f1))
  570. (def cancel (c.m/run-task (m/reduce (fn [_ v] (prn :v v)) f2) :xxx)))
  571. (defn sleep-emit [delays]
  572. (m/ap (let [n (m/?> (m/seed delays))
  573. r (m/? (m/sleep n n))]
  574. (prn :xxx r (t/now))
  575. r)))
  576. (def cancel
  577. ((->> (m/sample vector
  578. (m/latest identity (m/reductions {} 0 (sleep-emit [1000 1 2])))
  579. (sleep-emit [2000 3000 1000]))
  580. (m/reduce (fn [_ v] (prn :v v)))) prn prn))
  581. (let [f (m/stream (m/ap (m/amb 1 2 3 4)))]
  582. ((m/reduce (fn [r v] (conj r v)) (m/reductions {} :xxx f)) prn prn)
  583. ((m/reduce (fn [r v] (conj r v)) f) prn prn)))