core.cljs 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433
  1. (ns frontend.db.rtc.core
  2. (:require-macros
  3. [frontend.db.rtc.macro :refer [with-sub-data-from-ws get-req-id get-result-ch]])
  4. (:require [datascript.core :as d]
  5. [frontend.db.conn :as conn]
  6. [frontend.util :as util]
  7. [frontend.config :as config]
  8. [cljs.core.async :as async :refer [<! >! chan go go-loop offer!
  9. poll! timeout]]
  10. [cljs.core.async.interop :refer [p->c]]
  11. [electron.ipc :as ipc]
  12. [malli.core :as m]
  13. [frontend.modules.outliner.transaction :as outliner-tx]
  14. [frontend.modules.outliner.core :as outliner-core]
  15. [frontend.db :as db]
  16. [frontend.db.rtc.ws :as ws]
  17. [clojure.set :as set]
  18. [frontend.state :as state]
  19. [frontend.db.rtc.op :as op]
  20. [frontend.db.rtc.full-upload-download-graph :as full-upload-download-graph]))
  (def state-schema
    "Malli schema of the RTC state map.

    | key                    | value                                                    |
    |------------------------|----------------------------------------------------------|
    | :user-uuid             | string                                                   |
    | :*graph-uuid           | atom of the graph-uuid currently being synced            |
    | :*repo                 | atom of the repo name currently being synced             |
    | :data-from-ws-chan     | channel receiving messages from the server websocket     |
    | :data-from-ws-pub      | pub of :data-from-ws-chan, dispatched by :req-id         |
    | :client-op-update-chan | channel notifying that there are new local operations    |
    | :*stop-rtc-loop-chan   | atom of the channel used to stop <loop-for-rtc           |
    | :*ws                   | atom of the websocket                                    |

    Only :user-uuid is type-checked (:string); every other entry is :any."
    (into [:map [:user-uuid :string]]
          (map (fn [state-key] [state-key :any]))
          [:*graph-uuid
           :*repo
           :data-from-ws-chan
           :data-from-ws-pub
           :client-op-update-chan
           :*stop-rtc-loop-chan
           :*ws]))

  (def state-validator
    "Predicate compiled from `state-schema`."
    (m/validator state-schema))
  (def data-from-ws-schema
    "Malli schema of a message received from the server websocket.

    :affected-blocks maps a keyword to one of three op maps, told apart by
    their :op string: \"move\", \"remove\" or \"update-attrs\".
    \"move\" and \"update-attrs\" share the same fields."
    (let [positioned-block-op
          ;; shape shared by the ops that describe where a block lives
          (fn [op-name]
            [:map
             [:op [:= op-name]]
             [:parents [:sequential :string]]
             [:left [:maybe :string]]
             [:self :string]
             [:content {:optional true} :string]])]
      [:map
       [:req-id :string]
       [:t {:optional true} :int]
       [:affected-blocks {:optional true}
        [:map-of :keyword
         [:or
          (positioned-block-op "move")
          [:map
           [:op [:= "remove"]]
           [:block-uuid :string]]
          (positioned-block-op "update-attrs")]]]]))

  (def data-from-ws-validator
    "Predicate compiled from `data-from-ws-schema`."
    (m/validator data-from-ws-schema))
  ;; TODO: don't loop over outliner-core/delete-blocks! to remove blocks.
  ;; It suits user operations (e.g. removing consecutive blocks), but the
  ;; blocks in remove-ops are scattered and may even belong to different pages.
  (defn apply-remote-remove-ops
    "Deletes, one transaction per block, every block named by the :block-uuid
    of `remove-ops` that exists in the repo held by `state`.
    Ops whose block is not found locally are skipped. Transactions are run
    with :persist-op? false. Returns nil."
    [state remove-ops]
    {:pre [(some? @(:*repo state))]}
    (let [repo @(:*repo state)]
      (prn :remove-ops remove-ops)
      (doseq [{:keys [block-uuid]} remove-ops
              :let [local-block (db/entity repo [:block/uuid (uuid block-uuid)])]
              :when local-block]
        (outliner-tx/transact!
         {:persist-op? false}
         (outliner-core/delete-blocks! [local-block] {:children? false}))
        (prn :apply-remote-remove-ops block-uuid))))
  (defn <query-blocks-env
    "Placeholder, not implemented yet: ignores `block-uuids` and returns an
    empty map."
    [block-uuids]
    ;; TODO
    (hash-map))
  (defn- transact-insert-or-move!
    "Runs one transaction (with :persist-op? false) that places the block
    `block-uuid-str` relative to `target` and prints the resulting tx-data.

    - `move?` true: moves the block, then saves `content` onto it when
      `content` is non-nil and differs from the locally stored content.
    - `move?` false: inserts a new markdown block with `content`, keeping
      the given uuid."
    [repo block-uuid-str content move? target sibling?]
    (let [block-uuid (uuid block-uuid-str)]
      (prn (:tx-data
            (outliner-tx/transact!
             {:persist-op? false}
             (if move?
               (do (outliner-core/move-blocks! [{:block/uuid block-uuid}] target sibling?)
                   ;; Compare against the stored block: the bare {:block/uuid ..}
                   ;; map has no :block/content, so comparing against it would
                   ;; always report a change.
                   (let [local-block (db/pull repo '[*] [:block/uuid block-uuid])]
                     (when (and content (not= (:block/content local-block) content))
                       (outliner-core/save-block! (assoc local-block :block/content content)))))
               (outliner-core/insert-blocks! [{:block/uuid block-uuid
                                               :block/content content
                                               :block/format :markdown}]
                                             target
                                             {:sibling? sibling? :keep-uuid? true})))))))

  (defn- insert-or-move-block
    "Inserts (`move?` false) or moves (`move?` true) the block `block-uuid-str`
    to the position described by the remote data.

    Does nothing unless `remote-parents` is non-empty and
    `remote-left-uuid-str` is non-nil. Otherwise the target is chosen from
    what exists locally:

    - left block exists: place relative to it; as a sibling unless the left
      block is the first remote parent itself;
    - only the first remote parent exists: place as its child;
    - neither exists: throws ex-info \"Don't know where to insert\".

    Returns nil."
    [state block-uuid-str remote-parents remote-left-uuid-str content move?]
    {:pre [(some? @(:*repo state))]}
    (when (and (seq remote-parents) remote-left-uuid-str)
      (let [repo @(:*repo state)
            local-left (db/entity repo [:block/uuid (uuid remote-left-uuid-str)])
            local-parent (db/entity repo [:block/uuid (uuid (first remote-parents))])]
        (cond
          (some? local-left)
          ;; When the parent is missing locally its uuid is nil, which never
          ;; equals the left block's uuid, so sibling? is true in that case.
          (transact-insert-or-move! repo block-uuid-str content move? local-left
                                    (not= (:block/uuid local-parent) (:block/uuid local-left)))

          (some? local-parent)
          (transact-insert-or-move! repo block-uuid-str content move? local-parent false)

          :else
          (throw (ex-info "Don't know where to insert"
                          {:block-uuid block-uuid-str
                           :remote-parents remote-parents
                           :remote-left remote-left-uuid-str}))))))
  (defn- move-ops-map->sorted-move-ops
    "Orders the values of `move-ops-map` (block-uuid -> env) so that every op
    comes after the ops of the blocks it depends on.

    A block depends on every uuid in its :parents and on its :left.
    Dependencies that are not keys of `move-ops-map` are ignored.
    Returns a vector of envs; [] for a nil or empty map.

    A self-dependency or a dependency cycle cannot be fully ordered. Instead
    of looping forever, the cycle is broken at the block whose remaining
    dependencies are all already on the chain being followed."
    [move-ops-map]
    (let [block-uuid->dep-uuids
          (into {}
                (map (fn [[block-uuid env]]
                       [block-uuid (set (conj (:parents env) (:left env)))]))
                move-ops-map)
          sorted-uuids
          (loop [sorted []
                 pending (set (keys move-ops-map))
                 current (first pending)
                 ;; uuids visited while walking towards a dependency-free block
                 chain #{}]
            (if-not current
              sorted
              (let [chain* (conj chain current)
                    blocker (->> (set/intersection (block-uuid->dep-uuids current) pending)
                                 (remove chain*)
                                 first)]
                (if blocker
                  ;; an unsorted dependency must be emitted first
                  (recur sorted pending blocker chain*)
                  (let [pending* (disj pending current)]
                    (recur (conj sorted current) pending* (first pending*) #{}))))))]
      (mapv move-ops-map sorted-uuids)))

  (comment
    (def move-ops-map {"2" {:parents ["1"] :left "1" :x "2"}
                       "1" {:parents ["3"] :left nil :x "1"}
                       "3" {:parents [] :left nil :x "3"}})
    ;; => envs ordered "3", "1", "2"
    (move-ops-map->sorted-move-ops move-ops-map))
  (defn- check-block-pos
    "Compares the local position of block `block-uuid-str` with the remote one.

    Returns:
    - :not-exist when the block is not found in the repo held by `state`;
    - :wrong-pos when its local parent differs from the first of
      `remote-parents`, or its local left differs from `remote-left-uuid-str`;
    - nil when both match.

    A missing local parent/left is compared as nil, so it matches a nil
    remote value. Stringifying a missing uuid would give \"\" and never match."
    [state block-uuid-str remote-parents remote-left-uuid-str]
    {:pre [(some? @(:*repo state))]}
    (let [repo @(:*repo state)
          local-b (db/entity repo [:block/uuid (uuid block-uuid-str)])
          remote-parent-uuid-str (first remote-parents)
          local-parent-uuid-str (some-> local-b :block/parent :block/uuid str)
          local-left-uuid-str (some-> local-b :block/left :block/uuid str)]
      (cond
        (nil? local-b)
        :not-exist

        (not (and (= local-parent-uuid-str remote-parent-uuid-str)
                  (= local-left-uuid-str remote-left-uuid-str)))
        :wrong-pos

        :else nil)))
  (defn apply-remote-move-ops
    "Applies remote \"move\" ops in the given order.
    A block missing locally is inserted, a block at a different position is
    moved, and a block already in place is left untouched. Returns nil."
    [state sorted-move-ops]
    (prn :sorted-move-ops sorted-move-ops)
    (doseq [{:keys [parents left self content]} sorted-move-ops
            :let [pos (check-block-pos state self parents left)]]
      (when (some? pos)
        (insert-or-move-block state self parents left content
                              ;; move? flag: only an existing block can be moved
                              (case pos
                                :not-exist false
                                :wrong-pos true)))
      (prn :apply-remote-move-ops self)))
  (defn apply-remote-update-ops
    "Applies remote \"update-attrs\" ops.
    A block missing locally is inserted and a block at a different position
    is moved (both via insert-or-move-block). A block already in place gets
    its content saved when the op carries :content. Returns nil."
    [state update-ops]
    {:pre [(some? @(:*repo state))]}
    (let [repo @(:*repo state)]
      (prn :update-ops update-ops)
      (doseq [{:keys [parents left self content]} update-ops
              :let [pos (check-block-pos state self parents left)]]
        (case pos
          (:not-exist :wrong-pos)
          (insert-or-move-block state self parents left content (= pos :wrong-pos))

          nil
          (when content
            (prn (:tx-data
                  (outliner-tx/transact!
                   {:persist-op? false}
                   (let [stored-block (db/pull repo '[*] [:block/uuid (uuid self)])
                         overrides {:block/uuid (uuid self)
                                    :block/content content
                                    :block/format :markdown}]
                     (outliner-core/save-block! (merge stored-block overrides))))))))
        (prn :apply-remote-update-ops pos self))))
  (defn <apply-remote-data
    "Applies a server message to the local repo, inside a go block.

    The :affected-blocks of `data-from-ws` are grouped by their :op and
    applied in this order: \"remove\", then \"move\" (dependency-sorted), then
    \"update-attrs\". Finally the local tx is set to the message's :t.

    :t is optional in the message schema; when it is absent the local tx is
    left untouched rather than overwritten with nil.

    Returns the go block's channel."
    [state data-from-ws]
    {:pre [(data-from-ws-validator data-from-ws)
           (some? @(:*repo state))]}
    (go
      (let [affected-blocks-map (update-keys (:affected-blocks data-from-ws) name)
            remote-t (:t data-from-ws)
            {remove-ops-map "remove" move-ops-map "move" update-ops-map "update-attrs"}
            (update-vals
             (group-by (fn [[_ env]] (get env :op)) affected-blocks-map)
             (partial into {}))
            remove-ops (vals remove-ops-map)
            sorted-move-ops (move-ops-map->sorted-move-ops move-ops-map)
            update-ops (vals update-ops-map)]
        (prn :start-apply-remote-remove-ops)
        (apply-remote-remove-ops state remove-ops)
        (prn :start-apply-remote-move-ops)
        (apply-remote-move-ops state sorted-move-ops)
        (prn :start-apply-remote-update-ops)
        (apply-remote-update-ops state update-ops)
        (when (some? remote-t)
          (<! (p->c (op/<update-local-tx! @(:*repo state) remote-t)))))))
  (defn- <push-data-from-ws-handler
    "Applies a server-pushed message via <apply-remote-data, waits for it to
    finish, then prints the message. Returns the go block's channel."
    [state push-data-from-ws]
    (go
      (let [applied-ch (<apply-remote-data state push-data-from-ws)]
        (<! applied-ch)
        (prn :push-data-from-ws push-data-from-ws))))
  (defn- client-ops->remote-ops
    "Converts local ops into the ops sent to the server.

    Each element of `ops` is read as [type payload] with type \"move\",
    \"remove\" or \"update\"; any other type throws ex-info \"unknown op type\".

    The ops are first replayed in order so that only the last move/remove
    outcome per block survives: a later \"remove\" cancels an earlier \"move\"
    of the same block and vice versa. Remote ops are then rebuilt from the
    current local db state:

    - move: only for blocks that exist locally and have both a left and a
      parent; the target is the local left block;
    - remove: only for block uuids that no longer exist locally, grouped as
      in the original remove ops;
    - update: only for blocks that exist locally.

    Returns [remove-ops move-ops update-ops], each a lazy sequence."
    [state ops]
    {:pre [(some? @(:*repo state))]}
    (let [repo @(:*repo state)
          op-block-uuids (fn [op] (set (:block-uuids (second op))))
          lookup-block (fn [block-uuid] (db/entity repo [:block/uuid (uuid block-uuid)]))
          ;; replay the ops in order to find each block's final fate
          [removed-uuids updated-uuids moved-uuids]
          (loop [[op & more-ops] ops
                 removed #{}
                 updated #{}
                 moved #{}]
            (if-not op
              [removed updated moved]
              (case (first op)
                "move"
                (let [touched (op-block-uuids op)]
                  (recur more-ops
                         (set/difference removed touched)
                         updated
                         (set/union moved touched)))

                "remove"
                (let [touched (op-block-uuids op)]
                  (recur more-ops
                         (set/union removed touched)
                         updated
                         (set/difference moved touched)))

                "update"
                (recur more-ops
                       removed
                       (conj updated (:block-uuid (second op)))
                       moved)

                (throw (ex-info "unknown op type" op)))))
          {move-ops "move" remove-ops "remove"} (group-by first ops)
          ;; uuids of each original op that are still to be moved / removed
          surviving-move-uuids
          (mapcat (fn [op] (set/intersection moved-uuids (op-block-uuids op)))
                  move-ops)
          surviving-remove-groups
          (keep (fn [op] (seq (set/intersection removed-uuids (op-block-uuids op))))
                remove-ops)
          move-ops*
          (keep (fn [block-uuid]
                  (when-let [block (lookup-block block-uuid)]
                    (let [left-uuid (some-> block :block/left :block/uuid str)
                          parent-uuid (some-> block :block/parent :block/uuid str)]
                      (when (and left-uuid parent-uuid)
                        ["move"
                         {:block-uuid block-uuid
                          :target-uuid left-uuid
                          :sibling? (not= left-uuid parent-uuid)}]))))
                surviving-move-uuids)
          remove-ops*
          (keep (fn [group]
                  (when-let [gone-uuids (seq (filter (fn [block-uuid]
                                                       (not (lookup-block block-uuid)))
                                                     group))]
                    ["remove" {:block-uuids gone-uuids}]))
                surviving-remove-groups)
          update-ops*
          (keep (fn [block-uuid]
                  (when-let [block (lookup-block block-uuid)]
                    (let [left-uuid (some-> block :block/left :block/uuid str)
                          parent-uuid (some-> block :block/parent :block/uuid str)]
                      ["update"
                       {:block-uuid block-uuid
                        :target-uuid left-uuid
                        :sibling? (not= left-uuid parent-uuid)
                        :content (:block/content block)}])))
                (seq updated-uuids))]
      [remove-ops* move-ops* update-ops*]))
  (defn- <client-op-update-handler
    "Pushes the pending local ops to the server and applies its reply.

    Steps, inside a go block:
    1. read the pending ops and the local tx for the current repo;
    2. convert the ops and send them as an \"apply-ops\" request, with
       :t-before set to the local tx (1 when there is none);
    3. remove the sent ops from the local store;
    4. apply the server's reply via <apply-remote-data.

    Returns the go block's channel."
    [state]
    {:pre [(some? @(:*graph-uuid state))
           (some? @(:*repo state))]}
    (go
      (let [repo @(:*repo state)
            {:keys [ops local-tx]} (<! (p->c (op/<get-ops&local-tx repo)))
            ;; each stored entry is read as [key op]
            sent-op-keys (mapv first ops)
            client-ops (mapv second ops)
            ops-for-remote (mapcat identity (client-ops->remote-ops state client-ops))
            reply (with-sub-data-from-ws state
                    (<! (ws/<send! state {:req-id (get-req-id)
                                          :action "apply-ops"
                                          :graph-uuid @(:*graph-uuid state)
                                          :ops ops-for-remote
                                          :t-before (or local-tx 1)}))
                    (<! (get-result-ch)))]
        (<! (p->c (op/<clean-ops repo sent-op-keys)))
        (<! (<apply-remote-data state reply))
        (prn :<client-op-update-handler reply))))
  (defn <loop-for-rtc
    "Runs the RTC sync loop for `graph-uuid` / `repo` inside a go block.

    Stores `graph-uuid` and `repo` in `state`, registers for graph updates
    on the server, subscribes to \"push-updates\" messages, then loops over:

    - :client-op-update-chan -> push local ops (<client-op-update-handler);
    - pushed server messages -> apply them (<push-data-from-ws-handler);
    - the stop channel       -> leave the loop.

    Channels are polled in that priority order and each handler is awaited
    before the next event is taken, so handlers never interleave.

    The stop channel is the one stored in (:*stop-rtc-loop-chan state);
    closing it, or putting a value on it, ends the loop. On exit the
    \"push-updates\" subscription is removed.

    Returns the go block's channel."
    [state graph-uuid repo]
    {:pre [(state-validator state)
           (some? graph-uuid)
           (some? repo)]}
    (go
      (reset! (:*graph-uuid state) graph-uuid)
      (reset! (:*repo state) repo)
      (let [{:keys [data-from-ws-pub client-op-update-chan]} state
            push-data-from-ws-ch (chan (async/sliding-buffer 100))
            stop-rtc-loop-chan (chan)]
        ;; Publish the very channel the loop listens on; storing a different
        ;; channel here would make the loop impossible to stop from outside.
        (reset! (:*stop-rtc-loop-chan state) stop-rtc-loop-chan)
        (with-sub-data-from-ws state
          (<! (ws/<send! state {:action "register-graph-updates"
                                :req-id (get-req-id)
                                :graph-uuid graph-uuid}))
          (<! (get-result-ch)))
        (async/sub data-from-ws-pub "push-updates" push-data-from-ws-ch)
        (<! (go-loop []
              (let [{:keys [push-data-from-ws client-op-update stop]}
                    (async/alt!
                      client-op-update-chan {:client-op-update true}
                      push-data-from-ws-ch ([v] {:push-data-from-ws v})
                      stop-rtc-loop-chan {:stop true}
                      :priority true)]
                (cond
                  push-data-from-ws
                  (do (<! (<push-data-from-ws-handler state push-data-from-ws))
                      (recur))

                  client-op-update
                  (do (<! (<client-op-update-handler state))
                      (recur))

                  stop
                  (prn :stop-loop-for-rtc graph-uuid)

                  ;; e.g. the push channel yielded nil: leave the loop
                  :else
                  nil))))
        (async/unsub data-from-ws-pub "push-updates" push-data-from-ws-ch))))
  (defn init-state
    "Builds the RTC state map for websocket `ws`, its incoming-message channel
    `data-from-ws-chan` and `user-uuid`, and passes it through
    (m/parse state-schema ...), whose result is returned.

    :*graph-uuid, :*repo and :*stop-rtc-loop-chan start as atoms of nil;
    :data-from-ws-pub publishes `data-from-ws-chan` by :req-id."
    [ws data-from-ws-chan user-uuid]
    (let [data-from-ws-pub (async/pub data-from-ws-chan :req-id)
          client-op-update-chan (chan 1)
          fresh-state {:user-uuid user-uuid
                       :*graph-uuid (atom nil)
                       :*repo (atom nil)
                       :data-from-ws-chan data-from-ws-chan
                       :data-from-ws-pub data-from-ws-pub
                       :*stop-rtc-loop-chan (atom nil)
                       :client-op-update-chan client-op-update-chan
                       :*ws (atom ws)}]
      (m/parse state-schema fresh-state)))
  (defn <init
    "Opens the websocket for `user-uuid` and, once it reports open, returns
    (through the go block's channel) a fresh state built by init-state.

    With no argument the hard-coded development user uuid is used."
    ([]
     (<init "f92bb5b3-0f72-4a74-9ad8-1793e655c309"))
    ([user-uuid]
     (go
       (let [data-from-ws-chan (chan (async/sliding-buffer 100))
             ws-opened-ch (chan)
             ws (ws/ws-listen user-uuid data-from-ws-chan ws-opened-ch)]
         ;; wait until ws-listen signals on ws-opened-ch
         (<! ws-opened-ch)
         (init-state ws data-from-ws-chan user-uuid)))))
  ;; Holds the state created by debug-init; nil until debug-init has run.
  (defonce debug-state (atom nil))

  (def debug-graph-uuid
    "Graph uuid used by debug-init."
    "6478874f-20a7-4335-9379-4cfb1cfa1b25")
  (defn ^:export debug-init
    "Debug entry point: creates a state, stores it in `debug-state`, then runs
    <loop-for-rtc for `debug-graph-uuid` and the current repo.
    The go block yields the state once the loop has ended."
    []
    (go
      (let [fresh-state (<! (<init))
            current-repo-loop (fn [] (<loop-for-rtc fresh-state debug-graph-uuid (state/get-current-repo)))]
        (reset! debug-state fresh-state)
        (<! (current-repo-loop))
        fresh-state)))
  (defn ^:export debug-stop-rtc-loop
    "Debug entry point: closes the stop channel of the state held in
    `debug-state`.

    :*stop-rtc-loop-chan is an atom holding the channel, so it is
    dereferenced before closing. Does nothing when there is no debug state
    or no stop channel yet."
    []
    (when-let [stop-ch (some-> @debug-state :*stop-rtc-loop-chan deref)]
      (async/close! stop-ch)))
  (defn ^:export download-graph
    "Creates a fresh state and downloads graph `graph-uuid` into `repo` via
    full-upload-download-graph/<download-graph.
    Returns the go block's channel, which yields the download's result."
    [repo graph-uuid]
    (go
      (let [fresh-state (<! (<init))
            download-ch (full-upload-download-graph/<download-graph fresh-state repo graph-uuid)]
        (<! download-ch))))
  (defn ^:export upload-graph
    "Creates a fresh state and uploads a graph via
    full-upload-download-graph/<upload-graph.
    Returns the go block's channel, which yields the upload's result."
    []
    (go
      (let [fresh-state (<! (<init))
            upload-ch (full-upload-download-graph/<upload-graph fresh-state)]
        (<! upload-ch))))
  (defn ^:export debug-client-push-updates
    "Debug entry point: puts `true` on the :client-op-update-chan of the state
    held in `debug-state`."
    []
    (let [client-op-update-chan (:client-op-update-chan @debug-state)]
      (async/put! client-op-update-chan true)))
  (comment
    ;; REPL helpers: create a state by hand and point it at the debug graph
    ;; and the current repo.
    (go
      (def global-state (<! (<init))))
    (reset! (:*graph-uuid global-state) debug-graph-uuid)
    (reset! (:*repo global-state) (state/get-current-repo)))