1
0

core.cljs 40 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865
  1. (ns frontend.db.rtc.core
  2. "Main ns for rtc related fns"
  3. (:require-macros
  4. [frontend.db.rtc.macro :refer [with-sub-data-from-ws get-req-id get-result-ch]])
  5. (:require [cljs-time.coerce :as tc]
  6. [cljs-time.core :as t]
  7. [cljs.core.async :as async :refer [<! >! chan go go-loop]]
  8. [clojure.set :as set]
  9. [cognitect.transit :as transit]
  10. [frontend.async-util :include-macros true :refer [<?]]
  11. [frontend.db :as db]
  12. [frontend.db.react :as react]
  13. [frontend.db.rtc.const :as rtc-const]
  14. [frontend.db.rtc.op-mem-layer :as op-mem-layer]
  15. [frontend.db.rtc.ws :as ws]
  16. [frontend.handler.page :as page-handler]
  17. [frontend.handler.property.util :as pu]
  18. [frontend.handler.user :as user]
  19. [frontend.handler.whiteboard :as whiteboard-handler]
  20. [frontend.modules.outliner.core :as outliner-core]
  21. [frontend.modules.outliner.transaction :as outliner-tx]
  22. [frontend.state :as state]
  23. [frontend.util :as util]
  24. [malli.core :as m]
  25. [malli.util :as mu]))
  ;;                     +-------------+
  ;;                     |             |
  ;;                     |   server    |
  ;;                     |             |
  ;;                     +----^----+---+
  ;;                          |    |
  ;;                          |    |
  ;;                          |   rtc-const/data-from-ws-schema
  ;;                          |    |
  ;; rtc-const/data-to-ws-schema   |
  ;;                          |    |
  ;;                          |    |
  ;;                          |    |
  ;;                     +----+----v---+                     +------------+
  ;;                     |             +--------------------->            |
  ;;                     |   client    |                     |  indexeddb |
  ;;                     |             |<--------------------+            |
  ;;                     +-------------+                     +------------+
  ;;                                frontend.db.rtc.op/op-schema
  (def state-schema
    "Closed malli map schema for the rtc state map. Every value is typed :any.

    | key                               | meaning                                               |
    |-----------------------------------+-------------------------------------------------------|
    | :*graph-uuid                      | atom of graph-uuid syncing now                        |
    | :*repo                            | atom of repo name syncing now                         |
    | :data-from-ws-chan                | channel for receiving messages from server websocket  |
    | :data-from-ws-pub                 | pub of :data-from-ws-chan, dispatched by :req-id      |
    | :*stop-rtc-loop-chan              | atom of chan to stop <loop-for-rtc                    |
    | :*ws                              | atom of websocket                                     |
    | :*rtc-state                       | atom of state of current rtc progress                 |
    | :toggle-auto-push-client-ops-chan | channel to toggle pushing client ops automatically    |
    | :*auto-push-client-ops?           | atom showing whether client-ops are pushed automatically |
    | :force-push-client-ops-chan       | chan used to force push client-ops                    |
    "
    ;; Each key becomes a `[key :any]` entry appended to the closed :map header.
    (into [:map {:closed true}]
          (map (fn [k] [k :any]))
          [:*graph-uuid
           :*repo
           :data-from-ws-chan
           :data-from-ws-pub
           :*stop-rtc-loop-chan
           :*ws
           :*rtc-state
           :toggle-auto-push-client-ops-chan
           :*auto-push-client-ops?
           :force-push-client-ops-chan]))
  (def state-validator
    "Fn of one arg. Returns true when `data` satisfies `state-schema`;
    otherwise prints the malli explain-data and returns nil."
    (let [valid? (m/validator state-schema)]
      (fn [data]
        ;; `prn` returns nil, so an invalid state yields a falsy result.
        (or (valid? data)
            (prn (mu/explain-data state-schema data))))))
  (def rtc-state-schema
    "Malli enum schema of the rtc states used in this ns: :open or :closed."
    [:enum :open :closed])

  (def rtc-state-validator
    "Validator fn compiled from `rtc-state-schema`."
    (m/validator rtc-state-schema))

  (def transit-w
    "Transit writer using the :json format."
    (transit/writer :json))

  (def transit-r
    "Transit reader using the :json format."
    (transit/reader :json))
  ;; Single entry point for db writes done by this ns.
  ;; Dispatches on the first argument (the action keyword); the remaining
  ;; arguments are handed to the per-action method.
  (defmulti transact-db! (fn [action & _] action))

  ;; The four outliner-backed actions run inside an outliner transaction
  ;; opened with {:persist-op? false} and forward their args unchanged.
  (defmethod transact-db! :delete-blocks [_action & params]
    (outliner-tx/transact!
     {:persist-op? false}
     (apply outliner-core/delete-blocks! params)))

  (defmethod transact-db! :move-blocks [_action & params]
    (outliner-tx/transact!
     {:persist-op? false}
     (apply outliner-core/move-blocks! params)))

  (defmethod transact-db! :insert-blocks [_action & params]
    (outliner-tx/transact!
     {:persist-op? false}
     (apply outliner-core/insert-blocks! params)))

  (defmethod transact-db! :save-block [_action & params]
    (outliner-tx/transact!
     {:persist-op? false}
     (apply outliner-core/save-block! params)))

  ;; Retracts every entity identified by the given block uuids.
  (defmethod transact-db! :delete-whiteboard-blocks [_action repo block-uuids]
    (let [retract-txs (mapv (fn [id] [:db/retractEntity [:block/uuid id]])
                            block-uuids)]
      (db/transact! repo retract-txs {:persist-op? false})))

  ;; Transacts the given block maps as-is.
  (defmethod transact-db! :upsert-whiteboard-block [_action repo blocks]
    (db/transact! repo blocks {:persist-op? false}))

  ;; Escape hatch: args are passed straight to `db/transact!`.
  (defmethod transact-db! :raw [_action & params]
    (apply db/transact! params))
  (defn- whiteboard-page-block?
    "Returns true when the :block/type collection of `block` contains the
    string \"whiteboard\", false otherwise (including when `block` is nil)."
    [block]
    (boolean (some #{"whiteboard"} (:block/type block))))
  (defn- group-remote-remove-ops-by-whiteboard-block
    "Splits `remote-remove-ops` by whether the op's block currently sits
    directly under a whiteboard page in the local db.
    return {true [<whiteboard-block-ops>], false [<other-ops>]}"
    [repo remote-remove-ops]
    (letfn [(under-whiteboard? [{:keys [block-uuid]}]
              (let [block (db/pull repo
                                   [{:block/parent [:block/type]}]
                                   [:block/uuid block-uuid])]
                ;; a block that is not found locally counts as `false`
                (boolean
                 (and block
                      (whiteboard-page-block? (:block/parent block))))))]
      (group-by under-whiteboard? remote-remove-ops)))
  (defn apply-remote-remove-ops
    "Applies remote :remove ops to the local db of `repo`.

    Ops whose block sits under a whiteboard page are retracted in one
    :delete-whiteboard-blocks transaction; every other op deletes its block
    (without children) through the outliner, if the block still exists locally."
    [repo remove-ops]
    (prn :remove-ops remove-ops)
    (let [{whiteboard-block-ops true other-ops false}
          (group-remote-remove-ops-by-whiteboard-block repo remove-ops)]
      ;; The :delete-whiteboard-blocks method takes `[_ repo block-uuids]`;
      ;; `repo` must be passed, otherwise the uuid seq is taken as the repo
      ;; and nothing is retracted.
      (transact-db! :delete-whiteboard-blocks repo (map :block-uuid whiteboard-block-ops))
      (doseq [op other-ops]
        (when-let [block (db/pull repo '[*] [:block/uuid (:block-uuid op)])]
          (transact-db! :delete-blocks [block] {:children? false})
          (prn :apply-remote-remove-ops (:block-uuid op))))))
  (defn- insert-or-move-block
    "Places block `block-uuid` according to the remote position
    (`remote-parents`, `remote-left-uuid`).

    When `move?` is truthy an existing block is moved; otherwise an empty
    markdown block with that uuid is inserted. Does nothing unless
    `remote-parents` is non-empty and `remote-left-uuid` is truthy.
    Throws ex-info when no supported combination of local parent / local
    left-sibling is found."
    [repo block-uuid remote-parents remote-left-uuid move? op-value]
    (when (and (seq remote-parents) remote-left-uuid)
      (let [local-parent (db/pull repo '[*] [:block/uuid (first remote-parents)])
            in-whiteboard? (whiteboard-page-block? local-parent)
            ;; when insert blocks in whiteboard, local-left is ignored
            local-left (when-not in-whiteboard?
                         (db/pull repo '[*] [:block/uuid remote-left-uuid]))
            ;; Move or insert the block relative to `target`.
            place! (fn [target sibling?]
                     (if move?
                       (transact-db! :move-blocks [{:block/uuid block-uuid}] target sibling?)
                       (transact-db! :insert-blocks
                                     [{:block/uuid block-uuid
                                       :block/content ""
                                       :block/format :markdown}]
                                     target
                                     {:sibling? sibling? :keep-uuid? true})))
            ;; the block is a sibling of local-left unless local-left IS the parent
            sibling-of-left? (fn []
                               (not= (:block/uuid local-parent)
                                     (:block/uuid local-left)))]
        (case [in-whiteboard? (some? local-parent) (some? local-left)]
          [false false true]
          (place! local-left true)

          [false true true]
          (place! local-left (sibling-of-left?))

          [false true false]
          (place! local-parent false)

          ;; Don't need to insert-whiteboard-block here,
          ;; will do :upsert-whiteboard-block in `update-block-attrs`
          [true true false]
          (when (nil? (:properties op-value))
            ;; when :properties is nil, this block should be treated as a normal block
            (place! local-parent false))

          ;; NOTE(review): local-left is never pulled when the parent is a
          ;; whiteboard page, so this branch looks unreachable; kept as-is.
          [true true true]
          (when (nil? (:properties op-value))
            (place! local-left (sibling-of-left?)))

          (throw (ex-info "Don't know where to insert"
                          {:block-uuid block-uuid
                           :remote-parents remote-parents
                           :remote-left remote-left-uuid}))))))
  (defn- move-ops-map->sorted-move-ops
    "Returns the values of `move-ops-map` (uuid -> op with :parents and :left)
    as a vector ordered so that an op comes after the ops of the blocks it
    depends on (its parents and its left sibling), as far as those are also
    keys of `move-ops-map`.

    Dependency cycles (including an op that lists itself) are tolerated: the
    walk refuses to revisit a uuid already on the current dependency chain, so
    the loop always terminates. Acyclic input is ordered exactly as before."
    [move-ops-map]
    (let [uuid->dep-uuids (into {}
                                (map (fn [[uuid env]]
                                       [uuid (set (conj (:parents env) (:left env)))]))
                                move-ops-map)
          sorted-uuids
          (loop [r []
                 rest-uuids (set (keys move-ops-map))
                 ;; uuids on the dependency chain currently being followed
                 visiting #{}
                 uuid (first rest-uuids)]
            (if-not uuid
              r
              (let [visiting* (conj visiting uuid)
                    pending-deps (set/intersection (uuid->dep-uuids uuid) rest-uuids)
                    ;; ignore deps already on the chain: following them would loop forever
                    next-uuid (first (set/difference pending-deps visiting*))]
                (if next-uuid
                  (recur r rest-uuids visiting* next-uuid)
                  (let [rest-uuids* (disj rest-uuids uuid)]
                    (recur (conj r uuid) rest-uuids* #{} (first rest-uuids*)))))))]
      (mapv move-ops-map sorted-uuids)))
  194. (comment
  195. (def move-ops-map {"2" {:parents ["1"] :left "1" :x "2"}
  196. "1" {:parents ["3"] :left nil :x "1"}
  197. "3" {:parents [] :left nil :x "3"}})
  198. (move-ops-map->sorted-move-ops move-ops-map))
  (defn- check-block-pos
    "Compares the local position of block `block-uuid` with the remote one.

    Returns :not-exist when the block is not in the local db, :wrong-pos when
    its local parent (or, if it has a local :block/left, its left sibling)
    differs from the remote position, and nil otherwise.
    NOTE: some blocks don't have :block/left (e.g. whiteboard blocks)"
    [repo block-uuid remote-parents remote-left-uuid]
    (let [local-b (db/pull repo
                           '[{:block/left [:block/uuid]}
                             {:block/parent [:block/uuid]}
                             *]
                           [:block/uuid block-uuid])]
      (if (nil? local-b)
        :not-exist
        (let [local-left (:block/left local-b)
              parent-differs? (not= (get-in local-b [:block/parent :block/uuid])
                                    (first remote-parents))]
          ;; blocks without :block/left are judged by their parent only
          (when (or parent-differs?
                    (and local-left
                         (not= (:block/uuid local-left) remote-left-uuid)))
            :wrong-pos)))))
  (defn- upsert-whiteboard-block
    "Upserts a whiteboard shape block described by a remote op value.

    Looks up the first remote parent locally; when it exists, decodes the
    transit-encoded `properties`, reads the :logseq.tldraw.shape property from
    them and transacts the block built by `whiteboard-handler/shape->block`.
    Asserts that the parent has a :block/name and that a shape was found."
    [repo {:keys [parents properties] :as _op-value}]
    (when-let [local-parent (db/pull repo '[*] [:block/uuid (first parents)])]
      (let [page-name (:block/name local-parent)
            decoded-props (transit/read transit-r properties)
            shape-pid (pu/get-pid :logseq.tldraw.shape)
            shape (and (map? decoded-props)
                       (get decoded-props shape-pid))]
        (assert (some? page-name) local-parent)
        (assert (some? shape) decoded-props)
        (transact-db! :upsert-whiteboard-block
                      repo
                      [(whiteboard-handler/shape->block shape page-name)]))))
  (defn- update-block-attrs
    "Writes the attributes carried by a remote op value onto block `block-uuid`.

    Only keys that are in `rtc-const/general-attr-set` (plus :content) and
    present in `op-value` are considered; nothing happens when there are none.
    A block under a whiteboard page that carries :properties is upserted as a
    whiteboard shape. Any other block is saved through :save-block and its
    decoded :properties are then transacted separately."
    [repo block-uuid {:keys [parents properties] :as op-value}]
    (let [key-set (set/intersection
                   (conj rtc-const/general-attr-set :content)
                   (set (keys op-value)))]
      (when (seq key-set)
        (let [local-parent (db/pull repo '[*] [:block/uuid (first parents)])
              in-whiteboard? (whiteboard-page-block? local-parent)
              has? (partial contains? key-set)]
          (if (and in-whiteboard? properties)
            (upsert-whiteboard-block repo op-value)
            (let [b-ent (db/pull repo '[*] [:block/uuid block-uuid])
                  new-block
                  (cond-> (db/pull repo '[*] (:db/id b-ent))
                    (and (has? :content)
                         (not= (:content op-value) (:block/content b-ent)))
                    (assoc :block/content (:content op-value))

                    (has? :updated-at)
                    (assoc :block/updated-at (:updated-at op-value))

                    (has? :created-at)
                    (assoc :block/created-at (:created-at op-value))

                    ;; alias uuids are resolved to the :db/id of locally known blocks
                    (has? :alias)
                    (assoc :block/alias (some->> (seq (:alias op-value))
                                                 (map (partial vector :block/uuid))
                                                 (db/pull-many repo [:db/id])
                                                 (keep :db/id)))

                    (has? :type)
                    (assoc :block/type (:type op-value))

                    (has? :schema)
                    (assoc :block/schema (transit/read transit-r (:schema op-value)))

                    ;; tag uuids are resolved to pulled entities that have a :db/id
                    (has? :tags)
                    (assoc :block/tags (some->> (seq (:tags op-value))
                                                (map (partial vector :block/uuid))
                                                (db/pull-many repo [:db/id])
                                                (filter :db/id))))]
              (transact-db! :save-block new-block)
              ;; FIXME: it looks save-block won't save :block/properties??
              ;; so properties are transacted here instead
              (transact-db! :raw
                            repo
                            [{:block/uuid block-uuid
                              :block/properties (transit/read transit-r properties)}]
                            {:outliner-op :save-block})))))))
  (defn apply-remote-move-ops
    "Applies remote :move ops in the given (already sorted) order.

    For each op the local position is checked: a missing block is inserted,
    a misplaced block is moved, a correctly placed block is left alone.
    The op's attributes are then written with `update-block-attrs`."
    [repo sorted-move-ops]
    (prn :sorted-move-ops sorted-move-ops)
    (doseq [{:keys [parents left self] :as op-value} sorted-move-ops]
      (let [r (check-block-pos repo self parents left)]
        ;; value = the `move?` flag for insert-or-move-block; nil => do nothing
        (when-some [move? ({:not-exist false :wrong-pos true} r)]
          (insert-or-move-block repo self parents left move? op-value))
        (update-block-attrs repo self op-value)
        (prn :apply-remote-move-ops self r parents left))))
  (defn apply-remote-update-ops
    "Applies remote :update-attrs ops.

    When an op carries both :parents and :left, the block is first inserted
    (if missing locally) or moved (if misplaced). The op's attributes are
    always written with `update-block-attrs`."
    [repo update-ops]
    (prn :update-ops update-ops)
    (doseq [{:keys [parents left self] :as op-value} update-ops]
      (when (and parents left)
        ;; value = the `move?` flag for insert-or-move-block; nil => do nothing
        (when-some [move? ({:not-exist false :wrong-pos true}
                           (check-block-pos repo self parents left))]
          (insert-or-move-block repo self parents left move? op-value)))
      (update-block-attrs repo self op-value)
      (prn :apply-remote-update-ops self)))
  (defn apply-remote-update-page-ops
    "Applies remote :update-page ops: creates or renames local pages so that
    page `self` carries the remote `original-name`, then writes the op's
    attributes with `update-block-attrs`."
    [repo update-page-ops]
    (doseq [{:keys [self page-name original-name] :as op-value} update-page-ops]
      (let [local-original-name (:block/original-name
                                 (db/pull repo [:block/original-name] [:block/uuid self]))
            same-name-page (db/pull repo [:block/uuid] [:block/name page-name])
            create-remote-page! (fn []
                                  (page-handler/create! original-name
                                                        {:redirect? false
                                                         :create-first-block? false
                                                         :uuid self
                                                         :persist-op? false}))]
        (cond
          ;; same name but different uuid:
          ;; the remote page has the same block/name as a local page, but a different block/uuid.
          ;; 1. rename the local page to '<origin-name>-<ms-epoch>-CONFLICT'
          ;; 2. create page, name=<origin-name>, uuid=remote-uuid
          (and same-name-page (not= (:block/uuid same-name-page) self))
          (do (page-handler/rename! original-name
                                    (util/format "%s-%s-CONFLICT" original-name (tc/to-long (t/now))))
              (create-remote-page!))

          ;; a client page has the same uuid as the remote one but a different name:
          ;; rename the client page to the remote page name
          (and local-original-name (not= local-original-name original-name))
          (page-handler/rename! local-original-name original-name false false)

          ;; no page with that name or that uuid locally: just create it
          :else
          (create-remote-page!))
        (update-block-attrs repo self op-value))))
  (defn apply-remote-remove-page-ops
    "Deletes, for every remote :remove-page op, the local page whose
    :block/uuid is the op's :block-uuid. Ops whose page has no :block/name
    locally (e.g. it does not exist) are skipped."
    [repo remove-page-ops]
    (doseq [{:keys [block-uuid]} remove-page-ops
            :let [page-name (:block/name
                             (db/pull repo [:block/name] [:block/uuid block-uuid]))]
            :when page-name]
      (page-handler/delete! page-name nil {:redirect-to-home? false :persist-op? false})))
  (defn filter-remote-data-by-local-unpushed-ops
    "Drops from `affected-blocks-map` (block-uuid -> remote op) whatever the
    local, not-yet-pushed ops already supersede.

    `local-unpushed-ops` is a seq of `[op-type op-value]` pairs:
    - a local \"move\" removes the remote :remove / :move op of the same block;
    - a local \"update\" strips the locally updated attribute keys from the
      remote :update-attrs / :move op of the same block;
    - any other local op leaves the map untouched."
    [affected-blocks-map local-unpushed-ops]
    ;; (assert (op-mem-layer/ops-coercer local-unpushed-ops) local-unpushed-ops)
    (letfn [(drop-superseded-move [acc {:keys [block-uuid]}]
              (let [remote-op (get acc block-uuid)]
                (case (:op remote-op)
                  :remove (dissoc acc (:block-uuid remote-op))
                  :move (dissoc acc (:self remote-op))
                  acc)))
            (strip-updated-attrs [acc {:keys [block-uuid updated-attrs]}]
              (if-let [remote-op (get acc block-uuid)]
                (assoc acc block-uuid
                       (if (#{:update-attrs :move} (:op remote-op))
                         (apply dissoc remote-op (keys updated-attrs))
                         remote-op))
                acc))]
      (reduce (fn [acc [op-type op-value]]
                (case op-type
                  "move" (drop-superseded-move acc op-value)
                  "update" (strip-updated-attrs acc op-value)
                  acc))
              affected-blocks-map
              local-unpushed-ops)))
  (defn <apply-remote-data
    "Applies a message from the server (`data-from-ws`) to the local db of `repo`.

    Asserts that the message passes `rtc-const/data-from-ws-validator`, then
    returns a go channel. Depending on the remote :t / :t-before and the local tx:
    - remote-t <= local-tx: the message is skipped (only logged);
    - local-tx < remote-t-before: nothing is applied and the channel yields
      ::need-pull-remote-data;
    - otherwise the affected blocks (minus what local unpushed ops supersede)
      are applied in the order update-page, remove, move, update, remove-page,
      the UI is batch-refreshed and the local tx is set to remote-t.
    Throws ex-info inside the go block when :t or :t-before is not positive."
    [repo data-from-ws]
    (assert (rtc-const/data-from-ws-validator data-from-ws) data-from-ws)
    (go
      (let [remote-t (:t data-from-ws)
            remote-t-before (:t-before data-from-ws)
            local-tx (op-mem-layer/get-local-tx repo)]
        (cond
          (not (and (pos? remote-t)
                    (pos? remote-t-before)))
          (throw (ex-info "invalid remote-data" {:data data-from-ws}))

          (<= remote-t local-tx)
          ;; log label fixed: the second value is remote-t-before, not remote-t
          (prn ::skip :remote-t remote-t :remote-t-before remote-t-before :local-t local-tx)

          (< local-tx remote-t-before)
          (do (prn ::need-pull-remote-data :remote-t remote-t :remote-t-before remote-t-before :local-t local-tx)
              ::need-pull-remote-data)

          (<= remote-t-before local-tx remote-t)
          (let [affected-blocks-map (:affected-blocks data-from-ws)
                unpushed-ops (op-mem-layer/get-all-ops repo)
                affected-blocks-map* (if unpushed-ops
                                       (filter-remote-data-by-local-unpushed-ops
                                        affected-blocks-map unpushed-ops)
                                       affected-blocks-map)
                ;; group the affected blocks by their :op, each group as a map again
                {remove-ops-map :remove move-ops-map :move update-ops-map :update-attrs
                 update-page-ops-map :update-page remove-page-ops-map :remove-page}
                (update-vals
                 (group-by (fn [[_ env]] (get env :op)) affected-blocks-map*)
                 (partial into {}))
                remove-ops (vals remove-ops-map)
                sorted-move-ops (move-ops-map->sorted-move-ops move-ops-map)
                update-ops (vals update-ops-map)
                update-page-ops (vals update-page-ops-map)
                remove-page-ops (vals remove-page-ops-map)]
            (state/set-state! [:rtc/remote-batch-tx-state repo]
                              {:in-transaction? true
                               :txs []})
            (util/profile :apply-remote-update-page-ops (apply-remote-update-page-ops repo update-page-ops))
            (util/profile :apply-remote-remove-ops (apply-remote-remove-ops repo remove-ops))
            (util/profile :apply-remote-move-ops (apply-remote-move-ops repo sorted-move-ops))
            (util/profile :apply-remote-update-ops (apply-remote-update-ops repo update-ops))
            (util/profile :apply-remote-remove-page-ops (apply-remote-remove-page-ops repo remove-page-ops))
            ;; refresh once, using the txs accumulated under :rtc/remote-batch-tx-state
            (let [txs (get-in @state/state [:rtc/remote-batch-tx-state repo :txs])]
              (util/profile
               :batch-refresh
               (react/batch-refresh! repo txs)))
            (op-mem-layer/update-local-tx! repo remote-t))

          :else (throw (ex-info "unreachable" {:remote-t remote-t
                                               :remote-t-before remote-t-before
                                               :local-t local-tx}))))))
  (defn- <push-data-from-ws-handler
    "Applies data pushed by the server. Returns a go channel that yields
    ::need-pull-remote-data when `<apply-remote-data` reported it, and
    yields nothing otherwise."
    [repo push-data-from-ws]
    (prn :push-data-from-ws push-data-from-ws)
    (go
      (when (= ::need-pull-remote-data
               (<! (<apply-remote-data repo push-data-from-ws)))
        ::need-pull-remote-data)))
  (defn- remove-non-exist-block-uuids-in-add-retract-map
    "Returns a map with :add narrowed to the uuids that resolve to a block in
    the local db, and :retract carried over unchanged. Empty entries are
    omitted, so the result may be {}."
    [repo add-retract-map]
    (let [retract (:retract add-retract-map)
          existing-add (->> (:add add-retract-map)
                            (map (fn [block-uuid] [:block/uuid block-uuid]))
                            (db/pull-many repo [:block/uuid])
                            (keep :block/uuid))]
      (cond-> {}
        (seq existing-add) (assoc :add existing-add)
        (seq retract) (assoc :retract retract))))
  ;; Converts one kind of local block op into remote ops.
  ;; Dispatches on the first argument; the rest are keyword arguments.
  (defmulti local-block-ops->remote-ops-aux (fn [op-kind & _] op-kind))

  ;; :move-op - appends a [:move ...] remote op targeting the left sibling
  ;; (or the parent when there is none) and records that target as a
  ;; dependency. Does nothing without a parent-uuid.
  (defmethod local-block-ops->remote-ops-aux :move-op
    [_ & {:keys [parent-uuid left-uuid block-uuid *remote-ops *depend-on-block-uuid-set]}]
    (when parent-uuid
      (let [target-uuid (or left-uuid parent-uuid)
            move-op {:block-uuid block-uuid
                     :target-uuid target-uuid
                     :sibling? (not= left-uuid parent-uuid)}]
        (swap! *remote-ops conj [:move move-op])
        (swap! *depend-on-block-uuid-set conj target-uuid))))
  ;; :update-op - appends an [:update ...] remote op for `block`.
  ;; Only attributes named in the local op's :updated-attrs are sent, except
  ;; :updated-at / :created-at, which are sent whenever the block has them,
  ;; and :target-uuid / :sibling?, which are sent whenever a left or parent
  ;; uuid is known.
  (defmethod local-block-ops->remote-ops-aux :update-op
    [_ & {:keys [repo block update-op left-uuid parent-uuid *remote-ops]}]
    (let [attr-map (:updated-attrs (second update-op))
          changed? (partial contains? attr-map)
          ;; builds {:add .. :retract ..}, leaving out empty entries
          add-retract (fn [add retract]
                        (cond-> {}
                          (seq add) (assoc :add add)
                          (seq retract) (assoc :retract retract)))
          attr-alias-map (when (changed? :alias)
                           (remove-non-exist-block-uuids-in-add-retract-map repo (:alias attr-map)))
          attr-tags-map (when (changed? :tags)
                          (remove-non-exist-block-uuids-in-add-retract-map repo (:tags attr-map)))
          attr-type-map (when (changed? :type)
                          (let [{:keys [add retract]} (:type attr-map)
                                current-types (set (:block/type block))]
                            ;; add only types the block still has,
                            ;; retract only types it no longer has
                            (add-retract (set/intersection add current-types)
                                         (set/difference retract current-types))))
          attr-properties-map (when (changed? :properties)
                                (let [{:keys [add retract]} (:properties attr-map)
                                      ;; [key transit-encoded-value] pairs of the added properties
                                      encoded-add (into []
                                                        (update-vals (select-keys (:block/properties block) add)
                                                                     (partial transit/write transit-w)))]
                                  (add-retract encoded-add retract)))
          target-uuid (or left-uuid parent-uuid)
          sibling? (not= left-uuid parent-uuid)
          remote-op (cond-> {:block-uuid (:block/uuid block)}
                      (:block/updated-at block) (assoc :updated-at (:block/updated-at block))
                      (:block/created-at block) (assoc :created-at (:block/created-at block))
                      (changed? :schema) (assoc :schema (transit/write transit-w (:block/schema block)))
                      attr-alias-map (assoc :alias attr-alias-map)
                      attr-type-map (assoc :type attr-type-map)
                      attr-tags-map (assoc :tags attr-tags-map)
                      attr-properties-map (assoc :properties attr-properties-map)

                      (and (changed? :content) (:block/content block))
                      (assoc :content (:block/content block))

                      (and (changed? :link) (:block/uuid (:block/link block)))
                      (assoc :link (:block/uuid (:block/link block)))

                      target-uuid (assoc :target-uuid target-uuid :sibling? sibling?))]
      (swap! *remote-ops conj [:update remote-op])))
  ;; :update-page-op - appends an [:update-page ...] remote op carrying the
  ;; page's current name; :original-name falls back to the :block/name.
  ;; Does nothing when the page cannot be pulled locally.
  (defmethod local-block-ops->remote-ops-aux :update-page-op
    [_ & {:keys [repo block-uuid *remote-ops]}]
    (when-let [page (db/pull repo [:block/name :block/original-name] [:block/uuid block-uuid])]
      (let [{page-name :block/name original-name :block/original-name} page]
        (swap! *remote-ops conj
               [:update-page {:block-uuid block-uuid
                              :page-name page-name
                              :original-name (or original-name page-name)}]))))
  ;; :remove-op - appends a [:remove ...] remote op, but only when the block
  ;; is no longer present in the local db.
  (defmethod local-block-ops->remote-ops-aux :remove-op
    [_ & {:keys [repo remove-op *remote-ops]}]
    (let [block-uuid (:block-uuid (second remove-op))]
      (when (and block-uuid
                 (nil? (db/pull repo [:block/uuid] [:block/uuid block-uuid])))
        (swap! *remote-ops conj [:remove {:block-uuids [block-uuid]}]))))
  ;; :remove-page-op - appends a [:remove-page ...] remote op, but only when
  ;; the page is no longer present in the local db.
  (defmethod local-block-ops->remote-ops-aux :remove-page-op
    [_ & {:keys [repo remove-page-op *remote-ops]}]
    (let [block-uuid (:block-uuid (second remove-page-op))]
      (when (and block-uuid
                 (nil? (db/pull repo [:block/uuid] [:block/uuid block-uuid])))
        (swap! *remote-ops conj [:remove-page {:block-uuid block-uuid}]))))
  (defn- local-block-ops->remote-ops
    "Turns the local ops of one block (`block-ops`, keyed by :move / :remove /
    :update / :update-page / :remove-page) into remote ops.

    Returns {:remote-ops [...] :depend-on-block-uuids #{...}}; the second
    entry holds the uuids the generated :move op targets."
    [repo block-ops]
    (let [*dep-uuids (atom #{})
          *ops (atom [])
          {move-op :move
           remove-op :remove
           update-op :update
           update-page-op :update-page
           remove-page-op :remove-page} block-ops
          block-uuid (some (comp :block-uuid second) [move-op update-op update-page-op])
          block (when block-uuid
                  (db/pull repo
                           '[{:block/left [:block/uuid]}
                             {:block/parent [:block/uuid]}
                             {:block/link [:block/uuid]}
                             *]
                           [:block/uuid block-uuid]))]
      ;; move / update / update-page need the block to still exist locally
      (when block
        (let [left-uuid (some-> block :block/left :block/uuid)
              parent-uuid (some-> block :block/parent :block/uuid)]
          ;; remote-move-op
          ;; whiteboard blocks don't have :block/left
          (when (and parent-uuid move-op)
            (local-block-ops->remote-ops-aux :move-op
                                             :parent-uuid parent-uuid
                                             :left-uuid left-uuid
                                             :block-uuid block-uuid
                                             :*remote-ops *ops
                                             :*depend-on-block-uuid-set *dep-uuids))
          ;; remote-update-op
          (when update-op
            (local-block-ops->remote-ops-aux :update-op
                                             :repo repo
                                             :block block
                                             :update-op update-op
                                             :parent-uuid parent-uuid
                                             :left-uuid left-uuid
                                             :*remote-ops *ops)))
        ;; remote-update-page-op
        (when update-page-op
          (local-block-ops->remote-ops-aux :update-page-op
                                           :repo repo
                                           :block-uuid block-uuid
                                           :*remote-ops *ops)))
      ;; remote-remove-op
      (when remove-op
        (local-block-ops->remote-ops-aux :remove-op
                                         :repo repo
                                         :remove-op remove-op
                                         :*remote-ops *ops))
      ;; remote-remove-page-op
      (when remove-page-op
        (local-block-ops->remote-ops-aux :remove-page-op
                                         :repo repo
                                         :remove-page-op remove-page-op
                                         :*remote-ops *ops))
      {:remote-ops @*ops
       :depend-on-block-uuids @*dep-uuids}))
  (defn gen-block-uuid->remote-ops
    "Drains pending local block ops from the op-mem-layer of `repo` and
    returns a map of block-uuid -> {op-type remote-op}.

    Blocks are taken via `op-mem-layer/get-min-epoch-block-ops`; blocks that a
    generated op depends on and that have pending ops themselves are handled
    before the next one is taken. Stops taking new blocks once the result
    holds at least `n` (default 50) entries or nothing is pending."
    [repo & {:keys [n] :or {n 50}}]
    (loop [pending-ops nil
           pending-block-uuid nil
           dep-uuids nil
           r {}]
      (cond
        ;; ops of one block are in hand: convert them, queue their dependencies
        (seq pending-ops)
        (let [{:keys [remote-ops depend-on-block-uuids]}
              (local-block-ops->remote-ops repo pending-ops)]
          (recur nil nil
                 (set/union (set dep-uuids)
                            (op-mem-layer/intersection-block-uuids repo depend-on-block-uuids))
                 (assoc r pending-block-uuid (into {} remote-ops))))

        ;; nothing in hand but dependencies are queued: take the next one
        (seq dep-uuids)
        (let [[block-uuid & other-block-uuids] dep-uuids
              block-ops (op-mem-layer/get-block-ops repo block-uuid)]
          (op-mem-layer/remove-block-ops! repo block-uuid)
          (recur block-ops block-uuid other-block-uuids r))

        ;; nothing in hand, nothing queued, enough collected
        (>= (count r) n)
        r

        ;; nothing in hand, nothing queued: take the oldest pending block, if any
        :else
        (if-let [{min-epoch-block-ops :ops block-uuid :block-uuid}
                 (op-mem-layer/get-min-epoch-block-ops repo)]
          (do (assert (not (contains? r block-uuid)) {:r r :block-uuid block-uuid})
              (op-mem-layer/remove-block-ops! repo block-uuid)
              (recur min-epoch-block-ops block-uuid dep-uuids r))
          ;; finish
          r))))
  (defn sort-remote-ops
    "Flattens `block-uuid->remote-ops` (block-uuid -> {op-type op}) into a
    seq of `[op-type op]` pairs ordered as:
    :update-page, :remove, :move, :update, :remove-page.

    :move ops are ordered so that a move comes after the move of its
    :target-uuid when that target is moved in the same batch. A cycle of move
    targets (including a block targeting itself) no longer hangs the sort:
    a uuid already on the current dependency chain is not followed again."
    [block-uuid->remote-ops]
    (let [block-uuid->dep-uuid
          (into {}
                (keep (fn [[block-uuid remote-ops]]
                        (when-let [move-op (get remote-ops :move)]
                          [block-uuid (:target-uuid move-op)])))
                block-uuid->remote-ops)
          sorted-uuids
          (loop [r []
                 rest-uuids (set (keys block-uuid->dep-uuid))
                 ;; uuids on the dependency chain currently being followed
                 visiting #{}
                 uuid (first rest-uuids)]
            (if-not uuid
              r
              (let [dep-uuid (block-uuid->dep-uuid uuid)
                    visiting* (conj visiting uuid)]
                (if (and (contains? rest-uuids dep-uuid)
                         (not (contains? visiting* dep-uuid)))
                  (recur r rest-uuids visiting* dep-uuid)
                  (let [rest-uuids* (disj rest-uuids uuid)]
                    (recur (conj r uuid) rest-uuids* #{} (first rest-uuids*)))))))
          sorted-move-ops (keep
                           (fn [block-uuid]
                             (some->> (get-in block-uuid->remote-ops [block-uuid :move])
                                      (vector :move)))
                           sorted-uuids)
          ;; all ops of one type, as [op-type op] pairs
          ops-of (fn [op-type]
                   (keep (fn [[_ remote-ops]]
                           (some->> (get remote-ops op-type) (vector op-type)))
                         block-uuid->remote-ops))]
      (concat (ops-of :update-page)
              (ops-of :remove)
              sorted-move-ops
              (ops-of :update)
              (ops-of :remove-page))))
  (defn- <client-op-update-handler
    "Pushes pending local ops to the server and applies the server's answer.

    Requires :*graph-uuid and :*repo in `state` to be set. Opens a new
    op-mem-layer branch, sends an \"apply-ops\" request and then either commits
    the branch and applies the returned data, or rolls the branch back when
    the server answers with :ex-data or anything throws. Returns a go channel."
    [state]
    {:pre [(some? @(:*graph-uuid state))
           (some? @(:*repo state))]}
    (go
      (let [repo @(:*repo state)
            ;; rolls back the branch; always evaluates to nil
            rollback! (fn []
                        (op-mem-layer/rollback! repo)
                        nil)]
        (op-mem-layer/new-branch! repo)
        (try
          (let [ops-for-remote (-> repo gen-block-uuid->remote-ops sort-remote-ops)
                local-tx (op-mem-layer/get-local-tx repo)
                r (<? (ws/<send&receive state {:action "apply-ops"
                                               :graph-uuid @(:*graph-uuid state)
                                               :ops ops-for-remote
                                               :t-before (or local-tx 1)}))
                remote-ex (:ex-data r)]
            (if remote-ex
              (case (:type remote-ex)
                ;; conflict-update remote-graph, keep these local-pending-ops
                ;; and try to send ops later
                :graph-lock-failed
                (do (prn :graph-lock-failed)
                    (rollback!))

                ;; this case means something wrong in remote-graph data,
                ;; nothing to do at client-side
                :graph-lock-missing
                (do (prn :graph-lock-missing)
                    (rollback!))

                :get-s3-object-failed
                (do (prn ::get-s3-object-failed r)
                    (rollback!))

                ;; else
                (do (rollback!)
                    (throw (ex-info "Unavailable" {:remote-ex remote-ex}))))
              (do (assert (pos? (:t r)) r)
                  (op-mem-layer/commit! repo)
                  (<! (<apply-remote-data repo r))
                  (prn :<client-op-update-handler :t (:t r)))))
          (catch :default e
            (prn ::unknown-ex e)
            (rollback!))))))
  (defn- make-push-client-ops-timeout-ch
    "Builds the channel that tells the rtc loop when to push client ops.

    With a truthy `never-timeout?` the result is a fresh unbuffered channel
    that this function never puts to. Otherwise the result is a go channel
    that, after 2000 ms, yields whether the unpushed block-update count of
    `repo` is positive."
    [repo never-timeout?]
    (if-not never-timeout?
      (go
        (<! (async/timeout 2000))
        (pos? (op-mem-layer/get-unpushed-block-update-count repo)))
      (chan)))
  (defn <loop-for-rtc
    "Runs the rtc event loop for `graph-uuid` / `repo` inside a go block.

    Setup: stores `repo`, marks `:*rtc-state` as `:open`, publishes a fresh
    stop channel in `:*stop-rtc-loop-chan`, waits for the websocket to open,
    stores `graph-uuid`, sends `register-graph-updates` and subscribes to the
    `push-updates` topic. `loop-started-ch` (optional) is closed right before
    the loop starts.

    Each iteration waits, in priority order, on: the toggle channel, the
    force-push channel, the push timer, pushed data from the websocket and
    the stop channel. Returns the channel of the outer go block."
    [state graph-uuid repo & {:keys [loop-started-ch]}]
    {:pre [(state-validator state)
           (some? graph-uuid)
           (some? repo)]}
    (go
      (reset! (:*repo state) repo)
      (reset! (:*rtc-state state) :open)
      (let [{ws-pub :data-from-ws-pub} state
            remote-push-ch (chan (async/sliding-buffer 100) (map rtc-const/data-from-ws-coercer))
            stop-ch (chan)
            *auto-push? (:*auto-push-client-ops? state)
            force-push-ch (:force-push-client-ops-chan state)
            toggle-ch (:toggle-auto-push-client-ops-chan state)
            ;; A new timer channel is needed for every iteration; it never
            ;; fires while auto-push is switched off.
            next-timer-ch (fn []
                            (make-push-client-ops-timeout-ch repo (not @*auto-push?)))]
        (reset! (:*stop-rtc-loop-chan state) stop-ch)
        (<! (ws/<ensure-ws-open! state))
        (reset! (:*graph-uuid state) graph-uuid)
        (with-sub-data-from-ws state
          (<! (ws/<send! state {:action "register-graph-updates" :req-id (get-req-id) :graph-uuid graph-uuid}))
          (<! (get-result-ch)))
        (async/sub ws-pub "push-updates" remote-push-ch)
        (when loop-started-ch (async/close! loop-started-ch))
        (<! (go-loop [timer-ch (next-timer-ch)]
              (let [event (async/alt!
                            toggle-ch {:continue true}
                            force-push-ch {:client-op-update true}
                            timer-ch ([fired?] (if-not (and @*auto-push? (true? fired?))
                                                 {:continue true}
                                                 {:client-op-update true}))
                            remote-push-ch ([data] {:push-data-from-ws data})
                            stop-ch {:stop true}
                            :priority true)
                    remote-data (:push-data-from-ws event)]
                (cond
                  (:continue event)
                  (recur (next-timer-ch))

                  remote-data
                  (let [handled (<! (<push-data-from-ws-handler repo remote-data))]
                    (when (= handled ::need-pull-remote-data)
                      ;; trigger a force push, which can pull remote-diff-data from local-t to remote-t
                      (async/put! force-push-ch true))
                    (recur (next-timer-ch)))

                  (:client-op-update event)
                  (let [maybe-exp (<! (user/<wrap-ensure-id&access-token
                                       (<! (<client-op-update-handler state))))]
                    ;; NOTE(review): on an expired token the loop ends here
                    ;; without touching :*rtc-state or the websocket - confirm
                    ;; that this is intended.
                    (if (= :expired-token (:anom (ex-data maybe-exp)))
                      (prn ::<loop-for-rtc "quitting loop" maybe-exp)
                      (recur (next-timer-ch))))

                  (:stop event)
                  (do (ws/stop @(:*ws state))
                      (reset! (:*rtc-state state) :closed))

                  ;; no recur: the loop ends
                  :else
                  nil))))
        (async/unsub ws-pub "push-updates" remote-push-ch))))
  (defn <grant-graph-access-to-others
    "Sends a `grant-access` request for `graph-uuid` and prints the outcome.

    `:target-user-uuids` and `:target-user-emails` are optional; each one is
    added to the request only when it is truthy. Returns the channel of the
    go block."
    [state graph-uuid & {:keys [target-user-uuids target-user-emails]}]
    (go
      (let [resp (with-sub-data-from-ws state
                   (let [request (merge {:req-id (get-req-id)
                                         :action "grant-access"
                                         :graph-uuid graph-uuid}
                                        (when target-user-uuids
                                          {:target-user-uuids target-user-uuids})
                                        (when target-user-emails
                                          {:target-user-emails target-user-emails}))]
                     (<! (ws/<send! state request))
                     (<! (get-result-ch))))
            err-msg (:ex-message resp)]
        (if err-msg
          (prn ::<grant-graph-access-to-others err-msg (:ex-data resp))
          (prn ::<grant-graph-access-to-others :succ)))))
  (defn <toggle-auto-push-client-ops
    "Flips the boolean held by `:*auto-push-client-ops?` in `state`, then puts
    `true` on `:toggle-auto-push-client-ops-chan`. Returns the channel of the
    go block."
    [state]
    (go
      (let [{auto-push-flag :*auto-push-client-ops?
             toggle-ch :toggle-auto-push-client-ops-chan} state]
        (swap! auto-push-flag not)
        (>! toggle-ch true))))
  (defn <get-block-content-versions
    "Sends a `query-block-content-versions` request for `block-uuid`.

    Nothing is sent when the `:*graph-uuid` atom in `state` is missing or
    holds a falsy value. The go block evaluates to the `:versions` of the
    response, or prints the error and evaluates to nil when the response
    carries an `:ex-message`."
    [state block-uuid]
    (go
      (when-let [graph-uuid (some-> state :*graph-uuid deref)]
        (with-sub-data-from-ws state
          (<! (ws/<send! state {:req-id (get-req-id)
                                :action "query-block-content-versions"
                                :block-uuids [block-uuid]
                                :graph-uuid graph-uuid}))
          (let [{err-msg :ex-message
                 err-data :ex-data
                 versions :versions} (<! (get-result-ch))]
            (if-not err-msg
              versions
              (prn ::<get-block-content-versions :ex-message err-msg :ex-data err-data)))))))
  (defn init-state
    "Builds the rtc state map around an existing websocket `ws` and the
    channel `data-from-ws-chan`.

    The map starts with `:*rtc-state` set to `:closed`, auto-push enabled,
    and no graph-uuid, repo or stop channel. `:data-from-ws-pub` is a pub
    over `data-from-ws-chan` keyed by `:req-id`."
    [ws data-from-ws-chan]
    ;; {:post [(m/validate state-schema %)]}
    (let [ws-pub (async/pub data-from-ws-chan :req-id)
          ;; both signalling channels use a sliding buffer of size 1
          one-slot-chan (fn [] (chan (async/sliding-buffer 1)))]
      {:*rtc-state (atom :closed :validator rtc-state-validator)
       :*graph-uuid (atom nil)
       :*repo (atom nil)
       :data-from-ws-chan data-from-ws-chan
       :data-from-ws-pub ws-pub
       :toggle-auto-push-client-ops-chan (one-slot-chan)
       :*auto-push-client-ops? (atom true :validator boolean?)
       :*stop-rtc-loop-chan (atom nil)
       :force-push-client-ops-chan (one-slot-chan)
       :*ws (atom ws)}))
  (defn <init-state
    "Opens a websocket and builds the rtc state map with `init-state`.

    Creates the incoming-data channel (sliding buffer of 100) and a channel
    that is awaited before the state is built. The token lookup, the
    `ws/ws-listen` call and the wait all run inside
    `user/<wrap-ensure-id&access-token`. Returns the channel of the go block."
    []
    (go
      (let [incoming-ch (chan (async/sliding-buffer 100))
            opened-ch (chan)]
        (<! (user/<wrap-ensure-id&access-token
             (let [ws (ws/ws-listen (state/get-auth-id-token) incoming-ch opened-ch)]
               (<! opened-ch)
               (init-state ws incoming-ch)))))))