;; op_mem_layer.cljs (15 KB)
  1. (ns frontend.worker.rtc.op-mem-layer
  2. "Store client-ops in memory.
  3. And sync these ops to indexedDb automatically."
  4. (:require [clojure.set :as set]
  5. [frontend.common.missionary-util :as c.m]
  6. [frontend.worker.rtc.const :as rtc-const]
  7. [frontend.worker.rtc.op-idb-layer :as op-idb-layer]
  8. [frontend.worker.state :as worker-state]
  9. [logseq.db :as ldb]
  10. [logseq.db.sqlite.util :as sqlite-util]
  11. [malli.core :as ma]
  12. [malli.transform :as mt]
  13. [missionary.core :as m]
  14. [promesa.core :as p]))
  ;; Malli schema for a single client op.
  ;; An op is a `[op-type t value]` tuple; the schema variant is picked by the
  ;; first element (`{:dispatch first}`).
  (def op-schema
    (let [op-of (fn [value-schema]
                  [:catn
                   [:op :keyword]
                   [:t :int]
                   [:value value-schema]])
          ;; ops whose value only identifies a block
          block-op (op-of [:map
                           [:block-uuid :uuid]])
          ;; :update additionally carries a collection of attribute/value entries
          update-op (op-of [:map
                            [:block-uuid :uuid]
                            [:av-coll [:sequential rtc-const/av-schema]]])
          ;; ops whose value only identifies an asset
          asset-op (op-of [:map
                           [:asset-uuid :uuid]])]
      [:multi {:dispatch first}
       [:move block-op]
       [:remove block-op]
       [:update-page block-op]
       [:remove-page block-op]
       [:update update-op]
       [:update-asset asset-op]
       [:remove-asset asset-op]]))
  ;; A batch of ops: a sequential collection of `op-schema` tuples.
  (def ops-schema [:sequential op-schema])

  ;; Coercer for a batch of ops, built with malli's json-transformer.
  ;; The last argument is the handler invoked on invalid input; it fails with
  ;; `::ops-schema` and the data malli passes to it.
  ;; NOTE(review): relies on malli's 4-arity `coercer` being
  ;; [schema transformer respond raise] - confirm against the malli version in use.
  (def ops-coercer
    (let [on-invalid (fn [failure] (ma/-fail! ::ops-schema failure))]
      (ma/coercer ops-schema mt/json-transformer nil on-invalid)))
  ;; Schema of one branch of a repo's ops store:
  ;; - optional :graph-uuid and :local-tx
  ;; - :block-uuid->ops / :asset-uuid->ops: uuid -> {op-type -> op}
  ;; - :t+block-uuid-sorted-set: set of [t block-uuid] tuples
  (def ops-store-value-schema
    (let [uuid->ops-of (fn [op-types]
                         [:map-of :uuid
                          [:map-of (into [:enum] op-types) :any]])]
      [:map
       [:graph-uuid {:optional true} :string]
       [:local-tx {:optional true} :int]
       [:block-uuid->ops (uuid->ops-of [:move :remove :update :update-page :remove-page])]
       [:asset-uuid->ops (uuid->ops-of [:update-asset :remove-asset])]
       [:t+block-uuid-sorted-set [:set [:cat :int :uuid]]]]))
  ;; Schema of the whole store: repo-name -> {:current-branch ..., :old-branch ...}.
  ;; :old-branch is optional and may be nil.
  (def ops-store-schema
    (let [repo-entry [:map
                      [:current-branch ops-store-value-schema]
                      [:old-branch {:optional true} [:maybe ops-store-value-schema]]]]
      [:map-of :string ; repo-name
       repo-entry]))
  ;; Coercer for the whole store value (no transformer); on invalid input the
  ;; handler fails with `::ops-store-schema`.
  (def ops-store-schema-coercer
    (ma/coercer ops-store-schema
                nil
                nil
                (fn [failure] (ma/-fail! ::ops-store-schema failure))))

  ;; In-memory ops store, keyed by repo name. The coercer doubles as the atom
  ;; validator, so every new store value is checked against `ops-store-schema`.
  (defonce *ops-store (atom {} :validator ops-store-schema-coercer))
  (defn- merge-update-ops
    "Merges two :update ops of the same block into one.
    The result carries the greater t of the two and the concatenation of both
    :av-coll values, with the op having the smaller t first (on equal t,
    `update-op1` comes first)."
    [update-op1 update-op2]
    {:pre [(= :update (first update-op1))
           (= :update (first update-op2))
           (= (:block-uuid (last update-op1))
              (:block-uuid (last update-op2)))]}
    (let [[earlier later] (if (> (second update-op1) (second update-op2))
                            [update-op2 update-op1]
                            [update-op1 update-op2])
          {earlier-avs :av-coll block-uuid :block-uuid} (last earlier)
          later-avs (:av-coll (last later))]
      [:update (second later)
       {:block-uuid block-uuid
        :av-coll (concat earlier-avs later-avs)}]))
  (defn- block-uuid->min-t
    "Returns the smallest t among the ops stored for `block-uuid` in
    `block-uuid->ops`, or nil when the block has no ops."
    [block-uuid->ops block-uuid]
    (let [ops-by-type (block-uuid->ops block-uuid)]
      ;; each op is `[op-type t value]`, so `second` is its t
      (when-let [ts (seq (map second (vals ops-by-type)))]
        (reduce min ts))))
  (defn- update-t+block-uuid-sorted-set
    "Re-indexes `block-uuid` in `t+block-uuid-sorted-set` after its ops changed.
    Removes the `[min-t block-uuid]` entry computed from `old-block-uuid->ops`
    (if the block had ops there) and adds the entry computed from the new
    `block-uuid->ops`. When the block has no ops in the new map, nothing is
    added, so a `[nil block-uuid]` tuple never enters the set."
    [t+block-uuid-sorted-set old-block-uuid->ops block-uuid->ops block-uuid]
    (let [origin-min-t (block-uuid->min-t old-block-uuid->ops block-uuid)
          min-t (block-uuid->min-t block-uuid->ops block-uuid)]
      (cond-> t+block-uuid-sorted-set
        origin-min-t (disj [origin-min-t block-uuid])
        ;; only index the block while it still has at least one op
        min-t (conj [min-t block-uuid]))))
  (defn- newer-op-exists?
    "Truthy when `exist-ops` holds an op under `op-type` whose t is greater than `t`."
    [exist-ops op-type t]
    (some-> (get exist-ops op-type) second (> t)))

  (defn- put-op
    "Stores `op` under `op-type` for `block-uuid` and drops whatever is stored
    under `cancelled-type` for that block."
    [block-uuid->ops block-uuid op-type op cancelled-type]
    (-> block-uuid->ops
        (assoc-in [block-uuid op-type] op)
        (update block-uuid dissoc cancelled-type)))

  (defn- apply-op
    "Returns `block-uuid->ops` with `op` applied, or nil when `op` is ignored
    because a conflicting op with a greater t is already stored for the block."
    [block-uuid->ops [op-type t {:keys [block-uuid]} :as op]]
    (let [exist-ops (some-> block-uuid block-uuid->ops)]
      ;; No default clause on purpose: any other op-type makes `case` throw.
      (case op-type
        :move
        (when-not (newer-op-exists? exist-ops :remove t)
          (put-op block-uuid->ops block-uuid :move op :remove))

        :update
        (when-not (newer-op-exists? exist-ops :remove t)
          (let [origin-update-op (get-in block-uuid->ops [block-uuid :update])
                ;; fold the new :update into the one already stored, if any
                op* (if origin-update-op
                      (merge-update-ops origin-update-op op)
                      op)]
            (put-op block-uuid->ops block-uuid :update op* :remove)))

        :remove
        ;; only a :move with a greater t blocks the remove; an accepted :remove
        ;; replaces every other op stored for the block
        (when-not (newer-op-exists? exist-ops :move t)
          (assoc block-uuid->ops block-uuid {:remove op}))

        :update-page
        (when-not (newer-op-exists? exist-ops :remove-page t)
          (put-op block-uuid->ops block-uuid :update-page op :remove-page))

        :remove-page
        ;; an accepted :remove-page replaces every other op stored for the block
        (when-not (newer-op-exists? exist-ops :update-page t)
          (assoc block-uuid->ops block-uuid {:remove-page op})))))

  (defn ^:large-vars/cleanup-todo add-ops-aux
    "Applies `ops` (a seq of `[op-type t value]` tuples) one by one to
    `block-uuid->ops`, keeping `t+block-uuid-sorted-set` in step.
    Returns a map with the resulting :block-uuid->ops and
    :t+block-uuid-sorted-set. Handles :move, :update, :remove, :update-page and
    :remove-page; any other op-type throws. Processing stops at the first
    nil/false element of `ops`."
    [ops block-uuid->ops t+block-uuid-sorted-set]
    (loop [block-uuid->ops block-uuid->ops
           t+block-uuid-sorted-set t+block-uuid-sorted-set
           [op & others] ops]
      (if-not op
        {:block-uuid->ops block-uuid->ops
         :t+block-uuid-sorted-set t+block-uuid-sorted-set}
        (let [[_ _ {:keys [block-uuid]}] op]
          (if-let [block-uuid->ops* (apply-op block-uuid->ops op)]
            (recur block-uuid->ops*
                   (update-t+block-uuid-sorted-set t+block-uuid-sorted-set
                                                   block-uuid->ops
                                                   block-uuid->ops*
                                                   block-uuid)
                   others)
            ;; op was ignored: nothing changes
            (recur block-uuid->ops t+block-uuid-sorted-set others))))))
  ;; Empty sorted set of `[t x]` pairs, ordered by t first and by x on equal t.
  (def ^:private sorted-set-by-t
    (sorted-set-by
     (fn [[t-a x-a] [t-b x-b]]
       (let [by-t (compare t-a t-b)]
         (if (zero? by-t)
           (compare x-a x-b)
           by-t)))))
  ;; Initial per-repo store value: a current branch with no ops and an empty
  ;; t-ordered index, and no old branch.
  (def ^:private empty-ops-store-value
    (let [empty-branch {:block-uuid->ops {}
                        :asset-uuid->ops {}
                        :t+block-uuid-sorted-set sorted-set-by-t}]
      {:current-branch empty-branch}))
  (defn init-empty-ops-store!
    "Sets the store entry of `repo` to the empty store value, replacing any
    existing entry."
    [repo]
    (swap! *ops-store
           (fn [store]
             (assoc store repo empty-ops-store-value))))
  (defn remove-ops-store!
    "Drops the store entry of `repo`."
    [repo]
    (swap! *ops-store
           (fn [store]
             (dissoc store repo))))
  (defn add-ops!
    "Coerces `ops` and adds them to the current branch of `repo`, and also to
    its old branch when one exists. Asserts that `repo` has a current branch."
    [repo ops]
    (assert (contains? (@*ops-store repo) :current-branch) (@*ops-store repo))
    (let [coerced-ops (ops-coercer ops)
          repo-store (get @*ops-store repo)
          ;; run the ops against one branch's ops map and index
          apply-to (fn [branch]
                     (add-ops-aux coerced-ops
                                  (:block-uuid->ops branch)
                                  (:t+block-uuid-sorted-set branch)))
          ;; write the result of `apply-to` back onto a branch
          store-into (fn [branch result]
                       (assoc branch
                              :block-uuid->ops (:block-uuid->ops result)
                              :t+block-uuid-sorted-set (:t+block-uuid-sorted-set result)))
          current-result (apply-to (:current-branch repo-store))
          old-result (when-let [saved-branch (:old-branch repo-store)]
                       (apply-to saved-branch))]
      (swap! *ops-store update repo
             (fn [{:keys [current-branch old-branch]}]
               (cond-> {:current-branch (store-into current-branch current-result)}
                 old-branch
                 (assoc :old-branch (store-into old-branch old-result)))))))
  (defn update-local-tx!
    "Sets :local-tx of the current branch of `repo` to `t`.
    Asserts that `repo` has a current branch."
    [repo t]
    (assert (contains? (@*ops-store repo) :current-branch))
    (swap! *ops-store assoc-in [repo :current-branch :local-tx] t))
  (defn update-graph-uuid!
    "Sets :graph-uuid on the current branch of `repo`, and on its old branch
    when one exists. Asserts that `repo` has a current branch."
    [repo graph-uuid]
    (assert (contains? (@*ops-store repo) :current-branch))
    (let [tag-branch (fn [branch] (assoc branch :graph-uuid graph-uuid))]
      (swap! *ops-store update repo
             (fn [{:keys [current-branch old-branch]}]
               (let [repo-store {:current-branch (tag-branch current-branch)}]
                 (if old-branch
                   (assoc repo-store :old-branch (tag-branch old-branch))
                   repo-store))))))
  (defn new-branch!
    "Saves a copy of the current branch of `repo` as its :old-branch in
    `*ops-store`.
    While an old branch exists, `add-ops!` applies ops to both branches.
    Use `rollback!` to replace the current branch with the old branch.
    Use `commit!` to drop the old branch."
    [repo]
    (let [branch-snapshot (get-in @*ops-store [repo :current-branch])]
      (assert (some? branch-snapshot) repo)
      (swap! *ops-store assoc-in [repo :old-branch] branch-snapshot)))
  (defn rollback!
    "When `repo` has an old branch, makes it the current branch and leaves the
    repo without an old branch. Does nothing (returns nil) otherwise."
    [repo]
    (when-let [saved-branch (get-in @*ops-store [repo :old-branch])]
      (swap! *ops-store
             (fn [store]
               (assoc store repo {:current-branch saved-branch})))))
  (defn commit!
    "Drops the old branch of `repo`, keeping only what else is stored for it.
    NOTE(review): for a repo without a store entry this writes a nil entry,
    which the store schema does not appear to allow - confirm callers only
    commit initialized repos."
    [repo]
    (swap! *ops-store update repo
           (fn [repo-store]
             (dissoc repo-store :old-branch))))
  (defn get-min-t-block-ops
    "Returns `{:block-uuid ... :ops ...}` for the first entry of the current
    branch's t-ordered index, or nil when the index is empty.
    Asserts that `repo` has a current branch. Throws ex-info \"unavailable\"
    when the indexed block has no entry in :block-uuid->ops."
    [repo]
    (let [repo-store (get @*ops-store repo)
          branch (:current-branch repo-store)
          t-index (:t+block-uuid-sorted-set branch)
          block-uuid->ops (:block-uuid->ops branch)]
      (assert (contains? repo-store :current-branch) repo)
      (when-let [[t block-uuid] (first t-index)]
        (if-let [entry (find block-uuid->ops block-uuid)]
          {:block-uuid block-uuid
           :ops (val entry)}
          ;; Index and ops map disagree. Alternative considered in the original:
          ;; remove the item from :t+block-uuid-sorted-set and retry, i.e.
          ;; (do (swap! *ops-store update-in [repo :current-branch] assoc
          ;;            :t+block-uuid-sorted-set (disj t+block-uuid-sorted-set [t block-uuid]))
          ;;     (get-min-t-block-ops repo))
          (throw (ex-info "unavailable" {:t t :block-uuid block-uuid :block-uuid->ops block-uuid->ops}))))))
  (defn get-block-ops
    "Returns the ops stored for `block-uuid` in the current branch of `repo`
    (nil when there are none). Asserts that `repo` has a current branch."
    [repo block-uuid]
    (let [repo-store (get @*ops-store repo)
          ops-by-block (:block-uuid->ops (:current-branch repo-store))]
      (assert (contains? repo-store :current-branch) repo)
      (ops-by-block block-uuid)))
  (defn get-all-ops
    "Returns a seq of every op stored in the current branch of `repo`, across
    all blocks; nil when no block has ops."
    [repo]
    (when-let [ops-per-block (vals (get-in @*ops-store [repo :current-branch :block-uuid->ops]))]
      (mapcat vals ops-per-block)))
  (defn get-local-tx
    "Returns :local-tx of the current branch of `repo`, or nil when absent."
    [repo]
    (get-in @*ops-store [repo :current-branch :local-tx]))
  (defn get-unpushed-block-update-count
    "Returns how many blocks have ops stored in the current branch of `repo`;
    0 when there are none or the repo has no store."
    [repo]
    ;; `count` of nil or of an empty map is 0
    (count (get-in @*ops-store [repo :current-branch :block-uuid->ops])))
  (comment
    ;; Disabled: counts the assets that have ops stored in the current branch
    ;; of `repo` (nil rather than 0 when there are none).
    (defn get-unpushed-asset-update-count
      [repo]
      (some-> (get-in @*ops-store [repo :current-branch :asset-uuid->ops])
              keys
              count)))
  (defn intersection-block-uuids
    "Returns the set of uuids from `block-uuid-coll` that have ops stored in
    the current branch of `repo`. Returns nil when no block has ops at all."
    [repo block-uuid-coll]
    (when-let [stored-uuids (keys (get-in @*ops-store [repo :current-branch :block-uuid->ops]))]
      (set/intersection (set block-uuid-coll) (set stored-uuids))))
  (defn remove-block-ops!
    "Removes every op stored for `block-uuid` from the current branch of
    `repo`, together with the block's entry in the t-ordered index.
    `block-uuid` must be a uuid; asserts that `repo` has a current branch."
    [repo block-uuid]
    {:pre [(uuid? block-uuid)]}
    (let [repo-store (get @*ops-store repo)
          branch (:current-branch repo-store)
          ops-by-block (:block-uuid->ops branch)
          t-index (:t+block-uuid-sorted-set branch)]
      (assert (contains? repo-store :current-branch) repo)
      (let [indexed-t (block-uuid->min-t ops-by-block block-uuid)
            ops-by-block* (dissoc ops-by-block block-uuid)
            ;; the index holds the block under its smallest t
            t-index* (disj t-index [indexed-t block-uuid])]
        (swap! *ops-store update-in [repo :current-branch] assoc
               :block-uuid->ops ops-by-block*
               :t+block-uuid-sorted-set t-index*))))
  (defn <init-load-from-indexeddb2!
    "Reads the stored value for `repo` via `op-idb-layer/<read2` and, when
    there is one, installs it as the repo's current branch (with no old
    branch). The stored index entries are re-inserted into a set that uses
    this namespace's t-ordering."
    [repo]
    (p/let [stored (op-idb-layer/<read2 repo)]
      (when stored
        (let [t-index (into sorted-set-by-t (:t+block-uuid-sorted-set stored))
              branch (assoc stored :t+block-uuid-sorted-set t-index)]
          (swap! *ops-store assoc repo {:current-branch branch})
          (prn ::<init-load-from-indexeddb! repo)))))
  (defn new-task--sync-to-idb
    "Returns a missionary task that, when `repo` has a current branch, passes
    it to `op-idb-layer/<reset2!` and awaits the resulting promise."
    [repo]
    (m/sp
      (when-let [branch (get-in @*ops-store [repo :current-branch])]
        (let [write-promise (op-idb-layer/<reset2! repo branch)]
          (m/? (c.m/await-promise write-promise))))))
  (defn- new-task--sync-to-idb-loop
    "Returns a missionary task that loops forever: every 3000 ms it looks up
    the current repo and, if that repo is a db-based graph with a current
    branch whose hash differs from the last one written, passes the branch to
    `op-idb-layer/<reset2!` and remembers the new hash."
    []
    (m/sp
      (let [*synced-hash (atom nil)]
        (loop []
          (m/? (m/sleep 3000))
          (let [repo (worker-state/get-current-repo)
                conn (worker-state/get-datascript-conn repo)
                branch (when (and repo conn
                                  (ldb/db-based-graph? @conn))
                         (:current-branch (get @*ops-store repo)))]
            (when branch
              (let [branch-hash (hash branch)]
                ;; skip the write when nothing changed since the last sync
                (when-not (= branch-hash @*synced-hash)
                  (m/? (c.m/await-promise (op-idb-layer/<reset2! repo branch)))
                  (reset! *synced-hash branch-hash)))))
          (recur)))))
  ;; Starts the IndexedDB sync loop when this namespace is first loaded;
  ;; `defonce` keeps a reload from starting it again.
  ;; NOTE(review): the name suggests `c.m/run-task` returns a canceler - confirm.
  #_:clj-kondo/ignore
  (defonce _sync-loop-canceler (c.m/run-task (new-task--sync-to-idb-loop) ::sync-to-idb-loop))
  (defn rtc-db-graph?
    "Is db-graph & RTC enabled.
    Truthy when `repo` is a db-based graph and either `js/process` exists or
    the repo has a local-tx in the ops store."
    [repo]
    (let [db-graph? (sqlite-util/db-based-graph? repo)]
      (and db-graph?
           (or (exists? js/process)
               (not (nil? (get-local-tx repo)))))))