sync.cljs 133 KB


  1. (ns frontend.fs.sync
  2. "Main ns for providing file sync functionality"
  3. (:require [cljs-http.client :as http]
  4. [cljs-time.core :as t]
  5. [cljs-time.format :as tf]
  6. [cljs-time.coerce :as tc]
  7. [cljs.core.async :as async :refer [go timeout go-loop offer! poll! chan <! >!]]
  8. [cljs.core.async.impl.channels]
  9. [cljs.core.async.interop :refer [p->c]]
  10. [cljs.spec.alpha :as s]
  11. [clojure.set :as set]
  12. [clojure.string :as string]
  13. [clojure.pprint :as pp]
  14. [electron.ipc :as ipc]
  15. [goog.string :as gstring]
  16. [frontend.config :as config]
  17. [frontend.debug :as debug]
  18. [frontend.handler.user :as user]
  19. [frontend.state :as state]
  20. [frontend.mobile.util :as mobile-util]
  21. [frontend.util :as util]
  22. [frontend.util.persist-var :as persist-var]
  23. [frontend.util.fs :as fs-util]
  24. [frontend.handler.notification :as notification]
  25. [frontend.context.i18n :refer [t]]
  26. [frontend.diff :as diff]
  27. [frontend.db :as db]
  28. [frontend.fs :as fs]
  29. [frontend.encrypt :as encrypt]
  30. [logseq.graph-parser.util :as gp-util]
  31. [medley.core :refer [dedupe-by]]
  32. [rum.core :as rum]
  33. [promesa.core :as p]
  34. [lambdaisland.glogi :as log]
  35. [frontend.fs.capacitor-fs :as capacitor-fs]
  36. ["@capawesome/capacitor-background-task" :refer [BackgroundTask]]))
  37. ;;; ### Commentary
  38. ;; file-sync related local files/dirs:
  39. ;; - logseq/graphs-txid.edn
  40. ;; this file contains [user-uuid graph-uuid transaction-id]
  41. ;; graph-uuid: the unique identifier of the graph on the server
  42. ;; transaction-id: sync progress of local files
  43. ;; - logseq/version-files
  44. ;; downloaded version-files
  45. ;; files included by `get-ignored-files` will not be synchronized.
  46. ;;
  47. ;; sync strategy:
  48. ;; - when toggle file-sync on,
  49. ;; trigger remote->local-full-sync first, then local->remote-full-sync
  50. ;; local->remote-full-sync compares local-files with remote-files (by md5)
  51. ;; and uploads the newly added files to the remote server.
  52. ;; - if local->remote sync(normal-sync or full-sync) return :need-sync-remote,
  53. ;; then trigger a remote->local sync
  54. ;; - if remote->local sync return :need-remote->local-full-sync,
  55. ;; then we need a remote->local-full-sync,
  56. ;; which compare local-files with remote-files, sync diff-remote-files to local
  57. ;; - local->remote-full-sync will be triggered after 20mins of idle
  58. ;; - every 10s, flush local changes, and sync to remote
  59. ;; TODO: use access-token instead of id-token
  60. ;; TODO: applying a remote delete-diff deletes the related local file, which triggers a `FileChangeEvent`
  61. ;; and re-produces a new delete diff for the same file.
  62. ;;; ### specs
  ;; States of the sync state machine.
  (s/def ::state
    #{;; do following jobs when ::starting:
      ;; - wait seconds for file-change-events from file-watcher
      ;; - drop redundant file-change-events
      ;; - setup states in `frontend.state`
      ::starting
      ::need-password
      ::idle
      ;; sync local-changed files
      ::local->remote
      ;; sync remote latest-transactions
      ::remote->local
      ;; local->remote full sync
      ::local->remote-full-sync
      ;; remote->local full sync
      ::remote->local-full-sync
      ;; snapshot state when switching between apps on iOS
      ::pause
      ::stop})

  (s/def ::path string?)
  (s/def ::time t/date?)
  (s/def ::remote->local-type
    #{:delete :update
      ;; :rename=:delete+:update
      })
  ;; Either nil (nothing syncing) or the graph-uuid string.
  (s/def ::current-syncing-graph-uuid (s/or :nil nil? :graph-uuid string?))
  (s/def ::recent-remote->local-file-item
    (s/keys :req-un [::remote->local-type ::checksum ::path]))
  (s/def ::current-local->remote-files (s/coll-of ::path :kind set?))
  (s/def ::current-remote->local-files (s/coll-of ::path :kind set?))
  (s/def ::recent-remote->local-files
    (s/coll-of ::recent-remote->local-file-item :kind set?))
  (s/def ::history-item (s/keys :req-un [::path ::time]))
  (s/def ::history (s/coll-of ::history-item :kind seq?))

  ;; Shape of the whole sync-state map; every key below is required.
  (s/def ::sync-state
    (s/keys :req-un [::current-syncing-graph-uuid
                     ::state
                     ::current-local->remote-files
                     ::current-remote->local-files
                     ::queued-local->remote-files
                     ;; Downloading files from remote will trigger filewatcher events,
                     ;; causes unreasonable information in the content of ::queued-local->remote-files,
                     ;; use ::recent-remote->local-files to filter such events
                     ::recent-remote->local-files
                     ::history]))
  ;; diff
  ;; A diff is one remote transaction: its id, its type and the touched files.
  (s/def ::TXId pos-int?)
  (s/def ::TXType #{"update_files" "delete_files" "rename_file"})
  (s/def ::TXContent-to-path string?)
  (s/def ::TXContent-from-path (s/or :some string? :none nil?))
  (s/def ::TXContent-checksum (s/or :some string? :none nil?))
  ;; Each content item is the tuple [to-path from-path checksum].
  (s/def ::TXContent-item
    (s/tuple ::TXContent-to-path
             ::TXContent-from-path
             ::TXContent-checksum))
  (s/def ::TXContent (s/coll-of ::TXContent-item))
  (s/def ::diff (s/keys :req-un [::TXId ::TXType ::TXContent]))

  ;; Single-key result maps returned by the sync functions.
  (s/def ::succ-map #(= {:succ true} %))
  (s/def ::unknown-map (comp some? :unknown))
  (s/def ::stop-map #(= {:stop true} %))
  (s/def ::pause-map #(= {:pause true} %))
  (s/def ::need-sync-remote #(= {:need-sync-remote true} %))
  (s/def ::graph-has-been-deleted #(= {:graph-has-been-deleted true} %))

  (s/def ::sync-local->remote!-result
    (s/or :stop ::stop-map
          :succ ::succ-map
          :pause ::pause-map
          :need-sync-remote ::need-sync-remote
          :graph-has-been-deleted ::graph-has-been-deleted
          :unknown ::unknown-map))

  (s/def ::sync-remote->local!-result
    (s/or :succ ::succ-map
          :need-remote->local-full-sync
          #(= {:need-remote->local-full-sync true} %)
          :stop ::stop-map
          :pause ::pause-map
          :unknown ::unknown-map))

  (s/def ::sync-local->remote-all-files!-result
    (s/or :succ ::succ-map
          :stop ::stop-map
          :need-sync-remote ::need-sync-remote
          :graph-has-been-deleted ::graph-has-been-deleted
          :unknown ::unknown-map))

  ;; sync-event type
  (s/def ::event
    #{:created-local-version-file
      :finished-local->remote
      :finished-remote->local
      :start
      :pause
      :resume
      :exception-decrypt-failed
      :remote->local-full-sync-failed
      :local->remote-full-sync-failed
      :get-remote-graph-failed
      :get-deletion-logs-failed})
  (s/def ::sync-event (s/keys :req-un [::event ::data]))
  ;; Batch sizes for download / upload (the consumers are outside this part of the file).
  (defonce download-batch-size 100)
  (defonce upload-batch-size 20)

  ;; Atom initialised to nil.
  ;; NOTE(review): by its name it tracks the graph-uuid of the running sync manager — confirm at the use sites.
  (def ^:private current-sm-graph-uuid (atom nil))
  ;;; ### configs in config.edn
  ;; - :file-sync/ignore-files
  (defn- get-ignored-files
    "Returns the set of regexps describing paths that must not be synced:
    the built-in patterns plus every entry of `:file-sync/ignore-files` from the
    current config, each turned into a regexp with `re-pattern`."
    []
    (let [builtin-patterns #{#"logseq/graphs-txid.edn$"
                             #"logseq/pages-metadata.edn$"
                             #"logseq/version-files/"
                             #"logseq/bak/"
                             #"node_modules/"
                             ;; path starts with `.` in the root directory, e.g. .gitignore
                             #"^\.[^.]+"
                             ;; path includes `/.`, e.g. .git, .DS_store
                             #"/\."
                             ;; Emacs/Vim backup files end with `~` by default
                             #"~$"}
          user-patterns (:file-sync/ignore-files (state/get-config))]
      (into builtin-patterns (map re-pattern) user-patterns)))
  ;;; ### configs ends
  ;; WebSocket address template; `ws-listen!*` formats it with the graph-uuid.
  (def ws-addr config/WS-URL)

  ;; Persisted var storing [user-uuid graph-uuid txid] (see `<update-graphs-txid!`).
  ;; Warning: make sure to `persist-var/-load` graphs-txid before using it.
  (defonce graphs-txid (persist-var/persist-var nil "graphs-txid"))

  (declare assert-local-txid<=remote-txid)
  (defn <update-graphs-txid!
    "Stores [user-uuid graph-uuid latest-txid] for REPO in `graphs-txid` and
    persists it. In developer mode `assert-local-txid<=remote-txid` is called
    afterwards. LATEST-TXID must be a non-negative integer.
    Returns a channel wrapping the underlying promise."
    [latest-txid graph-uuid user-uuid repo]
    {:pre [(int? latest-txid) (>= latest-txid 0)]}
    (let [new-value [user-uuid graph-uuid latest-txid]
          saved (p/let [_ (persist-var/-reset-value! graphs-txid new-value repo)
                        _ (persist-var/persist-save graphs-txid)]
                  (when (state/developer-mode?)
                    (assert-local-txid<=remote-txid)))]
      (p->c saved)))
  (defn clear-graphs-txid!
    "Resets the stored graphs-txid of REPO to nil and persists the change.
    Returns whatever `persist-var/persist-save` returns."
    [repo]
    (persist-var/-reset-value! graphs-txid nil repo)
    (persist-var/persist-save graphs-txid))
  (defn- ws-ping-loop
    "Keep-alive loop for WS: sends \"PING\" every 30s while the socket is open,
    polls every second while it is not open yet, and ends once the socket is
    closing or closed (readyState 2 or 3)."
    [ws]
    (go-loop []
      (let [ready-state (.-readyState ws)]
        (cond
          ;; closing or closed: leave the loop
          (contains? #{2 3} ready-state)
          nil

          ;; open: ping, then wait 30s
          (= 1 ready-state)
          (do (.send ws "PING")
              (<! (timeout 30000))
              (recur))

          ;; when connecting, wait 1s
          :else
          (do (<! (timeout 1000))
              (recur))))))
  (defn- ws-stop!
    "Marks the websocket state atom *WS as stopped (so the onclose handler does
    not reconnect) and closes the socket if there is one. No-op when *WS is nil."
    [*ws]
    (when *ws
      (swap! *ws assoc :stop true)
      (let [{:keys [ws]} @*ws]
        (when ws
          (.close ws)))))
  (defn- ws-listen!*
    "Opens a websocket for GRAPH-UUID, stores it in the atom *WS as
    {:ws <socket> :stop false}, starts the ping loop and installs handlers:
    - onclose: unless *WS is marked :stop, waits 1s and reconnects.
    - onmessage: parses the JSON payload; when it carries a :txid, keeps in
      REMOTE-CHANGES-CHAN whichever of (pending message, new message) has the
      greater :txid."
    [graph-uuid *ws remote-changes-chan]
    (let [ws (js/WebSocket. (util/format ws-addr graph-uuid))]
      (reset! *ws {:ws ws :stop false})
      (ws-ping-loop ws)
      ;; (set! (.-onopen (:ws @*ws)) #(println (util/format "ws opened: graph '%s'" graph-uuid %)))
      (set! (.-onclose ws)
            (fn [_e]
              (when-not (true? (:stop @*ws))
                (go
                  ;; The timeout channel must be taken from, otherwise the go
                  ;; block does not wait and the reconnect fires immediately.
                  (<! (timeout 1000))
                  ;; ws-stop! may have been called while we were waiting.
                  (when-not (true? (:stop @*ws))
                    (println "re-connecting graph" graph-uuid)
                    (ws-listen!* graph-uuid *ws remote-changes-chan))))))
      (set! (.-onmessage ws)
            (fn [e]
              (let [data (js->clj (js/JSON.parse (.-data e)) :keywordize-keys true)]
                (when (some? (:txid data))
                  (let [pending (poll! remote-changes-chan)
                        newest (if (and pending (> (:txid pending) (:txid data)))
                                 pending
                                 data)]
                    (offer! remote-changes-chan newest))))))))
  (defn ws-listen!
    "Starts listening on the websocket of GRAPH-UUID (socket state kept in *WS).
    Returns a channel (sliding buffer of size 1) which outputs messages from the server."
    [graph-uuid *ws]
    (let [out-ch (chan (async/sliding-buffer 1))]
      (ws-listen!* graph-uuid *ws out-ch)
      out-ch))
  (defn- get-json-body
    "Normalises a response BODY:
    - a truthy non-string body is returned as is,
    - a blank body (nil, \"\" or whitespace) yields `true`,
    - any other string is parsed as JSON with keywordized keys."
    [body]
    (cond
      (and (not (string? body)) body)
      body

      (string/blank? body)
      true

      :else
      (js->clj (js/JSON.parse body) :keywordize-keys true)))
  (defn- get-resp-json-body
    "Returns the :body of RESP normalised by `get-json-body`."
    [resp]
    (get-json-body (:body resp)))
  (defn- <request-once
    "POSTs BODY (serialised as JSON) to the file-sync endpoint API-NAME using
    TOKEN as oauth token. Returns a channel yielding
    {:resp <http response> :api-name API-NAME :body BODY}."
    [api-name body token]
    (go
      (let [url (str "https://" config/API-DOMAIN "/file-sync/" api-name)
            opts {:oauth-token token
                  :body (js/JSON.stringify (clj->js body))
                  :with-credentials? false}
            resp-ch (http/post url opts)]
        {:resp (<! resp-ch)
         :api-name api-name
         :body body})))
  ;; For debug
  (def *on-flying-request
    "requests not finished"
    (atom #{}))

  ;; API names for which `<request*` honours its *stop flag.
  (def stoppable-apis #{"get_all_files"})
  (defn- <request*
    "Sends the request and retries while the server answers 401 \"Unauthorized\".
    The wait before a retry is (* 1000 retry-count) ms, capped at 60s; once
    retry-count exceeds 5 an error is thrown.
    *stop: volatile var, stop retry-request when it's true (only for APIs in
           `stoppable-apis`), and return :stop.
    Returns a channel yielding the http response (or :stop)."
    ([api-name body token *stop] (<request* api-name body token 0 *stop))
    ([api-name body token retry-count *stop]
     (go
       (if (and *stop @*stop (contains? stoppable-apis api-name))
         :stop
         (let [{:keys [resp]} (<! (<request-once api-name body token))
               unauthorized? (and (= 401 (:status resp))
                                  (= "Unauthorized" (:message (get-json-body (:body resp)))))]
           (cond
             (not unauthorized?)
             resp

             (> retry-count 5)
             (throw (js/Error. :file-sync-request))

             :else
             (let [delay-ms (min 60000 (* 1000 retry-count))]
               (println "will retry after" delay-ms "ms")
               (<! (timeout delay-ms))
               (<! (<request* api-name body token (inc retry-count) *stop)))))))))
  (defn <request
    "Same as `<request*`, but records the request in `*on-flying-request`
    (under the key API-NAME + current timestamp) while it is in flight.
    Returns a channel yielding the result of `<request*`."
    [api-name & args]
    (let [request-id (str api-name (.now js/Date))]
      (go
        (swap! *on-flying-request conj request-id)
        (let [result (<! (apply <request* api-name args))]
          (swap! *on-flying-request disj request-id)
          result))))
  (defn- remove-dir-prefix
    "Strips the leading DIR from PATH, plus one leading \"/\" left over after
    the strip. PATH is returned untouched when it does not start with DIR
    (apart from that leading-slash removal)."
    [dir path]
    (let [prefix-re (js/RegExp. (str "^" (gstring/regExpEscape dir)))
          stripped (string/replace path prefix-re "")]
      (cond-> stripped
        (string/starts-with? stripped "/") (string/replace-first "/" ""))))
  (defn- remove-user-graph-uuid-prefix
    "<user-uuid>/<graph-uuid>/path -> path
    The prefix is recognised when PATH has more than two segments and the
    first two are 36 characters long each; otherwise PATH is returned as is."
    [path]
    (let [[user-segment graph-segment & remaining :as segments] (string/split path "/")]
      (if (and (< 2 (count segments))
               (= 36 (count user-segment))
               (= 36 (count graph-segment)))
        (string/join "/" remaining)
        path)))
  (defprotocol IRelativePath
    "Things that know their own path relative to the graph."
    (-relative-path [this] "return the relative path of this"))
  (defn relative-path
    "Returns the path of O relative to the current repo dir.
    - O implements `IRelativePath`: delegates to `-relative-path`.
    - O is a string starting with the repo dir (a full path): the leading
      \"<repo-dir>/\" is stripped.
    - O is any other string: a leading <user-uuid>/<graph-uuid>/ is stripped.
    Throws a js/Error for any other type."
    [o]
    (let [repo-dir (config/get-repo-dir (state/get-current-repo))]
      (cond
        (implements? IRelativePath o)
        (-relative-path o)

        ;; full path
        (and (string? o) (string/starts-with? o repo-dir))
        (let [prefix (str repo-dir "/")]
          ;; Only the leading prefix is removed. `string/replace` would also
          ;; remove later occurrences of "<repo-dir>/" inside the path.
          (if (string/starts-with? o prefix)
            (subs o (count prefix))
            o))

        (string? o)
        (remove-user-graph-uuid-prefix o)

        :else
        (throw (js/Error. (str "unsupport type " (str o)))))))
  (defprotocol IChecksum
    (-checksum [this] "return the checksum of this"))

  (defprotocol IStoppable
    (-stop! [this] "stop this"))

  (defprotocol IStopped?
    (-stopped? [this] "whether this has been stopped"))
  ;; One file-level change of a remote transaction.
  ;; from-path, to-path is relative path
  ;; Equality and hash consider from-path, to-path, updated? and deleted? only;
  ;; txid and checksum are ignored. Ordering (`compare`) is by txid.
  (deftype FileTxn [from-path to-path updated? deleted? txid checksum]
    Object
    (renamed? [_]
      (not= from-path to-path))

    IRelativePath
    (-relative-path [_] (remove-user-graph-uuid-prefix to-path))

    IEquiv
    (-equiv [_ ^FileTxn other]
      ;; Guard on the type first: reading fields of nil would throw a
      ;; TypeError, e.g. in `(= filetxn nil)`.
      (and (instance? FileTxn other)
           (= from-path (.-from-path other))
           (= to-path (.-to-path other))
           (= updated? (.-updated? other))
           (= deleted? (.-deleted? other))))

    IHash
    (-hash [_] (hash [from-path to-path updated? deleted?]))

    IComparable
    (-compare [_ ^FileTxn other]
      (compare txid (.-txid other)))

    IPrintWithWriter
    (-pr-writer [coll w _opts]
      (write-all w "#FileTxn[\"" from-path "\" -> \"" to-path
                 "\" (updated? " updated? ", renamed? " (.renamed? coll) ", deleted? " deleted?
                 ", txid " txid ", checksum " checksum ")]")))
  (defn- assert-filetxns
    "True when every update-type `FileTxn` in FILETXNS carries a checksum."
    [filetxns]
    (every? (fn [^FileTxn filetxn]
              (or (not (.-updated? filetxn))
                  (some? (-checksum filetxn))))
            filetxns))
  (defn- diff->filetxns
    "convert diff(`<get-diff`) to `FileTxn`s.
    Each TXContent item is read as [to-path from-path checksum]:
    - update_files: items without a to-path or a checksum are dropped
    - delete_files: items without a to-path are dropped
    - rename_file:  items without a to-path or a from-path are dropped"
    [{:keys [TXId TXType TXContent]}]
    {:post [(assert-filetxns %)]}
    (let [xf (case TXType
               "delete_files"
               (comp
                (remove (fn [item] (empty? (first item))))
                (map (fn [item]
                       (->FileTxn (first item) (first item) false true TXId nil))))

               "update_files"
               (comp
                (remove (fn [item]
                          (or (empty? (first item))
                              (empty? (last item)))))
                (map (fn [item]
                       (->FileTxn (first item) (first item) true false TXId (last item)))))

               "rename_file"
               (comp
                (remove (fn [item]
                          (or (empty? (first item))
                              (empty? (second item)))))
                (map (fn [item]
                       (->FileTxn (second item) (first item) false false TXId nil)))))]
      (sequence xf TXContent)))
  (defn- distinct-update-filetxns-xf
    "transducer.
    Drops an update or delete `FileTxn` when an equal filetxn has been seen
    before; everything else passes through (and is remembered)."
    [rf]
    (let [seen (volatile! #{})]
      (fn
        ([] (rf))
        ([acc] (rf acc))
        ([acc ^FileTxn filetxn]
         (let [update-or-delete? (or (.-updated? filetxn) (.-deleted? filetxn))]
           (if (and update-or-delete? (contains? @seen filetxn))
             acc
             (do (vswap! seen conj filetxn)
                 (rf acc filetxn))))))))
  (defn- remove-deleted-filetxns-xf
    "transducer.
    remove update&rename filetxns if they are deleted later(in greater txid filetxn).
    Keeps a set of paths already handled: a filetxn whose to-path is in the set
    is dropped (for a rename the tracked path moves from to-path to from-path);
    otherwise its to-path is recorded and the filetxn passes through."
    [rf]
    (let [seen-paths (volatile! #{})]
      (fn
        ([] (rf))
        ([acc] (rf acc))
        ([acc ^FileTxn filetxn]
         (let [to (.-to-path filetxn)
               from (.-from-path filetxn)]
           (cond
             (not (contains? @seen-paths to))
             (do (vswap! seen-paths conj to)
                 (rf acc filetxn))

             ;; dropped rename: keep tracking the file under its old name
             (not= to from)
             (do (vswap! seen-paths #(-> % (disj to) (conj from)))
                 acc)

             :else
             acc))))))
  (defn- partition-filetxns
    "return transducer.
    Consecutive update filetxns are grouped into partitions of at most N;
    every delete or rename filetxn gets a partition of its own."
    [n]
    (comp
     (partition-by (fn [^FileTxn filetxn] (.-updated? filetxn)))
     (mapcat (fn [group]
               (if (some-> (first group) (.-updated?))
                 (partition-all n group)
                 (map list group))))))
  (defn- contains-path?
    "true when any regexp of REGEXPS is found in PATH; nil when none is found,
    false when REGEXPS is empty."
    [regexps path]
    (if (seq regexps)
      (some (fn [re] (when (re-find re path) true)) regexps)
      false))
  (defn ignored?
    "Whether file is ignored when syncing, i.e. whether its relative path
    matches one of the regexps returned by `get-ignored-files`."
    [path]
    (boolean
     (contains-path? (get-ignored-files) (relative-path path))))
  (defn- filter-download-files-with-reserved-chars
    "Skip downloading file paths with reserved chars.
    Non-deleted FILES whose relative path includes reserved chars are removed
    from the result; when there are any, a `:ui/notify-skipped-downloading-files`
    event is published and the paths are printed."
    [files]
    (let [reserved? (fn [file]
                      (and (not (.-deleted? ^js file))
                           (fs-util/include-reserved-chars? (-relative-path file))))
          skipped (filter reserved? files)]
      (when (seq skipped)
        (let [skipped-paths (map -relative-path skipped)]
          (state/pub-event! [:ui/notify-skipped-downloading-files skipped-paths])
          (prn "Skipped downloading those file paths with reserved chars: "
               skipped-paths)))
      (remove reserved? files)))
  (defn- filter-upload-files-with-reserved-chars
    "Remove uploading file paths with reserved chars.
    PATHS holds either path strings or `IRelativePath` items (decided by
    looking at the first element). Returns a vector of the remaining items;
    when something was removed, a `:ui/notify-files-with-reserved-chars` event
    is published and the paths are printed."
    [paths]
    (let [strings? (string? (first paths))
          reserved? (if strings?
                      fs-util/include-reserved-chars?
                      (fn [item] (fs-util/include-reserved-chars? (-relative-path item))))
          skipped (filter reserved? paths)]
      (when (seq skipped)
        (let [skipped-paths (if strings? skipped (map -relative-path skipped))]
          (state/pub-event! [:ui/notify-files-with-reserved-chars skipped-paths])
          (prn "Skipped uploading those file paths with reserved chars: " skipped-paths)))
      (vec (remove reserved? paths))))
  (defn- diffs->filetxns
    "transducer.
    1. diff -> `FileTxn` , see also `<get-diff`
    2. drop ignored files
    3. distinct redundant update type filetxns
    4. remove update or rename filetxns if they are deleted in later filetxns.
    NOTE: this xf should apply on reversed diffs sequence (sort by txid)"
    []
    (comp
     (mapcat diff->filetxns)
     (remove ignored?)
     distinct-update-filetxns-xf
     remove-deleted-filetxns-xf))
  (defn- diffs->partitioned-filetxns
    "transducer: diffs -> partitions of filetxns, each partition contains same
    type filetxns,
    for update type, at most N items in each partition
    for delete & rename type, only 1 item in each partition."
    [n]
    (let [to-filetxns (diffs->filetxns)
          to-partitions (partition-filetxns n)]
      (comp to-filetxns to-partitions)))
  (defn- filepath+checksum->diff
    "Builds an \"update_files\" diff for one file. TXId is INDEX + 1 and the
    remote path is <user-uuid>/<graph-uuid>/<relative-path>."
    [index {:keys [relative-path checksum user-uuid graph-uuid]}]
    {:post [(s/valid? ::diff %)]}
    (let [remote-path (str user-uuid "/" graph-uuid "/" relative-path)]
      {:TXId (inc index)
       :TXType "update_files"
       :TXContent [[remote-path nil checksum]]}))
  (defn filepath+checksum-coll->partitioned-filetxns
    "transducer.
    1. [filepath checksum] pair -> diff
    2. diffs->partitioned-filetxns
    3. filter by config"
    [n graph-uuid user-uuid]
    (comp
     (map-indexed
      (fn [index path+checksum]
        (filepath+checksum->diff
         index
         {:relative-path (first path+checksum)
          :user-uuid user-uuid
          :graph-uuid graph-uuid
          :checksum (second path+checksum)})))
     (diffs->partitioned-filetxns n)))
  ;; Metadata of one local or remote file.
  ;; Two instances are equal when their normalized paths and etags are equal.
  ;; The normalized path drops a leading "/" and, for remote files, the
  ;; <user-uuid>/<graph-uuid>/ prefix; it is computed once and cached.
  ;; NOTE(review): `-hash` is based on the raw `path` while `-equiv` uses the
  ;; normalized path, so two equal instances may hash differently — confirm
  ;; before relying on set/map lookups across local and remote metadata.
  (deftype FileMetadata [size etag path encrypted-path last-modified remote? ^:mutable normalized-path]
    Object
    (get-normalized-path [_]
      (assert (string? path) path)
      (when-not normalized-path
        (set! normalized-path
              (cond-> path
                (string/starts-with? path "/") (string/replace-first "/" "")
                remote? (remove-user-graph-uuid-prefix))))
      normalized-path)

    IRelativePath
    (-relative-path [_] path)

    IEquiv
    (-equiv [o ^FileMetadata other]
      ;; Guard on the type first: calling a method on nil or on a foreign
      ;; value would throw instead of answering false.
      (and (instance? FileMetadata other)
           (= (.get-normalized-path o) (.get-normalized-path other))
           (= etag (.-etag other))))

    IHash
    (-hash [_] (hash {:etag etag :path path}))

    ILookup
    (-lookup [o k] (-lookup o k nil))
    (-lookup [_ k not-found]
      (case k
        :size size
        :etag etag
        :path path
        :encrypted-path encrypted-path
        :last-modified last-modified
        :remote? remote?
        not-found))

    IPrintWithWriter
    (-pr-writer [_ w _opts]
      (write-all w (str {:size size :etag etag :path path :remote? remote? :last-modified last-modified}))))
  (def ^:private higher-priority-remote-files
    "When all remote files are diffed against all local files, the following
    remote files always need to be downloaded (when the checksum does not match),
    even if the local file's last-modified > the remote file's last-modified.
    These files are auto created when a graph is created, and we don't want
    them to overwrite the related remote files."
    #{"logseq/config.edn"
      "logseq/custom.css"
      "pages/contents.md"
      "pages/contents.org"
      "logseq/metadata.edn"})
  (defn diff-file-metadata-sets
    "Find the `FileMetadata`s that exists in s1 and does not exist in s2,
    compare by path+checksum+last-modified,
    if s1.path = s2.path & s1.checksum <> s2.checksum & s1.last-modified > s2.last-modified
    (except some default created files),
    keep this `FileMetadata` in result.

    An item of s1 is dropped when s2 holds an item with the same
    :encrypted-path that either has the same :etag, or is strictly newer
    (unless the item's path is in `higher-priority-remote-files`).
    Returns a set."
    [s1 s2]
    (let [;; Index s2 once so that each s1 item is only compared with the s2
          ;; items sharing its encrypted path, not with all of s2.
          s2-by-encrypted-path (group-by :encrypted-path s2)
          covered-by? (fn [item other]
                        (let [last-modified (:last-modified item)]
                          (cond
                            (= (:etag item) (:etag other))
                            true

                            (>= last-modified (:last-modified other))
                            false

                            ;; these special files have higher priority in s1
                            (contains? higher-priority-remote-files (:path item))
                            false

                            (< last-modified (:last-modified other))
                            true)))]
      (into #{}
            (remove (fn [item]
                      (some #(covered-by? item %)
                            (get s2-by-encrypted-path (:encrypted-path item)))))
            s1)))
  (comment
    ;; REPL example for `diff-file-metadata-sets`.
    ;; :path must be a string: `get-normalized-path` (used by `=`) asserts it.
    (defn map->FileMetadata [m]
      (apply ->FileMetadata ((juxt :size :etag :path :encrypted-path :last-modified :remote? (constantly nil)) m)))

    (assert
     (=
      #{(map->FileMetadata {:size 1 :etag 2 :path "2" :encrypted-path 2 :last-modified 2})}
      (diff-file-metadata-sets
       (into #{}
             (map map->FileMetadata)
             [{:size 1 :etag 1 :path "1" :encrypted-path 1 :last-modified 1}
              {:size 1 :etag 2 :path "2" :encrypted-path 2 :last-modified 2}])
       (into #{}
             (map map->FileMetadata)
             [{:size 1 :etag 1 :path "1" :encrypted-path 1 :last-modified 1}
              {:size 1 :etag 1 :path "2" :encrypted-path 2 :last-modified 1}])))))
  ;; The checksum of a FileMetadata is its etag.
  (extend-type FileMetadata
    IChecksum
    (-checksum [this] (.-etag this)))

  ;; The checksum of a FileTxn is its checksum field.
  (extend-type FileTxn
    IChecksum
    (-checksum [this] (.-checksum this)))
  (defn- sort-file-metadata-fn
    "Returns a key function (FileMetadata -> number) for sorting:
    :recent-days-range > :favorite-pages > small-size pages > ...
    :recent-days-range : [<min-inst-ms> <max-inst-ms>]

    Key per item, first match wins:
    - journal file whose day lies within :recent-days-range -> that day in ms
    - path includes \"logseq/\"   -> 9999
    - path includes \"content.\"  -> 10000
    - path is in :favorite-pages -> length of the path
    - otherwise                  -> negated file size"
    [& {:keys [recent-days-range favorite-pages]}]
    {:pre [(or (nil? recent-days-range)
               (every? number? recent-days-range))]}
    (let [favorite-pages* (set favorite-pages)
          ;; built once instead of once per item
          journal-day-formatter (tf/formatter "yyyy_MM_dd")]
      (fn [^FileMetadata item]
        (let [path (relative-path item)
              journals-prefix (str (config/get-journals-directory) "/")
              journal? (string/starts-with? path journals-prefix)
              journal-day
              (when journal?
                (try
                  (tc/to-long
                   (tf/parse journal-day-formatter
                             (-> path
                                 ;; strip the same prefix that was used for the
                                 ;; journal? check, not a hard-coded "journals/"
                                 (subs (count journals-prefix))
                                 ;; journals may be markdown or org files
                                 (string/replace #"\.(md|org)$" ""))))
                  ;; names that do not parse as a day are not journals
                  (catch :default _)))]
          (cond
            (and recent-days-range
                 journal-day
                 (<= (first recent-days-range)
                     ^number journal-day
                     (second recent-days-range)))
            journal-day

            (string/includes? path "logseq/")
            9999

            ;; NOTE(review): this matches "content." while the default page
            ;; listed in `higher-priority-remote-files` is "pages/contents.md"
            ;; — confirm the intended file name.
            (string/includes? path "content.")
            10000

            (contains? favorite-pages* path)
            (count path)

            :else
            (- (.-size item)))))))
  ;;; ### APIs
  ;; `RSAPI` call apis through rsapi package, supports operations on files
  (defprotocol IRSAPI
    (rsapi-ready? [this graph-uuid]
      "return true when rsapi ready")
    (<key-gen [this]
      "generate public+private keys")
    (<set-env [this graph-uuid prod? private-key public-key]
      "set environment")
    (<get-local-files-meta [this graph-uuid base-path filepaths]
      "get local files' metadata")
    (<get-local-all-files-meta [this graph-uuid base-path]
      "get all local files' metadata")
    (<rename-local-file [this graph-uuid base-path from to])
    (<update-local-files [this graph-uuid base-path filepaths]
      "remote -> local")
    (<download-version-files [this graph-uuid base-path filepaths])
    (<delete-local-files [this graph-uuid base-path filepaths])
    (<update-remote-files [this graph-uuid base-path filepaths local-txid]
      "local -> remote, return err or txid")
    (<delete-remote-files [this graph-uuid base-path filepaths local-txid]
      "return err or txid")
    (<encrypt-fnames [this graph-uuid fnames])
    (<decrypt-fnames [this graph-uuid fnames])
    (<cancel-all-requests [this])
    (<add-new-version [this repo path content]))
  ;; Operations answered by the remote file-sync server.
  (defprotocol IRemoteAPI
    (<user-info [this]
      "user info")
    (<get-remote-all-files-meta [this graph-uuid]
      "get all remote files' metadata")
    (<get-remote-files-meta [this graph-uuid filepaths]
      "get remote files' metadata")
    (<get-remote-graph [this graph-name-opt graph-uuid-opt]
      "get graph info by GRAPH-NAME-OPT or GRAPH-UUID-OPT")
    (<get-remote-file-versions [this graph-uuid filepath]
      "get file's version list")
    (<list-remote-graphs [this]
      "list all remote graphs")
    (<get-deletion-logs [this graph-uuid from-txid]
      "get deletion logs from FROM-TXID")
    (<get-diff [this graph-uuid from-txid]
      "get diff from FROM-TXID, return [txns, latest-txid, min-txid]")
    (<create-graph [this graph-name]
      "create graph")
    (<delete-graph [this graph-uuid]
      "delete graph")
    (<get-graph-salt [this graph-uuid]
      "return httpcode 410 when salt expired")
    (<create-graph-salt [this graph-uuid]
      "return httpcode 409 when salt already exists and not expired yet")
    (<get-graph-encrypt-keys [this graph-uuid])
    (<upload-graph-encrypt-keys [this graph-uuid public-key encrypted-private-key]))
  (defprotocol IToken
    (<get-token [this] "return the auth token used for requests"))
  (defn <case-different-local-file-exist?
    "e.g. filepath=\"pages/Foo.md\"
    found-filepath=\"pages/foo.md\"
    it happens on macos (case-insensitive fs)
    return a channel yielding the canonicalized filepath if a file exists whose
    path differs from FILEPATH, otherwise nil"
    [graph-uuid irsapi base-path filepath]
    (go
      (let [metas (<! (<get-local-files-meta irsapi graph-uuid base-path [filepath]))
            found-path (some-> metas first :path)]
        (when (and (some? found-path) (not= found-path filepath))
          found-path))))
  (defn <local-file-not-exist?
    "Returns a channel yielding true when FILEPATH does not exist locally with
    exactly this path."
    [graph-uuid irsapi base-path filepath]
    (go
      (let [metas (<! (<get-local-files-meta irsapi graph-uuid base-path [filepath]))]
        (if (empty? metas)
          ;; not found at all
          true
          ;; or,
          ;; e.g. filepath="pages/Foo.md"
          ;; found-filepath="pages/foo.md"
          ;; it happens on macos (case-insensitive fs)
          (not= filepath (:path (first metas)))))))
  (defn- <retry-rsapi
    "Calls F (a no-arg fn returning a channel) and retries it, up to 3 more
    times, while the result is an ExceptionInfo whose cause mentions
    \"operation timed out\". Returns a channel yielding the last result."
    [f]
    (go-loop [retries-left 3]
      (let [result (<! (f))
            timed-out? (and (instance? ExceptionInfo result)
                            (string/index-of (str (ex-cause result)) "operation timed out"))]
        (if (and timed-out? (> retries-left 0))
          (do
            (print (str "retry(" retries-left ") ..."))
            (recur (dec retries-left)))
          result))))
  (declare <rsapi-cancel-all-requests)

  ;; `IRSAPI` implementation that forwards every operation to the "rsapi"
  ;; package through `ipc/ipc`. The mutable fields remember the environment
  ;; given to the last `<set-env` call.
  (deftype RSAPI [^:mutable graph-uuid' ^:mutable private-key' ^:mutable public-key']
    IToken
    (<get-token [_this]
      (user/<wrap-ensure-id&access-token
       (state/get-auth-id-token)))

    IRSAPI
    ;; ready when the env was set for this graph and both keys are present
    (rsapi-ready? [_ graph-uuid]
      (and (= graph-uuid graph-uuid') private-key' public-key'))

    (<key-gen [_]
      (go
        (let [js-keys (<! (p->c (ipc/ipc "key-gen")))]
          (js->clj js-keys :keywordize-keys true))))

    (<set-env [_ graph-uuid prod? private-key public-key]
      (when (not-empty private-key)
        (print (util/format "[%s] setting sync age-encryption passphrase..." graph-uuid)))
      (set! graph-uuid' graph-uuid)
      (set! private-key' private-key)
      (set! public-key' public-key)
      (p->c (ipc/ipc "set-env" graph-uuid (if prod? "prod" "dev") private-key public-key)))

    ;; yields a set of FileMetadata, or the ExceptionInfo on failure
    (<get-local-all-files-meta [_ graph-uuid base-path]
      (go
        (let [r (<! (<retry-rsapi #(p->c (ipc/ipc "get-local-all-files-meta" graph-uuid base-path))))]
          (if (instance? ExceptionInfo r)
            r
            (into #{}
                  (map (fn [[path metadata]]
                         (->FileMetadata (get metadata "size")
                                         (get metadata "md5")
                                         (gp-util/path-normalize path)
                                         (get metadata "encryptedFname")
                                         (get metadata "mtime")
                                         false
                                         nil)))
                  (js->clj r))))))

    ;; yields a seq of FileMetadata
    (<get-local-files-meta [_ graph-uuid base-path filepaths]
      (go
        (let [r (<! (<retry-rsapi #(p->c (ipc/ipc "get-local-files-meta" graph-uuid base-path filepaths))))]
          (assert (not (instance? ExceptionInfo r)) "get-local-files-meta shouldn't return exception")
          (map (fn [[path metadata]]
                 (->FileMetadata (get metadata "size")
                                 (get metadata "md5")
                                 (gp-util/path-normalize path)
                                 (get metadata "encryptedFname")
                                 (get metadata "mtime")
                                 false
                                 nil))
               (js->clj r)))))

    (<rename-local-file [_ graph-uuid base-path from to]
      (let [from* (gp-util/path-normalize from)
            to* (gp-util/path-normalize to)]
        (<retry-rsapi #(p->c (ipc/ipc "rename-local-file" graph-uuid base-path from* to*)))))

    ;; pending rsapi requests are cancelled first; this call is not retried
    (<update-local-files [this graph-uuid base-path filepaths]
      (println "update-local-files" graph-uuid base-path filepaths)
      (go
        (<! (<rsapi-cancel-all-requests))
        (let [token (<! (<get-token this))]
          (<! (p->c (ipc/ipc "update-local-files" graph-uuid base-path filepaths token))))))

    (<download-version-files [this graph-uuid base-path filepaths]
      (go
        (let [token (<! (<get-token this))]
          (<! (<retry-rsapi
               #(p->c (ipc/ipc "download-version-files" graph-uuid base-path filepaths token)))))))

    (<delete-local-files [_ graph-uuid base-path filepaths]
      (let [normalized-filepaths (mapv gp-util/path-normalize filepaths)]
        (go
          (println "delete-local-files" filepaths)
          (<! (<retry-rsapi
               #(p->c (ipc/ipc "delete-local-files" graph-uuid base-path normalized-filepaths)))))))

    ;; pending rsapi requests are cancelled first
    (<update-remote-files [this graph-uuid base-path filepaths local-txid]
      (let [normalized-filepaths (mapv gp-util/path-normalize filepaths)]
        (go
          (<! (<rsapi-cancel-all-requests))
          (let [token (<! (<get-token this))]
            (<! (<retry-rsapi
                 #(p->c (ipc/ipc "update-remote-files" graph-uuid base-path normalized-filepaths local-txid token))))))))

    (<delete-remote-files [this graph-uuid base-path filepaths local-txid]
      (let [normalized-filepaths (mapv gp-util/path-normalize filepaths)]
        (go
          (let [token (<! (<get-token this))]
            (<! (<retry-rsapi
                 #(p->c (ipc/ipc "delete-remote-files" graph-uuid base-path normalized-filepaths local-txid token))))))))

    (<encrypt-fnames [_ graph-uuid fnames]
      (go
        (js->clj (<! (p->c (ipc/ipc "encrypt-fnames" graph-uuid fnames))))))

    ;; a failure is re-wrapped as ex-info "decrypt-failed" carrying the fnames
    (<decrypt-fnames [_ graph-uuid fnames]
      (go
        (let [r (<! (p->c (ipc/ipc "decrypt-fnames" graph-uuid fnames)))]
          (if (instance? ExceptionInfo r)
            (ex-info "decrypt-failed" {:fnames fnames} (ex-cause r))
            (js->clj r)))))

    (<cancel-all-requests [_]
      (p->c (ipc/ipc "cancel-all-requests")))

    (<add-new-version [_this repo path content]
      (p->c (ipc/ipc "addVersionFile" (config/get-local-dir repo) path content))))
  ;; IToken/IRSAPI implementation that forwards every call to the Capacitor plugin
  ;; object `mobile-util/file-sync`. Each plugin promise is turned into a channel
  ;; with `p->c`. The mutable fields remember the values last given to `<set-env`.
  (deftype ^:large-vars/cleanup-todo CapacitorAPI [^:mutable graph-uuid' ^:mutable private-key ^:mutable public-key']
    IToken
    (<get-token [_this]
      (user/<wrap-ensure-id&access-token
       (state/get-auth-id-token)))

    IRSAPI
    ;; Truthy only when `<set-env` stored this graph-uuid together with both keys.
    (rsapi-ready? [_ graph-uuid]
      (and (= graph-uuid graph-uuid') private-key public-key'))

    ;; Yields the plugin's keygen result converted to a keywordized clj map.
    (<key-gen [_]
      (go
        (js->clj (<! (p->c (.keygen mobile-util/file-sync #js {})))
                 :keywordize-keys true)))

    ;; Records graph-uuid/keys on this instance, then passes them to the plugin.
    (<set-env [_ graph-uuid prod? secret-key public-key]
      (set! graph-uuid' graph-uuid)
      (set! private-key secret-key)
      (set! public-key' public-key)
      (let [env-opts {:graphUUID graph-uuid
                      :env (if prod? "prod" "dev")
                      :secretKey secret-key
                      :publicKey public-key}]
        (p->c (.setEnv mobile-util/file-sync (clj->js env-opts)))))

    ;; Yields a set of FileMetadata for all local files, or the ExceptionInfo as-is.
    (<get-local-all-files-meta [_ graph-uuid base-path]
      (go
        (let [resp (<! (p->c (.getLocalAllFilesMeta mobile-util/file-sync
                                                    (clj->js {:graphUUID graph-uuid
                                                              :basePath base-path}))))]
          (if (instance? ExceptionInfo resp)
            resp
            (into #{}
                  (map (fn [[path metadata]]
                         (->FileMetadata (get metadata "size")
                                         (get metadata "md5")
                                         (capacitor-fs/normalize-file-protocol-path nil path)
                                         (get metadata "encryptedFname")
                                         (get metadata "mtime")
                                         false
                                         nil)))
                  (js->clj (.-result resp)))))))

    ;; Yields a set of FileMetadata for `filepaths`; asserts that the plugin call
    ;; did not produce an ExceptionInfo.
    (<get-local-files-meta [_ graph-uuid base-path filepaths]
      (go
        (let [resp (<! (p->c (.getLocalFilesMeta mobile-util/file-sync
                                                 (clj->js {:graphUUID graph-uuid
                                                           :basePath base-path
                                                           :filePaths filepaths}))))]
          (assert (not (instance? ExceptionInfo resp)) "get-local-files-meta shouldn't return exception")
          (into #{}
                (map (fn [[path metadata]]
                       (->FileMetadata (get metadata "size")
                                       (get metadata "md5")
                                       (capacitor-fs/normalize-file-protocol-path nil path)
                                       (get metadata "encryptedFname")
                                       (get metadata "mtime")
                                       false
                                       nil)))
                (js->clj (.-result resp))))))

    (<rename-local-file [_ graph-uuid base-path from to]
      (let [from* (capacitor-fs/normalize-file-protocol-path nil from)
            to* (capacitor-fs/normalize-file-protocol-path nil to)]
        (p->c (.renameLocalFile mobile-util/file-sync
                                (clj->js {:graphUUID graph-uuid
                                          :basePath base-path
                                          :from from*
                                          :to to*})))))

    (<update-local-files [this graph-uuid base-path filepaths]
      (go
        (let [token (<! (<get-token this))
              normalized (map #(capacitor-fs/normalize-file-protocol-path nil %) filepaths)
              opts (clj->js {:graphUUID graph-uuid
                             :basePath base-path
                             :filePaths normalized
                             :token token})]
          (<! (p->c (.updateLocalFiles mobile-util/file-sync opts))))))

    ;; Goes through `<retry-rsapi`; the thunk is what gets re-invoked.
    (<download-version-files [this graph-uuid base-path filepaths]
      (go
        (let [token (<! (<get-token this))
              download! #(p->c (.updateLocalVersionFiles mobile-util/file-sync
                                                         (clj->js {:graphUUID graph-uuid
                                                                   :basePath base-path
                                                                   :filePaths filepaths
                                                                   :token token})))]
          (<! (<retry-rsapi download!)))))

    (<delete-local-files [_ graph-uuid base-path filepaths]
      (let [normalized-filepaths (mapv #(capacitor-fs/normalize-file-protocol-path nil %) filepaths)
            delete! #(p->c (.deleteLocalFiles mobile-util/file-sync
                                              (clj->js {:graphUUID graph-uuid
                                                        :basePath base-path
                                                        :filePaths normalized-filepaths})))]
        (go
          (<! (<retry-rsapi delete!)))))

    ;; Yields the "txid" entry of the plugin response, or the ExceptionInfo as-is.
    (<update-remote-files [this graph-uuid base-path filepaths local-txid]
      (let [normalized-filepaths (mapv #(capacitor-fs/normalize-file-protocol-path nil %) filepaths)]
        (go
          (let [token (<! (<get-token this))
                resp (<! (p->c (.updateRemoteFiles mobile-util/file-sync
                                                   (clj->js {:graphUUID graph-uuid
                                                             :basePath base-path
                                                             :filePaths normalized-filepaths
                                                             :txid local-txid
                                                             :token token
                                                             :fnameEncryption true}))))]
            (if-not (instance? ExceptionInfo resp)
              (get (js->clj resp) "txid")
              resp)))))

    ;; Yields the "txid" entry of the plugin response, or the ExceptionInfo as-is.
    (<delete-remote-files [this graph-uuid _base-path filepaths local-txid]
      (let [normalized-filepaths (mapv #(capacitor-fs/normalize-file-protocol-path nil %) filepaths)]
        (go
          (let [token (<! (<get-token this))
                resp (<! (p->c (.deleteRemoteFiles mobile-util/file-sync
                                                   (clj->js {:graphUUID graph-uuid
                                                             :filePaths normalized-filepaths
                                                             :txid local-txid
                                                             :token token}))))]
            (if-not (instance? ExceptionInfo resp)
              (get (js->clj resp) "txid")
              resp)))))

    ;; On failure this yields the `.-cause` of the ExceptionInfo (not the
    ;; ExceptionInfo itself); on success the "value" entry of the response.
    (<encrypt-fnames [_ graph-uuid fnames]
      (go
        (let [resp (<! (p->c (.encryptFnames mobile-util/file-sync
                                             (clj->js {:graphUUID graph-uuid
                                                       :filePaths fnames}))))]
          (if-not (instance? ExceptionInfo resp)
            (get (js->clj resp) "value")
            (.-cause resp)))))

    ;; On failure this yields a fresh "decrypt-failed" ex-info carrying `fnames`.
    (<decrypt-fnames [_ graph-uuid fnames]
      (go
        (let [resp (<! (p->c (.decryptFnames mobile-util/file-sync
                                             (clj->js {:graphUUID graph-uuid
                                                       :filePaths fnames}))))]
          (if-not (instance? ExceptionInfo resp)
            (get (js->clj resp) "value")
            (ex-info "decrypt-failed" {:fnames fnames} (ex-cause resp))))))

    (<cancel-all-requests [_]
      (p->c (.cancelAllRequests mobile-util/file-sync)))

    (<add-new-version [_this repo path content]
      (p->c (capacitor-fs/backup-file repo :version-file-dir path content))))
  ;; Platform-specific IRSAPI instance: RSAPI on Electron, CapacitorAPI on native
  ;; iOS/Android, nil everywhere else.
  (def rsapi
    (cond
      (util/electron?)
      (->RSAPI nil nil nil)

      (or (mobile-util/native-ios?)
          (mobile-util/native-android?))
      (->CapacitorAPI nil nil nil)

      :else
      nil))
  (defn add-new-version-file
    "Delegates to `<add-new-version` on the global `rsapi` with `repo`, `path` and `content`."
    [repo path content]
    (<add-new-version rsapi repo path content))
  (defn <rsapi-cancel-all-requests
    "Returns a channel. When `rsapi` is set, waits for `<cancel-all-requests` on it;
    otherwise the go block finishes with nil."
    []
    (go
      (when-some [api rsapi]
        (<! (<cancel-all-requests api)))))
  900. ;;; ### remote & rs api exceptions
  (defn sync-stop-when-api-flying?
    "True when the ex-data of `exp` has `:err` equal to `:stop`, false for any other
    non-nil `:err`, nil when there is no `:err` at all."
    [exp]
    (let [err (:err (ex-data exp))]
      (when (some? err)
        (= err :stop))))
  (defn storage-exceed-limit?
    "True when the `:err` response in the ex-data of `exp` has status 403 and body
    message \"storage-limit\". Returns nil when there is no `:err`."
    [exp]
    (let [resp (:err (ex-data exp))]
      (when (some? resp)
        (let [status (:status resp)
              msg (-> resp :body :message)]
          (and (= 403 status)
               (= msg "storage-limit"))))))
  (defn graph-count-exceed-limit?
    "True when the `:err` response in the ex-data of `exp` has status 403 and body
    message \"graph-count-exceed-limit\". Returns nil when there is no `:err`."
    [exp]
    (let [resp (:err (ex-data exp))]
      (when (some? resp)
        (let [status (:status resp)
              msg (-> resp :body :message)]
          (and (= 403 status)
               (= msg "graph-count-exceed-limit"))))))
  (defn decrypt-exp?
    "True when the message of `exp` is exactly \"decrypt-failed\", false for any other
    message, nil when `exp` is nil or has no message.

    The comparison is threaded as a plain `(= ...)` form. Threading into an anonymous
    `#(...)` literal would splice the value into the `fn*` form and return a function
    object, which is always truthy."
    [exp]
    (some-> exp ex-message (= "decrypt-failed")))
  919. ;;; remote api exceptions ends
  920. ;;; ### sync events
  ;; Internal sync events are put onto this channel (see `put-sync-event!`).
  ;; The shape of an event is described by the spec `::sync-event`.
  ;; A sliding buffer of 1000 is used, so the oldest events are dropped when full.
  (defonce ^:private sync-events-chan
    (let [buf (async/sliding-buffer 1000)]
      (chan buf)))
  ;; Publication over `sync-events-chan`, keyed by the `:event` of each value.
  ;; The list of topics is given by the spec `::event`.
  (defonce sync-events-publication
    (async/pub sync-events-chan :event))
  (defn- put-sync-event!
    "Puts `val` onto `sync-events-chan`. Precondition: `val` conforms to `::sync-event`."
    [val]
    {:pre [(s/valid? ::sync-event val)]}
    (async/put! sync-events-chan val))
  ;; A value offered on this channel stops the loop started by `debug-print-sync-events-loop`.
  (def ^:private debug-print-sync-events-loop-stop-chan
    (chan 1))
  (defn debug-print-sync-events-loop
    "Subscribes to `topics` on `sync-events-publication` and pretty-prints every
    received event until a value arrives on the stop channel. With no argument a
    default list of topics is used. On stop, all subscriptions are removed."
    ([]
     (debug-print-sync-events-loop [:created-local-version-file
                                    :finished-local->remote
                                    :finished-remote->local
                                    :pause
                                    :resume
                                    :exception-decrypt-failed
                                    :remote->local-full-sync-failed
                                    :local->remote-full-sync-failed]))
    ([topics]
     ;; discard any stale stop request before starting
     (util/drain-chan debug-print-sync-events-loop-stop-chan)
     (let [subscriptions (mapv (fn [topic] [topic (chan 10)]) topics)
           out-ch (chan 10)
           out-mix (async/mix out-ch)]
       ;; one channel per topic, all mixed into out-ch
       (doseq [[topic topic-ch] subscriptions]
         (async/sub sync-events-publication topic topic-ch)
         (async/admix out-mix topic-ch))
       (go-loop []
         (let [{event :val stop? :stop}
               (async/alt!
                 debug-print-sync-events-loop-stop-chan {:stop true}
                 out-ch ([v] {:val v}))]
           (cond
             stop?
             (do (async/unmix-all out-mix)
                 (doseq [[topic topic-ch] subscriptions]
                   (async/unsub sync-events-publication topic topic-ch)))

             event
             (do (pp/pprint [:debug :sync-event event])
                 (recur))))))))
  (defn stop-debug-print-sync-events-loop
    "Offers `true` to the stop channel of the debug print loop (non-blocking)."
    []
    (offer! debug-print-sync-events-loop-stop-chan true))
  964. (comment
  965. ;; sub one type event example:
  966. (def c1 (chan 10))
  967. (async/sub sync-events-publication :created-local-version-file c1)
  968. (offer! sync-events-chan {:event :created-local-version-file :data :xxx})
  969. (poll! c1)
  970. ;; sub multiple type events example:
  971. ;; sub :created-local-version-file and :finished-remote->local events,
  972. ;; output into channel c4-out
  973. (def c2 (chan 10))
  974. (def c3 (chan 10))
  975. (def c4-out (chan 10))
  976. (def mix-out (async/mix c4-out))
  977. (async/admix mix-out c2)
  978. (async/admix mix-out c3)
  979. (async/sub sync-events-publication :created-local-version-file c2)
  980. (async/sub sync-events-publication :finished-remote->local c3)
  981. (offer! sync-events-chan {:event :created-local-version-file :data :xxx})
  982. (offer! sync-events-chan {:event :finished-remote->local :data :xxx})
  983. (poll! c4-out)
  984. (poll! c4-out)
  985. )
  986. ;;; sync events ends
  (defn- fire-file-sync-storage-exceed-limit-event!
    "When `exp` is a storage-limit failure, publishes
    `[:file-sync/storage-exceed-limit]` and returns true; otherwise returns nil."
    [exp]
    (if (storage-exceed-limit? exp)
      (do (state/pub-event! [:file-sync/storage-exceed-limit])
          true)
      nil))
  (defn- fire-file-sync-graph-count-exceed-limit-event!
    "When `exp` is a graph-count-limit failure, publishes
    `[:file-sync/graph-count-exceed-limit]` and returns true; otherwise returns nil."
    [exp]
    (if (graph-count-exceed-limit? exp)
      (do (state/pub-event! [:file-sync/graph-count-exceed-limit])
          true)
      nil))
  ;; Remote HTTP API client. `*stopped?` is handed through to the namespace-level
  ;; `<request` function on every call.
  (deftype RemoteAPI [*stopped?]
    Object
    ;; Performs `api-name` with `body` using a fresh token.
    ;; Yields the parsed JSON body for an unexceptional status; otherwise an
    ;; ex-info "request failed" (after firing the two limit events on it).
    (<request [this api-name body]
      (go
        (let [token (<! (<get-token this))
              resp (<! (<request api-name body token *stopped?))]
          (if-not (http/unexceptional-status? (:status resp))
            (doto (ex-info "request failed"
                           {:err resp
                            :body (get-resp-json-body resp)
                            :api-name api-name
                            :request-body body})
              (fire-file-sync-storage-exceed-limit-event!)
              (fire-file-sync-graph-count-exceed-limit-event!))
            (get-resp-json-body resp)))))

    ;; for test
    (update-files [this graph-uuid txid files]
      {:pre [(map? files)
             (number? txid)]}
      (.<request this "update_files" {:GraphUUID graph-uuid :TXId txid :Files files}))

    IToken
    (<get-token [_this]
      (user/<wrap-ensure-id&access-token
       (state/get-auth-id-token))))
  ;; IRemoteAPI implementation for RemoteAPI.
  ;; Every method body is wrapped in `user/<wrap-ensure-id&access-token` and yields
  ;; either the (post-processed) response of `.<request` or an ExceptionInfo.
  ;; File names are encrypted/decrypted via the global `rsapi`; a failed decryption
  ;; is returned to the caller as an ExceptionInfo instead of being used as data.
  (extend-type RemoteAPI
    IRemoteAPI
    (<user-info [this]
      (user/<wrap-ensure-id&access-token
       (<! (.<request this "user_info" {}))))

    ;; Pages through "get_all_files" until no continuation token is returned, then
    ;; decrypts all paths. Yields a set of FileMetadata or an ExceptionInfo.
    (<get-remote-all-files-meta [this graph-uuid]
      (user/<wrap-ensure-id&access-token
       (let [acc-or-exp
             (<!
              ;; accumulate into persistent collections carried by the loop;
              ;; yields [file-metas encrypted-paths] or the first ExceptionInfo
              (go-loop [continuation-token nil
                        file-metas #{}
                        encrypted-paths []]
                (let [r (<! (.<request this "get_all_files"
                                       (into
                                        {}
                                        (remove (comp nil? second)
                                                {:GraphUUID graph-uuid :ContinuationToken continuation-token}))))]
                  (if (instance? ExceptionInfo r)
                    r
                    (let [next-continuation-token (:NextContinuationToken r)
                          objs (:Objects r)
                          encrypted-paths* (into encrypted-paths
                                                 (map (comp remove-user-graph-uuid-prefix :Key))
                                                 objs)
                          file-metas* (into file-metas
                                            (map (fn [obj]
                                                   {:checksum (:checksum obj)
                                                    :encrypted-path (remove-user-graph-uuid-prefix (:Key obj))
                                                    :size (:Size obj)
                                                    :last-modified (:LastModified obj)}))
                                            objs)]
                      (if (empty? next-continuation-token)
                        [file-metas* encrypted-paths*]
                        (recur next-continuation-token file-metas* encrypted-paths*)))))))]
         (if (instance? ExceptionInfo acc-or-exp)
           acc-or-exp
           (let [[file-meta-list* encrypted-path-list*] acc-or-exp
                 path-list-or-exp (<! (<decrypt-fnames rsapi graph-uuid encrypted-path-list*))]
             (if (instance? ExceptionInfo path-list-or-exp)
               path-list-or-exp
               (let [encrypted-path->path-map (zipmap encrypted-path-list*
                                                      (mapv
                                                       #(capacitor-fs/normalize-file-protocol-path nil %)
                                                       path-list-or-exp))]
                 (set
                  (mapv
                   #(->FileMetadata (:size %)
                                    (:checksum %)
                                    (get encrypted-path->path-map (:encrypted-path %))
                                    (:encrypted-path %)
                                    (:last-modified %)
                                    true nil)
                   file-meta-list*)))))))))

    ;; Yields a set of FileMetadata for `filepaths` (entries whose :Error is
    ;; "filepath too long" are dropped) or an ExceptionInfo.
    (<get-remote-files-meta [this graph-uuid filepaths]
      {:pre [(coll? filepaths)]}
      (user/<wrap-ensure-id&access-token
       (let [encrypted-paths* (<! (<encrypt-fnames rsapi graph-uuid filepaths))
             r (<! (.<request this "get_files_meta" {:GraphUUID graph-uuid :Files encrypted-paths*}))]
         (if (instance? ExceptionInfo r)
           r
           (let [encrypted-paths (mapv :FilePath r)
                 paths-or-exp (<! (<decrypt-fnames rsapi graph-uuid encrypted-paths))]
             (if (instance? ExceptionInfo paths-or-exp)
               paths-or-exp
               (let [encrypted-path->path-map (zipmap encrypted-paths paths-or-exp)]
                 (into #{}
                       (comp
                        (filter #(not= "filepath too long" (:Error %)))
                        (map #(->FileMetadata (:Size %)
                                              (:Checksum %)
                                              (some->> (get encrypted-path->path-map (:FilePath %))
                                                       (capacitor-fs/normalize-file-protocol-path nil))
                                              (:FilePath %)
                                              (:LastModified %)
                                              true nil)))
                       r))))))))

    (<get-remote-graph [this graph-name-opt graph-uuid-opt]
      {:pre [(or graph-name-opt graph-uuid-opt)]}
      (user/<wrap-ensure-id&access-token
       (<! (.<request this "get_graph" (cond-> {}
                                         (seq graph-name-opt)
                                         (assoc :GraphName graph-name-opt)
                                         (seq graph-uuid-opt)
                                         (assoc :GraphUUID graph-uuid-opt))))))

    (<get-remote-file-versions [this graph-uuid filepath]
      (user/<wrap-ensure-id&access-token
       (let [encrypted-path (first (<! (<encrypt-fnames rsapi graph-uuid [filepath])))]
         (<! (.<request this "get_file_version_list" {:GraphUUID graph-uuid :File encrypted-path})))))

    (<list-remote-graphs [this]
      (user/<wrap-ensure-id&access-token
       (<! (.<request this "list_graphs"))))

    ;; Yields the deletion-log transactions with decrypted :path values, or an
    ;; ExceptionInfo when the request or the decryption fails.
    (<get-deletion-logs [this graph-uuid from-txid]
      (user/<wrap-ensure-id&access-token
       (let [r (<! (.<request this "get_deletion_log" {:GraphUUID graph-uuid :FromTXId from-txid}))]
         (if (instance? ExceptionInfo r)
           r
           (let [txns-with-encrypted-paths (mapv #(update % :path remove-user-graph-uuid-prefix) (:Transactions r))
                 encrypted-paths (mapv :path txns-with-encrypted-paths)
                 paths-or-exp (<! (<decrypt-fnames rsapi graph-uuid encrypted-paths))]
             ;; do not zipmap over an exception: hand it back to the caller
             (if (instance? ExceptionInfo paths-or-exp)
               paths-or-exp
               (let [encrypted-path->path-map (zipmap encrypted-paths paths-or-exp)]
                 (mapv
                  (fn [txn] (update txn :path #(get encrypted-path->path-map %)))
                  txns-with-encrypted-paths))))))))

    ;; Yields [txns max-txid min-txid] with txns sorted by :TXId and all encrypted
    ;; paths (those starting with "e.") decrypted, or an ExceptionInfo when the
    ;; request or the decryption fails.
    (<get-diff [this graph-uuid from-txid]
      ;; TODO: path in transactions should be relative path(now s3 key, which includes graph-uuid and user-uuid)
      (user/<wrap-ensure-id&access-token
       (let [r (<! (.<request this "get_diff" {:GraphUUID graph-uuid :FromTXId from-txid}))]
         (if (instance? ExceptionInfo r)
           r
           (let [txns-with-encrypted-paths (sort-by :TXId (:Transactions r))
                 txns-with-encrypted-paths*
                 (mapv
                  (fn [txn]
                    (assoc txn :TXContent
                           (mapv
                            (fn [[to-path from-path checksum]]
                              [(remove-user-graph-uuid-prefix to-path)
                               (some-> from-path remove-user-graph-uuid-prefix)
                               checksum])
                            (:TXContent txn))))
                  txns-with-encrypted-paths)
                 encrypted-paths
                 (mapcat
                  (fn [txn]
                    (remove
                     #(or (nil? %) (not (string/starts-with? % "e.")))
                     (mapcat
                      (fn [[to-path from-path _checksum]] [to-path from-path])
                      (:TXContent txn))))
                  txns-with-encrypted-paths*)
                 paths-or-exp (<! (<decrypt-fnames rsapi graph-uuid encrypted-paths))]
             ;; do not zipmap over an exception: hand it back to the caller
             (if (instance? ExceptionInfo paths-or-exp)
               paths-or-exp
               (let [encrypted-path->path-map (zipmap encrypted-paths paths-or-exp)
                     txns
                     (mapv
                      (fn [txn]
                        (assoc
                         txn :TXContent
                         (mapv
                          (fn [[to-path from-path checksum]]
                            [(get encrypted-path->path-map to-path to-path)
                             (some->> from-path (get encrypted-path->path-map))
                             checksum])
                          (:TXContent txn))))
                      txns-with-encrypted-paths*)]
                 [txns
                  (:TXId (last txns))
                  (:TXId (first txns))])))))))

    (<create-graph [this graph-name]
      (user/<wrap-ensure-id&access-token
       (<! (.<request this "create_graph" {:GraphName graph-name}))))

    (<delete-graph [this graph-uuid]
      (user/<wrap-ensure-id&access-token
       (<! (.<request this "delete_graph" {:GraphUUID graph-uuid}))))

    (<get-graph-salt [this graph-uuid]
      (user/<wrap-ensure-id&access-token
       (<! (.<request this "get_graph_salt" {:GraphUUID graph-uuid}))))

    (<create-graph-salt [this graph-uuid]
      (user/<wrap-ensure-id&access-token
       (<! (.<request this "create_graph_salt" {:GraphUUID graph-uuid}))))

    (<get-graph-encrypt-keys [this graph-uuid]
      (user/<wrap-ensure-id&access-token
       (<! (.<request this "get_graph_encrypt_keys" {:GraphUUID graph-uuid}))))

    (<upload-graph-encrypt-keys [this graph-uuid public-key encrypted-private-key]
      (user/<wrap-ensure-id&access-token
       (<! (.<request this "upload_graph_encrypt_keys" {:GraphUUID graph-uuid
                                                        :public-key public-key
                                                        :encrypted-private-key encrypted-private-key})))))
  ;; Shared RemoteAPI instance, constructed with a nil `*stopped?`.
  (def remoteapi
    (->RemoteAPI nil))
  ;; graph-uuid -> cached graph-salt value
  (def ^:private *get-graph-salt-memoize-cache (atom {}))

  (defn update-graph-salt-cache
    "Stores `v` as the cached salt of `graph-uuid`.
    Precondition: `v` is a map whose keys are exactly `:value` and `:expired-at`."
    [graph-uuid v]
    {:pre [(map? v)
           (= #{:value :expired-at} (set (keys v)))]}
    (swap! *get-graph-salt-memoize-cache assoc graph-uuid v))
  (defn <get-graph-salt-memoize
    "Returns a channel yielding the graph salt of `graph-uuid`.

    The cached entry is used while its `:expired-at` lies in the future (compared
    against the current time in epoch millis). Otherwise the salt is fetched with
    `<get-graph-salt` and the response is returned. Only non-exceptional responses
    are written to the cache, so a failed request never replaces a cached entry."
    [remoteapi graph-uuid]
    (go
      (let [cached (get @*get-graph-salt-memoize-cache graph-uuid)
            expired-at (:expired-at cached)
            now (tc/to-long (t/now))]
        ;; a cache miss has no :expired-at; guard so nil is never compared
        (if (and (number? expired-at) (< now expired-at))
          cached
          (let [r (<! (<get-graph-salt remoteapi graph-uuid))]
            (when-not (instance? ExceptionInfo r)
              (swap! *get-graph-salt-memoize-cache assoc graph-uuid r))
            r)))))
  ;; graph-uuid -> cached graph encrypt keys
  (def ^:private *get-graph-encrypt-keys-memoize-cache (atom {}))

  (defn update-graph-encrypt-keys-cache
    "Stores `v` as the cached encrypt keys of `graph-uuid`.
    Precondition: `v` is a map whose keys are exactly `:public-key` and
    `:encrypted-private-key`."
    [graph-uuid v]
    {:pre [(map? v)
           (= #{:public-key :encrypted-private-key} (set (keys v)))]}
    (swap! *get-graph-encrypt-keys-memoize-cache assoc graph-uuid v))
  (defn <get-graph-encrypt-keys-memoize
    "Returns a channel yielding the encrypt keys of `graph-uuid`: the cached entry
    when there is one, otherwise the response of `<get-graph-encrypt-keys`. The
    response is cached only when it contains both keys."
    [remoteapi graph-uuid]
    (go
      (if-let [cached (get @*get-graph-encrypt-keys-memoize-cache graph-uuid)]
        cached
        (let [resp (<! (<get-graph-encrypt-keys remoteapi graph-uuid))
              {:keys [public-key encrypted-private-key]} resp]
          (when (and public-key encrypted-private-key)
            (swap! *get-graph-encrypt-keys-memoize-cache assoc graph-uuid resp))
          resp))))
  (defn- is-journals-or-pages?
    "True when the relative path of `filetxn` starts with \"journals/\" or \"pages/\"."
    [filetxn]
    (let [rel-path (relative-path filetxn)]
      (boolean
       (some #(string/starts-with? rel-path %) ["journals/" "pages/"]))))
  (defn- need-add-version-file?
    "Returns a channel telling whether a version file should be created before
    `filetxn` is applied.

    - renamed or deleted filetxn: false
    - updated filetxn: truthy when `origin-db-content` is non-empty and the file
      currently on disk either could not be read (nil) or differs from it by at
      least one removed part
      - TODO: we need to store origin remote content md5 in server db
    Callers are expected to pass only files under 'journals/' or 'pages/'."
    [^FileTxn filetxn origin-db-content]
    (go
      (cond
        (or (.renamed? filetxn)
            (.-deleted? filetxn))
        false

        (.-updated? filetxn)
        (let [rel-path (relative-path filetxn)
              repo (state/get-current-repo)
              file-path (config/get-file-path repo rel-path)
              disk-content (<! (p->c (fs/read-file "" file-path)))]
          (and (seq origin-db-content)
               (or (nil? disk-content)
                   (some :removed (diff/diff origin-db-content disk-content))))))))
  (defn- <with-pause
    "Returns a channel that yields the value taken from `ch`, or `{:pause true}` as
    soon as `@*paused` is observed truthy. `*paused` is re-checked before every
    wait, and each wait lasts at most 1000 ms."
    [ch *paused]
    (go-loop []
      (if @*paused
        {:pause true}
        (let [{taken :val timed-out? :timeout}
              (async/alt!
                ch ([v] {:val v})
                (timeout 1000) {:timeout true})]
          (cond
            taken taken
            timed-out? (recur))))))
  (defn- assert-local-txid<=remote-txid
    "When `graphs-txid` holds a local txid, fetches the remote graph and asserts that
    the local txid is not greater than the remote `:TXId`."
    []
    (when-let [local-txid (last @graphs-txid)]
      (go
        (let [graph-uuid (second @graphs-txid)
              remote-graph (<! (<get-remote-graph remoteapi nil graph-uuid))
              remote-txid (:TXId remote-graph)]
          (assert (<= local-txid remote-txid)
                  [@graphs-txid local-txid remote-txid])))))
  (defn- get-local-files-checksum
    "Returns a channel yielding a map of path -> etag, built from the FileMetadata
    that `<get-local-files-meta` reports for `relative-paths`."
    [graph-uuid base-path relative-paths]
    (go
      (let [metas (<! (<get-local-files-meta rsapi graph-uuid base-path relative-paths))]
        (reduce (fn [path->etag ^FileMetadata meta]
                  (assoc path->etag (.-path meta) (.-etag meta)))
                {}
                metas))))
  1270. (declare sync-state--add-current-local->remote-files
  1271. sync-state--add-current-remote->local-files
  1272. sync-state--remove-current-local->remote-files
  1273. sync-state--remove-current-remote->local-files
  1274. sync-state--add-recent-remote->local-files
  1275. sync-state--remove-recent-remote->local-files
  1276. sync-state--stopped?)
  (defn- filetxns=>recent-remote->local-files
    "Converts `filetxns` into a set of recent-remote->local file items
    (`{:remote->local-type :checksum :path}`).

    - update: one :update item
    - delete: one :delete item
    - rename: an :update item for the target path plus a :delete item (nil
      checksum) for the source path"
    [filetxns]
    (into #{}
          (mapcat
           (fn [^FileTxn filetxn]
             (cond
               (.-updated? filetxn)
               [{:remote->local-type :update
                 :checksum (-checksum filetxn)
                 :path (relative-path filetxn)}]

               (.-deleted? filetxn)
               [{:remote->local-type :delete
                 :checksum (-checksum filetxn)
                 :path (relative-path filetxn)}]

               (.renamed? filetxn)
               [{:remote->local-type :update
                 :checksum (-checksum filetxn)
                 :path (relative-path filetxn)}
                {:remote->local-type :delete
                 :checksum nil
                 :path (.-from-path filetxn)}]

               :else
               nil)))
          filetxns))
  (defn- apply-filetxns
    "Applies one partition of `filetxns` to the local graph and returns a channel
    with the result. The kind of the partition is decided by its first filetxn.

    - rename: exactly one filetxn; renames the local file
    - update: removes local files whose path differs only by case, updates the
      local files (interruptible through `*paused`), then creates version files
      from the db content captured beforehand where needed
    - delete: exactly one filetxn; yields true when the file is already missing,
      otherwise the result of the local deletion"
    [*sync-state graph-uuid base-path filetxns *paused]
    (go
      (let [^FileTxn head-txn (first filetxns)]
        (cond
          (.renamed? head-txn)
          (let [from-path (.-from-path head-txn)
                to-path (.-to-path head-txn)]
            (assert (= 1 (count filetxns)))
            (<! (<rename-local-file rsapi graph-uuid base-path
                                    (relative-path from-path)
                                    (relative-path to-path))))

          (.-updated? head-txn)
          (let [repo (state/get-current-repo)
                ;; read db content eagerly, before the local files get overwritten
                txn->db-content-vec (into []
                                          (comp
                                           (filter is-journals-or-pages?)
                                           (map (fn [txn]
                                                  [txn (db/get-file repo (config/get-file-path repo (relative-path txn)))])))
                                          filetxns)]
            (doseq [relative-p (map relative-path filetxns)]
              (when-some [relative-p*
                          (<! (<case-different-local-file-exist? graph-uuid rsapi base-path relative-p))]
                (let [recent-item {:remote->local-type :delete
                                   :checksum nil
                                   :path relative-p*}]
                  (println :debug "found case-different-same-local-file" relative-p relative-p*)
                  (swap! *sync-state sync-state--add-recent-remote->local-files
                         [recent-item])
                  (<! (<delete-local-files rsapi graph-uuid base-path [relative-p*]))
                  ;; forget the recent item again 5s later
                  (go (<! (timeout 5000))
                      (swap! *sync-state sync-state--remove-recent-remote->local-files
                             [recent-item])))))
            (let [update-ch (<update-local-files rsapi graph-uuid base-path (map relative-path filetxns))
                  result (<! (<with-pause update-ch *paused))]
              (doseq [[filetxn origin-db-content] txn->db-content-vec]
                (when (<! (need-add-version-file? filetxn origin-db-content))
                  (<! (<add-new-version rsapi repo (relative-path filetxn) origin-db-content))
                  (put-sync-event! {:event :created-local-version-file
                                    :data {:graph-uuid graph-uuid
                                           :repo repo
                                           :path (relative-path filetxn)
                                           :epoch (tc/to-epoch (t/now))}})))
              result))

          (.-deleted? head-txn)
          (do
            (assert (= 1 (count filetxns)))
            (if (<! (<local-file-not-exist? graph-uuid rsapi base-path (relative-path head-txn)))
              ;; not exist, ignore
              true
              (let [result (<! (<delete-local-files rsapi graph-uuid base-path [(relative-path head-txn)]))]
                (if (and (instance? ExceptionInfo result)
                         (string/index-of (str (ex-cause result)) "No such file or directory"))
                  true
                  result))))))))
  1365. (declare sync-state-reset-full-remote->local-files)
  (defn apply-filetxns-partitions
    "Applies `filetxns-partitions` one after another and returns a channel.

    Yields `{:stop true}` / `{:pause true}` when `*stopped` / `*paused` is set, the
    ExceptionInfo of the first failing partition, or nil once every partition has
    been applied. After each successful partition the largest txid of that
    partition is stored; won't call <update-graphs-txid! when *txid is nil."
    [*sync-state user-uuid graph-uuid base-path filetxns-partitions repo *txid *stopped *paused full-sync?]
    (assert (some? *sync-state))
    (go-loop [remaining filetxns-partitions]
      (cond
        @*stopped {:stop true}
        @*paused {:pause true}

        (seq remaining)
        (let [filetxns (first remaining)
              paths (map relative-path filetxns)
              recent-items (filetxns=>recent-remote->local-files filetxns)]
          (when-not full-sync?
            (swap! *sync-state #(sync-state-reset-full-remote->local-files % recent-items)))
          ;; update recent-remote->local-files
          (swap! *sync-state sync-state--add-recent-remote->local-files recent-items)
          (swap! *sync-state sync-state--add-current-remote->local-files paths)
          (let [result (<! (apply-filetxns *sync-state graph-uuid base-path filetxns *paused))]
            (swap! *sync-state sync-state--remove-current-remote->local-files paths
                   (not (instance? ExceptionInfo result)))
            ;; remove these recent-remote->local-file-items 5s later
            (go (<! (timeout 5000))
                (swap! *sync-state sync-state--remove-recent-remote->local-files
                       recent-items))
            (cond
              (instance? ExceptionInfo result) result
              @*paused {:pause true}
              :else
              (let [latest-txid (apply max (map #(.-txid ^FileTxn %) filetxns))]
                ;; update local-txid
                (when (and *txid (number? latest-txid))
                  (reset! *txid latest-txid)
                  (<! (<update-graphs-txid! latest-txid graph-uuid user-uuid repo)))
                (recur (next remaining)))))))))
  ;; Decides whether a remote->local sync is needed. Dispatches on the shape of
  ;; the argument:
  ;;   :max                        -> :max
  ;;   [txid remote->local-syncer] -> :txid
  ;;   ExceptionInfo               -> :exceptional-response
  ;;   core.async channel          -> :chan
  ;;   anything else               -> :default
  (defmulti need-sync-remote? (fn [v] (cond
                                        (= :max v)
                                        :max

                                        (and (vector? v) (number? (first v)))
                                        :txid

                                        (instance? ExceptionInfo v)
                                        :exceptional-response

                                        (instance? cljs.core.async.impl.channels/ManyToManyChannel v)
                                        :chan)))

  (defmethod need-sync-remote? :max [_] true)

  ;; Needed when the syncer has no local txid yet or the remote txid is newer.
  (defmethod need-sync-remote? :txid [[txid remote->local-syncer]]
    (let [remote-txid txid
          local-txid (.-txid remote->local-syncer)]
      (or (nil? local-txid)
          (> remote-txid local-txid))))

  ;; Needed for a :promise-error whose cause reports "<a>, txid_to_validate = <b>"
  ;; with a > b, or for an HTTP 409 response.
  (defmethod need-sync-remote? :exceptional-response [resp]
    (let [data (ex-data resp)
          cause (ex-cause resp)]
      (or
       (and (= (:error data) :promise-error)
            (when-let [[_ txid txid-to-validate]
                       (re-find #"(\d+), txid_to_validate = (\d+)" (str cause))]
              ;; regex groups are strings: parse them, otherwise `>` compares
              ;; lexicographically ("10" > "9" is false)
              (> (js/parseInt txid 10) (js/parseInt txid-to-validate 10))))
       (= 409 (get-in data [:err :status])))))

  ;; Returns a channel yielding the decision for the value taken from `c`.
  (defmethod need-sync-remote? :chan [c]
    (go (need-sync-remote? (<! c))))

  (defmethod need-sync-remote? :default [_] false)
  (defn- need-reset-local-txid?
    "True when the cause of `r` reports \"<a>, txid_to_validate = <b>\" with a < b
    (compared as numbers), false when a >= b, nil when `r` has no cause or the
    cause does not contain that pattern."
    [r]
    (when-let [cause (ex-cause r)]
      (when-let [[_ txid txid-to-validate]
                 (re-find #"(\d+), txid_to_validate = (\d+)" (str cause))]
        ;; regex groups are strings: parse them, otherwise `<` compares
        ;; lexicographically ("9" < "10" is false)
        (< (js/parseInt txid 10) (js/parseInt txid-to-validate 10)))))
  (defn- graph-has-been-deleted?
    "Returns the match \"graph-not-exist\" when the stringified cause of `r` contains
    it; nil when `r` has no cause or the cause does not mention it."
    [r]
    (when-some [cause (ex-cause r)]
      (re-find #"graph-not-exist" (str cause))))
  ;; A local file change reported by the file watcher.
  ;; type = "change" | "add" | "unlink"
  ;; Equality and hash consider dir, type, path and checksum; `stat` is ignored.
  (deftype FileChangeEvent [type dir path stat checksum]
    IRelativePath
    (-relative-path [_] (remove-dir-prefix dir path))

    IEquiv
    (-equiv [_ ^FileChangeEvent other]
      (= [dir type path checksum]
         [(.-dir other) (.-type other) (.-path other) (.-checksum other)]))

    IHash
    (-hash [_]
      (hash {:dir dir
             :type type
             :path path
             :checksum checksum}))

    ILookup
    (-lookup [o k] (-lookup o k nil))
    ;; keyword access to the five fields; anything else yields `not-found`
    (-lookup [_ k not-found]
      (get {:type type
            :dir dir
            :path path
            :stat stat
            :checksum checksum}
           k
           not-found))

    IPrintWithWriter
    (-pr-writer [_ writer _opts]
      (let [printed {:type type :base-path dir :path path :size (:size stat) :checksum checksum}]
        (write-all writer (str printed)))))
  (defn- <file-change-event=>recent-remote->local-file-item
    "Returns a channel yielding the recent-remote->local file item
    (`{:remote->local-type :checksum :path}`) that corresponds to event `e`.

    \"add\"/\"change\" events map to :update with the current local checksum of the
    file, \"unlink\" maps to :delete with a nil checksum. When no checksum is
    reported for the path, the checksum is nil instead of throwing on an empty
    result."
    [graph-uuid ^FileChangeEvent e]
    (go
      (let [tp (case (.-type e)
                 ("add" "change") :update
                 "unlink" :delete)
            path (relative-path e)
            checksums (when-not (= tp :delete)
                        (<! (get-local-files-checksum graph-uuid (.-dir e) [path])))]
        {:remote->local-type tp
         ;; `first` of an empty map is nil; `some->` keeps `val` from being called on it
         :checksum (some-> checksums first val)
         :path path})))
  (defn- distinct-file-change-events-xf
    "Stateful transducer: lets a `FileChangeEvent` through only the first time its
    path is seen; later events with the same path are dropped."
    [rf]
    (let [seen-paths (volatile! #{})]
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result ^FileChangeEvent e]
         (let [path (.-path e)]
           (if-not (contains? @seen-paths path)
             (do (vswap! seen-paths conj path)
                 (rf result e))
             result))))))
  (defn- distinct-file-change-events
    "Distinct `FileChangeEvent`s by path, keeping the last event of every path.
    The events are walked in reverse and conj'ed onto a list, so the result keeps
    the original relative order."
    [es]
    (into '() distinct-file-change-events-xf (reverse es)))
  (defn- partition-file-change-events
    "Returns a transducer that splits `FileChangeEvent`s into partitions of at most
    `n` events. Consecutive \"add\"/\"change\" events and consecutive \"unlink\" events
    form separate runs, so a partition never mixes both kinds."
    [n]
    (let [kind-of (fn [^FileChangeEvent e]
                    (case (.-type e)
                      ("add" "change") :add-or-change
                      "unlink" :unlink))]
      (comp
       (partition-by kind-of)
       (mapcat #(partition-all n %)))))
  1505. (declare sync-state--valid-to-accept-filewatcher-event?)
  ;; Receives the FileChangeEvents produced by `file-watch-handler`.
  ;; Dropping buffer of 1000: new events are discarded while the buffer is full.
  (defonce local-changes-chan
    (let [buf (async/dropping-buffer 1000)]
      (chan buf)))
  (defn file-watch-handler
    "file-watcher callback.
    Puts a `FileChangeEvent` onto `local-changes-chan` when the event belongs to the
    current graph, the sync state accepts file-watcher events, and the event is
    either an \"unlink\" or carries an mtime. For other than \"unlink\" events the
    checksum is the etag of the first reported local file meta."
    [type {:keys [dir path _content stat] :as _payload}]
    (when-let [current-graph (state/get-current-repo)]
      (when (string/ends-with? current-graph dir)
        (when-let [sync-state (state/get-file-sync-state (state/get-current-file-sync-graph-uuid))]
          (when (and (sync-state--valid-to-accept-filewatcher-event? sync-state)
                     (or (:mtime stat) (= type "unlink")))
            (go
              (let [rel-path (remove-dir-prefix dir path)
                    files-meta (if (= "unlink" type)
                                 false
                                 (<! (<get-local-files-meta
                                      rsapi (:current-syncing-graph-uuid sync-state) dir [rel-path])))
                    checksum (if (coll? files-meta)
                               (some-> files-meta first :etag)
                               false)]
                (>! local-changes-chan (->FileChangeEvent type dir rel-path stat checksum)))))))))
  (defn local-changes-revised-chan-builder
    "return chan.
    Merges `local-changes-chan` with `rename-page-event-chan` into one channel:

    - a rename event ({:repo X :old-path X :new-path X}) is turned into a simulated
      \"unlink\" of the old path followed by an \"add\" of the new path
    - for 3s after a rename, file-watcher events caused by that rename are ignored:
      \"change\" on the old or new path, \"add\" on the new path, \"unlink\" on the
      old path
    - every other local change is passed through unchanged"
    [local-changes-chan rename-page-event-chan]
    (let [*rename-events (atom #{})
          ch (chan 1000)]
      (go-loop []
        (let [{:keys [rename-event local-change]}
              (async/alt!
                rename-page-event-chan ([v] {:rename-event v}) ;; {:repo X :old-path X :new-path}
                local-changes-chan ([v] {:local-change v}))]
          (cond
            rename-event
            (let [repo-dir (config/get-repo-dir (:repo rename-event))
                  strip-repo-dir #(remove-dir-prefix repo-dir %)
                  old-path (strip-repo-dir (:old-path rename-event))
                  new-path (strip-repo-dir (:new-path rename-event))
                  old-key [:old-path old-path repo-dir]
                  new-key [:new-path new-path repo-dir]]
              (swap! *rename-events conj old-key new-key)
              ;; forget this rename again after 3s
              (go (<! (timeout 3000))
                  (swap! *rename-events disj old-key new-key))
              ;; add 2 simulated file-watcher events
              (>! ch (->FileChangeEvent "unlink" repo-dir old-path nil nil))
              (>! ch (->FileChangeEvent "add" repo-dir new-path
                                        {:mtime (tc/to-long (t/now))} "fake-checksum"))
              (recur))

            local-change
            (let [renamed-as? (fn [k]
                                (contains? @*rename-events
                                           [k (.-path local-change) (.-dir local-change)]))
                  ignore? (case (.-type local-change)
                            "change" (or (renamed-as? :old-path)
                                         (renamed-as? :new-path))
                            "add" (renamed-as? :new-path)
                            "unlink" (renamed-as? :old-path)
                            false)]
              (if ignore?
                (println :debug "ignore" local-change)
                (>! ch local-change))
              (recur)))))
      ch))
  ;; Output chan of `local-changes-revised-chan-builder`, fed by
  ;; `local-changes-chan` and the file-rename event chan taken from app state.
  ;; `defonce`: the builder (and the go-loop it starts) runs only once.
  1571. (defonce local-changes-revised-chan
  1572. (local-changes-revised-chan-builder local-changes-chan (state/get-file-rename-event-chan)))
  1573. ;;; ### encryption
  ;; In-memory credentials per graph (value shape is given in the docstring).
  ;; Written by `restore-pwd!`, `set-keys&notify` and the password dialog
  ;; callback; cleared by `clear-pwd!` / `remove-all-pwd!`.
  1574. (def pwd-map
  1575. "graph-uuid->{:pwd xxx :public-key xxx :private-key xxx}"
  1576. (atom {}))
  ;; graph-uuid -> chan that receives `true` after the graph's key pair has
  ;; been stored in `pwd-map` (see `set-keys&notify`).
  ;; Entries are created lazily by `get-graph-pwd-changed-chan`.
  1577. (defonce *pwd-map-changed-chan
  1578. (atom {}))
  (defn- get-graph-pwd-changed-chan
    "Return the pwd-changed chan registered for GRAPH-UUID.
    On first use a chan with a sliding buffer of 1 is created, stored in
    `*pwd-map-changed-chan` and returned."
    [graph-uuid]
    (or (get @*pwd-map-changed-chan graph-uuid)
        (let [new-chan (chan (async/sliding-buffer 1))]
          (swap! *pwd-map-changed-chan assoc graph-uuid new-chan)
          new-chan)))
  (defn- <encrypt-content
    "Encrypt CONTENT with KEY* as passphrase.
    Returns a chan wrapping the promise of `encrypt/encrypt-with-passphrase`."
    [content key*]
    (let [encrypted-promise (encrypt/encrypt-with-passphrase key* content)]
      (p->c encrypted-promise)))
  (defn- decrypt-content
    "Decrypt ENCRYPTED-CONTENT with KEY* as passphrase.
    Returns a chan yielding the decrypted value, or nil when the
    decryption produced an `ExceptionInfo`."
    [encrypted-content key*]
    (go
      (let [decrypted (<! (p->c (encrypt/decrypt-with-passphrase key* encrypted-content)))]
        (if (instance? ExceptionInfo decrypted)
          nil
          decrypted))))
  (defn- local-storage-pwd-path
    "localStorage key under which the encrypted pwd of GRAPH-UUID is kept."
    [graph-uuid]
    (let [prefix "encrypted-pwd/"]
      (str prefix graph-uuid)))
  (defn- persist-pwd!
    "Store PWD (must be a string) in localStorage under GRAPH-UUID's key."
    [pwd graph-uuid]
    {:pre [(string? pwd)]}
    (let [storage-key (local-storage-pwd-path graph-uuid)]
      (.setItem js/localStorage storage-key pwd)))
  (defn- remove-pwd!
    "Delete the pwd stored in localStorage for GRAPH-UUID."
    [graph-uuid]
    (let [storage-key (local-storage-pwd-path graph-uuid)]
      (.removeItem js/localStorage storage-key)))
  (defn get-pwd
    "Read the pwd stored in localStorage for GRAPH-UUID (nil when absent)."
    [graph-uuid]
    (let [storage-key (local-storage-pwd-path graph-uuid)]
      (.getItem js/localStorage storage-key)))
  (defn remove-all-pwd!
    "Remove every \"encrypted-pwd/\" entry from localStorage and empty `pwd-map`."
    []
    ;; snapshot the keys first, then delete the matching ones
    (let [stored-keys (js->clj (js-keys js/localStorage))]
      (doseq [k stored-keys
              :when (string/starts-with? k "encrypted-pwd/")]
        (.removeItem js/localStorage k)))
    (reset! pwd-map {}))
  (defn encrypt+persist-pwd!
    "Encrypt PWD with the graph's salt and persist the result in local-storage.
    When fetching the salt answers with status 410, a new salt is created
    on the server, cached, and used instead."
    [pwd graph-uuid]
    (go
      (let [salt-r     (<! (<get-graph-salt-memoize remoteapi graph-uuid))
            gone?      (-> salt-r ex-data :err :status (= 410))
            salt-value (if gone?
                         (let [new-salt (<! (<create-graph-salt remoteapi graph-uuid))]
                           (update-graph-salt-cache graph-uuid new-salt)
                           (:value new-salt))
                         (:value salt-r))
            encrypted-pwd (<! (<encrypt-content pwd salt-value))]
        (persist-pwd! encrypted-pwd graph-uuid))))
  (defn restore-pwd!
    "Restore the pwd of GRAPH-UUID from its persisted encrypted form and put
    it into `pwd-map`.
    Returns a chan yielding {:restore-pwd-failed reason} when there is no
    persisted pwd, the salt is gone (410) or empty, or decryption fails;
    otherwise it yields the updated `pwd-map` value."
    [graph-uuid]
    (go
      (if-some [encrypted-pwd (get-pwd graph-uuid)]
        (let [salt-r     (<! (<get-graph-salt-memoize remoteapi graph-uuid))
              salt-value (:value salt-r)
              gone?      (-> salt-r ex-data :err :status (= 410))]
          (if (or gone? (empty? salt-value))
            {:restore-pwd-failed "expired salt"}
            (if-some [pwd (<! (decrypt-content encrypted-pwd salt-value))]
              (swap! pwd-map assoc-in [graph-uuid :pwd] pwd)
              {:restore-pwd-failed (str "decrypt-pwd failed, salt: " salt-value)})))
        {:restore-pwd-failed true})))
  (defn- set-keys&notify
    "Store PUBLIC-KEY and PRIVATE-KEY of GRAPH-UUID in `pwd-map`, then offer
    `true` to the graph's pwd-changed chan."
    [graph-uuid public-key private-key]
    (doseq [[k v] [[:public-key public-key]
                   [:private-key private-key]]]
      (swap! pwd-map assoc-in [graph-uuid k] v))
    (let [changed-chan (get-graph-pwd-changed-chan graph-uuid)]
      (offer! changed-chan true)))
  (defn- <set-graph-encryption-keys!
    "Decrypt ENCRYPTED-PRIVATE-KEY with PWD and, when the result starts with
    \"AGE-SECRET-KEY\", store the key pair via `set-keys&notify`.
    Nothing is stored when PWD or ENCRYPTED-PRIVATE-KEY is missing or the
    decrypted value does not have that prefix."
    [graph-uuid pwd public-key encrypted-private-key]
    (go
      (let [decryptable? (and pwd encrypted-private-key)
            private-key  (when decryptable?
                           (<! (decrypt-content encrypted-private-key pwd)))
            valid-key?   (and private-key
                              (string/starts-with? private-key "AGE-SECRET-KEY"))]
        (when valid-key?
          (set-keys&notify graph-uuid public-key private-key)))))
  ;; `<restored-pwd` receives {:graph-uuid .. :value true} once the user has
  ;; entered a password in the dialog (see `<ensure-pwd-exists!`).
  ;; `<restored-pwd-pub` republishes those messages with :graph-uuid as topic.
  1655. (def <restored-pwd (chan (async/sliding-buffer 1)))
  1656. (def <restored-pwd-pub (async/pub <restored-pwd :graph-uuid))
  (defn- <ensure-pwd-exists!
    "Try to restore the pwd of GRAPH-UUID from localstorage.
    Returns a chan yielding the password, or nil when restoring failed; in
    the failure case the password-input dialog is requested and, once the
    user submits a pwd, it is stored in `pwd-map` and announced on
    `<restored-pwd`."
    [repo graph-uuid init-graph-keys]
    (go
      (let [restore-r (<! (restore-pwd! graph-uuid))]
        (if (:restore-pwd-failed restore-r)
          (let [on-password (fn [pwd]
                              (when pwd
                                (swap! pwd-map assoc-in [graph-uuid :pwd] pwd)
                                (offer! <restored-pwd {:graph-uuid graph-uuid :value true})))]
            (state/pub-event! [:modal/remote-encryption-input-pw-dialog repo
                               (state/get-remote-graph-info-by-uuid graph-uuid)
                               :input-pwd-remote
                               {:GraphUUID graph-uuid
                                :init-graph-keys init-graph-keys
                                :after-input-password on-password}])
            nil)
          (get-in @pwd-map [graph-uuid :pwd])))))
  (defn clear-pwd!
    "Forget everything known about GRAPH-UUID's password:
    - drop its entry from `pwd-map`
    - delete the encrypted pwd kept in local-storage"
    [graph-uuid]
    (swap! pwd-map (fn [m] (dissoc m graph-uuid)))
    (remove-pwd! graph-uuid))
  (defn- <loop-ensure-pwd&keys
    "Loop until the password and the encryption keys of GRAPH-UUID are usable.

    Each round fetches the graph's key pair from the server and makes sure a
    password is available:
    - no password yet: wait until one is announced on `<restored-pwd-pub`,
    - server has no keys (404): generate a key pair, encrypt the private key
      with the password, upload both and go round again,
    - otherwise: decrypt the private key; on success close the password modal,
      on failure report an incorrect password, clear it and go round again.

    Returns a chan yielding `::idle` once the keys are in `pwd-map`, or
    `::stop` when *STOPPED? is set or uploading freshly generated keys failed."
    [graph-uuid repo *stopped?]
    (let [<restored-pwd-sub-chan (chan 1)]
      (async/sub <restored-pwd-pub graph-uuid <restored-pwd-sub-chan)
      (go
        (let [result
              (<! (go-loop []
                    (if @*stopped?
                      ::stop
                      (let [{:keys [public-key encrypted-private-key] :as r}
                            (<! (<get-graph-encrypt-keys-memoize remoteapi graph-uuid))
                            init-graph-keys (some-> (ex-data r) :err :status (= 404))
                            pwd (<! (<ensure-pwd-exists! repo graph-uuid init-graph-keys))]
                        (cond
                          (not pwd)
                          (do (println :debug "waiting password...")
                              (<! <restored-pwd-sub-chan) ;loop to wait password
                              (println :debug "waiting password...DONE" graph-uuid)
                              (recur))

                          init-graph-keys
                          ;; when public+private keys not stored at server
                          ;; generate a new key pair and upload them
                          (let [next-state
                                (let [{public-key :publicKey private-key :secretKey}
                                      (<! (<key-gen rsapi))
                                      encrypted-private-key (<! (<encrypt-content private-key pwd))
                                      ;; report only presence of secrets, never their values
                                      _ (assert (string? encrypted-private-key)
                                                {:encrypted-private-key encrypted-private-key
                                                 :private-key? (some? private-key)
                                                 :pwd? (some? pwd)})
                                      upload-r (<! (<upload-graph-encrypt-keys remoteapi graph-uuid public-key encrypted-private-key))]
                                  (if (instance? ExceptionInfo upload-r)
                                    (do (js/console.log "upload-graph-encrypt-keys err" upload-r)
                                        ::stop)
                                    (do (update-graph-encrypt-keys-cache graph-uuid {:public-key public-key
                                                                                     :encrypted-private-key encrypted-private-key})
                                        :recur)))]
                            (if (= :recur next-state)
                              (recur)
                              next-state))

                          :else
                          ;; pwd, public-key, encrypted-private-key all exist
                          (do (assert (and pwd public-key encrypted-private-key)
                                      {:encrypted-private-key encrypted-private-key
                                       :public-key public-key
                                       :pwd? (some? pwd)})
                              (<! (<set-graph-encryption-keys! graph-uuid pwd public-key encrypted-private-key))
                              (if (get-in @pwd-map [graph-uuid :private-key])
                                (do (when (state/modal-opened?)
                                      (state/set-state! [:ui/loading? :set-graph-password] false)
                                      (state/close-modal!))
                                    ::idle)
                                ;; bad pwd
                                (do (when (state/modal-opened?)
                                      (when (state/sub [:ui/loading? :set-graph-password])
                                        (state/set-state! [:file-sync/set-remote-graph-password-result]
                                                          {:fail "Incorrect password. Please try again"}))
                                      (state/set-state! [:ui/loading? :set-graph-password] false))
                                    (clear-pwd! graph-uuid)
                                    (recur)))))))))]
          ;; A pub hands every message to all subscribers of a topic and waits
          ;; for each of them to accept it. Leaving this 1-slot chan subscribed
          ;; after the loop has ended would stall the pub as soon as the slot is
          ;; full, so detach and close it here.
          (async/unsub <restored-pwd-pub graph-uuid <restored-pwd-sub-chan)
          (async/close! <restored-pwd-sub-chan)
          result))))
  (defn- <set-env&keys
    "Pass the key pair stored in `pwd-map` for GRAPH-UUID to `<set-env`.
    PROD? is forwarded unchanged. Asserts that both keys are present.
    Returns whatever `<set-env` returns."
    [prod? graph-uuid]
    (let [{:keys [private-key public-key]} (get @pwd-map graph-uuid)]
      ;; The failure message reports only which parts are present. Printing
      ;; the private key or the whole `pwd-map` would leak every stored
      ;; password and key into logs and error reports.
      (assert (and private-key public-key)
              (pr-str :graph-uuid graph-uuid
                      :private-key? (some? private-key)
                      :public-key? (some? public-key)))
      (<set-env rsapi graph-uuid prod? private-key public-key)))
  (defn- <ensure-set-env&keys
    "Wait for a pwd-changed notification of GRAPH-UUID and then call
    `<set-env&keys`. While waiting, wake up every 10s; the loop ends without
    doing anything as soon as *STOPPED? is found to be true after a wake-up."
    [graph-uuid *stopped?]
    (go-loop []
      (let [woken-by (async/alt!
                       (get-graph-pwd-changed-chan graph-uuid) :change
                       (timeout 10000) :timeout)]
        (cond
          @*stopped?            nil
          (= :change woken-by)  (<! (<set-env&keys config/FILE-SYNC-PROD? graph-uuid))
          (= :timeout woken-by) (recur)))))
  1754. ;;; ### chans to control sync process
  (def full-sync-chan
    "Offering `true` onto this chan triggers a local->remote full sync."
    (chan 1))
  (def full-sync-mult (async/mult full-sync-chan))

  (def stop-sync-chan
    "Offering `true` onto this chan stops the current `SyncManager`."
    (chan 1))
  (def stop-sync-mult (async/mult stop-sync-chan))

  (def remote->local-sync-chan
    "Offering `true` onto this chan triggers a remote->local sync."
    (chan 1))
  (def remote->local-sync-mult (async/mult remote->local-sync-chan))

  (def remote->local-full-sync-chan
    "Offering `true` onto this chan triggers a remote->local full sync."
    (chan 1))
  (def remote->local-full-sync-mult (async/mult remote->local-full-sync-chan))

  (def immediately-local->remote-chan
    "Triggers an immediate upload of the files sitting in the waiting queue."
    (chan))
  (def immediately-local->remote-mult (async/mult immediately-local->remote-chan))

  (def pause-resume-chan
    "Carries pause/resume requests: false -> pause, true -> resume.
    See also `*resume-state`."
    (chan 1))
  (def pause-resume-mult (async/mult pause-resume-chan))

  (def recent-edited-chan
    "Receives `true` whenever content has been edited."
    (chan 1))
  (def recent-edited-mult (async/mult recent-edited-chan))

  (def last-input-time-cursor (rum/cursor state/state :editor/last-input-time))

  ;; every change of the editor's last-input-time counts as a recent edit
  (add-watch last-input-time-cursor "sync"
             (fn [_key _ref _old _new]
               (offer! recent-edited-chan true)))
  1788. ;;; ### sync state
  (def *resume-state
    "What to resume per graph. key: graph-uuid, value: a single-entry map
    with one of `:remote->local`, `:remote->local-full-sync`, `:local->remote`."
    (atom {}))

  (defn resume-state--add-remote->local-state
    "Remember that GRAPH-UUID has to resume with a remote->local sync."
    [graph-uuid]
    (swap! *resume-state (fn [m] (assoc m graph-uuid {:remote->local true}))))

  (defn resume-state--add-remote->local-full-sync-state
    "Remember that GRAPH-UUID has to resume with a remote->local full sync."
    [graph-uuid]
    (swap! *resume-state (fn [m] (assoc m graph-uuid {:remote->local-full-sync true}))))

  (defn resume-state--add-local->remote-state
    "Remember that GRAPH-UUID has to resume uploading LOCAL-CHANGES."
    [graph-uuid local-changes]
    (swap! *resume-state (fn [m] (assoc m graph-uuid {:local->remote local-changes}))))

  ;; (defn resume-state--add-local->remote-full-sync-state
  ;; [graph-uuid]
  ;; (swap! *resume-state assoc graph-uuid {:local->remote-full-sync true}))

  (defn resume-state--reset
    "Forget the resume state of GRAPH-UUID."
    [graph-uuid]
    (swap! *resume-state (fn [m] (dissoc m graph-uuid))))
  (defn sync-state
    "Create a fresh sync-state: no graph selected yet, state `::starting`,
    all file collections empty sets and an empty history list."
    []
    {:post [(s/valid? ::sync-state %)]}
    (let [file-set-keys [:full-local->remote-files
                         :current-local->remote-files
                         :full-remote->local-files
                         :current-remote->local-files
                         :queued-local->remote-files
                         :recent-remote->local-files]]
      (assoc (zipmap file-set-keys (repeat #{}))
             :current-syncing-graph-uuid nil
             :state ::starting
             :history '())))
  (defn- sync-state--update-current-syncing-graph-uuid
    "Return SYNC-STATE with GRAPH-UUID as the graph currently being synced."
    [sync-state graph-uuid]
    {:pre  [(s/valid? ::sync-state sync-state)]
     :post [(s/valid? ::sync-state %)]}
    (merge sync-state {:current-syncing-graph-uuid graph-uuid}))
  (defn- sync-state--update-state
    "Return SYNC-STATE with its `:state` replaced by NEXT-STATE."
    [sync-state next-state]
    {:pre  [(s/valid? ::state next-state)]
     :post [(s/valid? ::sync-state %)]}
    (merge sync-state {:state next-state}))
  (defn sync-state--add-current-remote->local-files
    "Add PATHS to the files currently being synced remote->local."
    [sync-state paths]
    {:post [(s/valid? ::sync-state %)]}
    (let [current (:current-remote->local-files sync-state)]
      (assoc sync-state :current-remote->local-files (into current paths))))
  (defn sync-state--add-current-local->remote-files
    "Add PATHS to the files currently being synced local->remote."
    [sync-state paths]
    {:post [(s/valid? ::sync-state %)]}
    (let [current (:current-local->remote-files sync-state)]
      (assoc sync-state :current-local->remote-files (into current paths))))
  (defn sync-state--add-queued-local->remote-files
    "Append EVENT to the queued local->remote events, keeping only the last
    event for each path."
    [sync-state event]
    {:post [(s/valid? ::sync-state %)]}
    (let [queued  (:queued-local->remote-files sync-state)
          queued* (util/distinct-by-last-wins (fn [e] (.-path e))
                                              (concat queued [event]))]
      (assoc sync-state :queued-local->remote-files queued*)))
  (defn sync-state--remove-queued-local->remote-files
    "Drop every occurrence of EVENT from the queued local->remote events."
    [sync-state event]
    {:post [(s/valid? ::sync-state %)]}
    (let [queued (:queued-local->remote-files sync-state)]
      (assoc sync-state :queued-local->remote-files (remove #{event} queued))))
  (defn sync-state-reset-queued-local->remote-files
    "Clear the queued local->remote events (the key is set to nil)."
    [sync-state]
    {:post [(s/valid? ::sync-state %)]}
    (merge sync-state {:queued-local->remote-files nil}))
  (defn sync-state--add-recent-remote->local-files
    "Conj every item of ITEMS onto the recent remote->local files."
    [sync-state items]
    {:pre  [(s/valid? (s/coll-of ::recent-remote->local-file-item) items)]
     :post [(s/valid? ::sync-state %)]}
    (let [recent (:recent-remote->local-files sync-state)]
      (assoc sync-state :recent-remote->local-files (apply conj recent items))))
  (defn sync-state--remove-recent-remote->local-files
    "Remove ITEMS from the recent remote->local files (set difference)."
    [sync-state items]
    {:post [(s/valid? ::sync-state %)]}
    (let [recent (:recent-remote->local-files sync-state)]
      (assoc sync-state :recent-remote->local-files (set/difference recent items))))
  (defn sync-state-reset-full-local->remote-files
    "Replace the files of the local->remote full sync with EVENTS."
    [sync-state events]
    {:post [(s/valid? ::sync-state %)]}
    (merge sync-state {:full-local->remote-files events}))
  (defn sync-state-reset-full-remote->local-files
    "Replace the files of the remote->local full sync with EVENTS."
    [sync-state events]
    {:post [(s/valid? ::sync-state %)]}
    (merge sync-state {:full-remote->local-files events}))
  (defn- add-history-items
    "Return HISTORY with one {:path p :time NOW} item added for each of PATHS.
    Existing items for those paths are dropped first, consecutive items with
    the same path are collapsed, and only the first 20 items are kept."
    [history paths now]
    ;; build the lookup set once instead of once per history item
    (let [path-set      (set paths)
          other-history (remove (fn [o] (contains? path-set (:path o))) history)
          new-items     (map (fn [path] {:path path :time now}) paths)]
      (sequence
       (comp
        ;; only reserve the latest one of same-path-items
        (dedupe-by :path)
        ;; reserve the latest 20 history items
        (take 20))
       ;; conj onto a seq prepends, so the new items end up in front
       (into other-history new-items))))
  (defn sync-state--remove-current-remote->local-files
    "Remove PATHS from the files currently syncing remote->local.
    When ADD-HISTORY? is truthy the paths are also recorded in `:history`
    with the current time."
    [sync-state paths add-history?]
    {:post [(s/valid? ::sync-state %)]}
    (let [now     (t/now)
          cleaned (update sync-state :current-remote->local-files set/difference paths)]
      (if add-history?
        (update cleaned :history add-history-items paths now)
        cleaned)))
  (defn sync-state--remove-current-local->remote-files
    "Remove PATHS from the files currently syncing local->remote.
    When ADD-HISTORY? is truthy the paths are also recorded in `:history`
    with the current time."
    [sync-state paths add-history?]
    {:post [(s/valid? ::sync-state %)]}
    (let [now     (t/now)
          cleaned (update sync-state :current-local->remote-files set/difference paths)]
      (if add-history?
        (update cleaned :history add-history-items paths now)
        cleaned)))
  (defn sync-state--stopped?
    "True when the `:state` of SYNC-STATE is `::stop`, i.e. graph syncing is stopped."
    [sync-state]
    {:pre [(s/valid? ::sync-state sync-state)]}
    (-> sync-state :state (= ::stop)))
  (defn sync-state--valid-to-accept-filewatcher-event?
    "True when SYNC-STATE is idle or in one of the four syncing states,
    i.e. file-watcher events may be accepted."
    [sync-state]
    {:pre [(s/valid? ::sync-state sync-state)]}
    (let [accepting-states #{::idle
                             ::local->remote
                             ::remote->local
                             ::local->remote-full-sync
                             ::remote->local-full-sync}]
      (contains? accepting-states (get sync-state :state))))
  1907. ;;; ### remote->local syncer & local->remote syncer
  ;; Operations of the remote->local direction; implemented by
  ;; `Remote->LocalSyncer`. The `<`-prefixed methods return a chan.
  1908. (defprotocol IRemote->LocalSync
  1909. (stop-remote->local! [this])
  1910. (<sync-remote->local! [this] "return ExceptionInfo when error occurs")
  1911. (<sync-remote->local-all-files! [this] "sync all files, return ExceptionInfo when error occurs"))
  ;; Operations of the local->remote direction; implemented by
  ;; `Local->RemoteSyncer`.
  ;; setup/stop tap and untap the syncer's private control chans.
  1912. (defprotocol ILocal->RemoteSync
  1913. (setup-local->remote! [this])
  1914. (stop-local->remote! [this])
  1915. (<ratelimit [this from-chan] "get watched local file-change events from FROM-CHAN,
  1916. return chan returning events with rate limited")
  1917. (<sync-local->remote! [this es] "es is a sequence of `FileChangeEvent`, all items have same type.")
  1918. (<sync-local->remote-all-files! [this] "compare all local files to remote ones, sync when not equal.
  1919. if local-txid != remote-txid, return {:need-sync-remote true}"))
  ;; Syncer for the remote->local direction (implements `IRemote->LocalSync`).
  ;; - `*txid` is deref'd for the local txid and `reset!` after a successful sync.
  ;; - `*sync-state` is updated with `swap!`.
  ;; - `*stopped` / `*paused` are deref'd flags; `*stopped` is set with `vreset!`.
  ;; - `local->remote-syncer` is a mutable field set via `set-local->remote-syncer!`.
  ;; Every sync method returns a chan yielding one of
  ;; {:succ true} | {:stop true} | {:pause true} | {:unknown ex}
  ;; (plus {:need-remote->local-full-sync true} for `<sync-remote->local!`).
  1920. (defrecord ^:large-vars/cleanup-todo
  1921. Remote->LocalSyncer [user-uuid graph-uuid base-path repo *txid *sync-state remoteapi
  1922. ^:mutable local->remote-syncer *stopped *paused]
  1923. Object
  1924. (set-local->remote-syncer! [_ s] (set! local->remote-syncer s))
  ;; Applies the given [relative-path checksum] pairs in batches of
  ;; `download-batch-size`, then persists LATEST-TXID and resets `*txid`.
  ;; The txid is only updated when no error occurred and the syncer is
  ;; neither stopped nor paused.
  1925. (sync-files-remote->local!
  1926. [_ relative-filepath+checksum-coll latest-txid]
  1927. (go
  1928. (let [partitioned-filetxns
  1929. (sequence (filepath+checksum-coll->partitioned-filetxns
  1930. download-batch-size graph-uuid user-uuid)
  1931. relative-filepath+checksum-coll)
  1932. r
  ;; nothing to apply: skip the :start event and count it as success
  1933. (if (empty? (flatten partitioned-filetxns))
  1934. {:succ true}
  1935. (do
  1936. (put-sync-event! {:event :start
  1937. :data {:type :full-remote->local
  1938. :graph-uuid graph-uuid
  1939. :full-sync? true
  1940. :epoch (tc/to-epoch (t/now))}})
  1941. (<! (apply-filetxns-partitions
  1942. *sync-state user-uuid graph-uuid base-path partitioned-filetxns repo
  1943. nil *stopped *paused true))))]
  1944. (cond
  1945. (instance? ExceptionInfo r) {:unknown r}
  1946. @*stopped {:stop true}
  1947. @*paused {:pause true}
  1948. :else
  1949. (do
  1950. (swap! *sync-state #(sync-state-reset-full-remote->local-files % []))
  1951. (<! (<update-graphs-txid! latest-txid graph-uuid user-uuid repo))
  1952. (reset! *txid latest-txid)
  1953. {:succ true})))))
  1954. IRemote->LocalSync
  1955. (stop-remote->local! [_] (vreset! *stopped true))
  ;; Incremental sync: fetch the remote diff since `@*txid` and apply it.
  1956. (<sync-remote->local! [_]
  1957. (go
  1958. (let [r
  1959. (let [diff-r (<! (<get-diff remoteapi graph-uuid @*txid))]
  1960. (if (instance? ExceptionInfo diff-r)
  1961. diff-r
  1962. (let [[diff-txns latest-txid min-txid] diff-r]
  1963. (if (> (dec min-txid) @*txid) ;; min-txid-1 > @*txid, need to remote->local-full-sync
  1964. (do (println "min-txid" min-txid "request-txid" @*txid)
  1965. {:need-remote->local-full-sync true})
  ;; when latest-txid is not a positive int this branch yields nil,
  ;; which the outer cond below maps to {:succ true}
  1966. (when (pos-int? latest-txid)
  1967. (let [filtered-diff-txns (-> (transduce (diffs->filetxns) conj '() (reverse diff-txns))
  1968. filter-download-files-with-reserved-chars)
  1969. partitioned-filetxns (transduce (partition-filetxns download-batch-size)
  1970. (completing (fn [r i] (conj r (reverse i)))) ;reverse
  1971. '()
  1972. filtered-diff-txns)]
  ;; NOTE(review): unlike `sync-files-remote->local!`, the :start event
  ;; is emitted here even when there is nothing to apply -- confirm intended.
  1973. (put-sync-event! {:event :start
  1974. :data {:type :remote->local
  1975. :graph-uuid graph-uuid
  1976. :full-sync? false
  1977. :epoch (tc/to-epoch (t/now))}})
  1978. (if (empty? (flatten partitioned-filetxns))
  1979. (do
  1980. (swap! *sync-state #(sync-state-reset-full-remote->local-files % []))
  1981. (<! (<update-graphs-txid! latest-txid graph-uuid user-uuid repo))
  1982. (reset! *txid latest-txid)
  1983. {:succ true})
  1984. (<! (apply-filetxns-partitions
  1985. *sync-state user-uuid graph-uuid base-path
  1986. partitioned-filetxns repo *txid *stopped *paused false)))))))))]
  1987. (cond
  1988. (instance? ExceptionInfo r) {:unknown r}
  1989. @*stopped {:stop true}
  1990. @*paused {:pause true}
  1991. (:need-remote->local-full-sync r) r
  1992. :else {:succ true}))))
  ;; Full sync: diff all remote file metadata against the local one and hand
  ;; the differing files to `sync-files-remote->local!`.
  1993. (<sync-remote->local-all-files! [this]
  1994. (go
  ;; both metadata requests are created before the first `<!`
  ;; (presumably so they run concurrently -- TODO confirm)
  1995. (let [remote-all-files-meta-c (<get-remote-all-files-meta remoteapi graph-uuid)
  1996. local-all-files-meta-c (<get-local-all-files-meta rsapi graph-uuid base-path)
  1997. remote-all-files-meta-or-exp (<! remote-all-files-meta-c)]
  ;; NOTE(review): all three failure kinds are reported with the same
  ;; :exception-decrypt-failed event, although only one of them is a
  ;; decrypt failure -- confirm intended.
  1998. (if (or (storage-exceed-limit? remote-all-files-meta-or-exp)
  1999. (sync-stop-when-api-flying? remote-all-files-meta-or-exp)
  2000. (decrypt-exp? remote-all-files-meta-or-exp))
  2001. (do (put-sync-event! {:event :exception-decrypt-failed
  2002. :data {:graph-uuid graph-uuid
  2003. :exp remote-all-files-meta-or-exp
  2004. :epoch (tc/to-epoch (t/now))}})
  2005. {:stop true})
  2006. (let [remote-all-files-meta remote-all-files-meta-or-exp
  2007. local-all-files-meta (<! local-all-files-meta-c)
  2008. diff-remote-files (diff-file-metadata-sets remote-all-files-meta local-all-files-meta)
  ;; [now - 10 days, now] as epoch millis, based on today's date
  2009. recent-10-days-range ((juxt #(tc/to-long (t/minus % (t/days 10))) #(tc/to-long %)) (t/today))
  2010. sorted-diff-remote-files
  2011. (sort-by
  2012. (sort-file-metadata-fn :recent-days-range recent-10-days-range) > diff-remote-files)
  2013. remote-graph-info-or-ex (<! (<get-remote-graph remoteapi nil graph-uuid))
  2014. latest-txid (:TXId remote-graph-info-or-ex)]
  2015. (if (or (instance? ExceptionInfo remote-graph-info-or-ex) (nil? latest-txid))
  2016. (do (put-sync-event! {:event :get-remote-graph-failed
  2017. :data {:graph-uuid graph-uuid
  2018. :exp remote-graph-info-or-ex
  2019. :epoch (tc/to-epoch (t/now))}})
  2020. {:stop true})
  2021. (do (println "[full-sync(remote->local)]" (count sorted-diff-remote-files) "files need to sync")
  ;; the sync-state records the unfiltered list, while only the
  ;; files passing the reserved-chars filter are actually synced
  2022. (let [filtered-files (filter-download-files-with-reserved-chars sorted-diff-remote-files)]
  2023. (swap! *sync-state #(sync-state-reset-full-remote->local-files % sorted-diff-remote-files))
  2024. (<! (.sync-files-remote->local!
  2025. this (map (juxt relative-path -checksum)
  2026. filtered-files)
  2027. latest-txid)))))))))))
  (defn- <file-changed?
    "Return a chan yielding true when the local metadata of the file differs
    from the remote one, and false when they are equal or when fetching the
    remote metadata yielded an `ExceptionInfo`."
    [graph-uuid file-path-without-base-path base-path]
    {:pre [(string? file-path-without-base-path)]}
    (go
      (let [paths              [file-path-without-base-path]
            remote-meta-or-exp (<! (<get-remote-files-meta remoteapi graph-uuid paths))
            local-metas        (<! (<get-local-files-meta rsapi graph-uuid base-path paths))]
        (and (not (instance? ExceptionInfo remote-meta-or-exp))
             (not= (first remote-meta-or-exp) (first local-metas))))))
  (defn- <filter-local-changes-pred
    "Predicate (returning a chan) that decides whether local-change event E
    should be kept:
    - 'unlink': keep only when the file really does not exist locally
    - 'add' | 'change': keep only when the file exists locally and differs
      from the remote file"
    [^FileChangeEvent e basepath graph-uuid]
    (go
      (let [r-path (relative-path e)]
        (case (.-type e)
          "unlink"
          ;; a file that is still present was not really deleted
          (<! (<local-file-not-exist? graph-uuid rsapi basepath r-path))

          ("add" "change")
          (if (<! (<local-file-not-exist? graph-uuid rsapi basepath r-path))
            false
            (<! (<file-changed? graph-uuid r-path basepath)))))))
  (defn- <filter-checksum-not-consistent
    "Drop the `FileChangeEvent`s whose file changed again after the event was
    recorded, by comparing the checksum stored in each event with the
    checksum calculated now.
    \"unlink\" batches are returned untouched. An event without checksum or
    with \"fake-checksum\" is kept as long as the file currently has metadata."
    [graph-uuid es]
    {:pre [(or (nil? es) (coll? es))
           (every? #(instance? FileChangeEvent %) es)]}
    (go
      (when (seq es)
        (let [^FileChangeEvent head (first es)]
          (if (= "unlink" (.-type head))
            es
            (let [base-path  (.-dir head)
                  files-meta (<! (<get-local-files-meta
                                  rsapi graph-uuid base-path (mapv relative-path es)))
                  path->current-checksum (when (coll? files-meta)
                                           (into {} (map (juxt :path :etag)) files-meta))
                  path->origin-checksum  (into {}
                                               (map (juxt relative-path #(.-checksum ^FileChangeEvent %)))
                                               es)
                  path->event            (into {} (map (juxt relative-path identity)) es)
                  consistent?            (fn [origin current]
                                           (boolean (or (nil? origin)
                                                        (= "fake-checksum" origin)
                                                        (= origin current))))
                  ;; paths present on one side only keep their raw value in the
                  ;; merged map, which is never `true`, so they are dropped
                  consistent-paths       (->> (merge-with consistent?
                                                          path->origin-checksum
                                                          path->current-checksum)
                                              (filterv (comp true? second))
                                              (mapv first))]
              (vals (select-keys path->event consistent-paths))))))))
  2080. (def ^:private file-size-limit (* 100 1000 1024)) ;~100MB (102,400,000 bytes); only files strictly smaller than this are synced
  (defn- filter-too-huge-files-aux
    "True when event E may be synced with respect to its size:
    \"unlink\" events always pass; other events pass only when their stat
    has a `:size` below `file-size-limit`."
    [e]
    {:post [(boolean? %)]}
    (or (= "unlink" (.-type ^FileChangeEvent e))
        (let [size (:size (.-stat e))]
          (and (some? size)
               (< size file-size-limit)))))
  (defn- filter-too-huge-files
    "Return a vector of the events in ES that pass `filter-too-huge-files-aux`,
    i.e. without files > `file-size-limit`."
    [es]
    {:pre [(or (nil? es) (coll? es))
           (every? #(instance? FileChangeEvent %) es)]}
    (into [] (filter filter-too-huge-files-aux) es))
  (defn- filter-local-files-in-deletion-logs
    "Split LOCAL-ALL-FILES-META according to DELETION-LOGS.
    A file goes to `:delete` when a deletion log exists for its `:path` and
    the log's `:epoch` (seconds, converted to millis) is later than the
    file's `:last-modified`; every other file goes to `:keep`.
    Returns {:keep #{file-meta ...} :delete #{file-meta ...}}."
    [local-all-files-meta deletion-logs]
    (let [deletion-logs-map (into {} (map (juxt :path identity)) deletion-logs)
          deleted-after-last-edit?
          (fn [f]
            (let [epoch-long (some-> (get deletion-logs-map (:path f))
                                     :epoch
                                     (* 1000))]
              (boolean (and epoch-long (> epoch-long (:last-modified f))))))
          ;; persistent `group-by` instead of bashing transients in place:
          ;; the return value of `conj!` must be used, never discarded
          {to-delete true to-keep false}
          (group-by deleted-after-last-edit? local-all-files-meta)]
      {:keep   (set to-keep)
       :delete (set to-delete)}))
  (defn- <filter-too-long-filename
    "Return a chan yielding the items of LOCAL-FILES-META whose encrypted
    filename is at most 950 characters long."
    [graph-uuid local-files-meta]
    (go
      (let [paths           (mapv :path local-files-meta)
            encrypted-names (<! (<encrypt-fnames rsapi graph-uuid paths))
            path->encrypted (zipmap paths encrypted-names)
            path->meta      (into {} (map (juxt :path identity)) local-files-meta)]
        (for [[path file-meta] path->meta
              ;; 950 = (- 1024 36 36 2)
              ;; 1024 - length of 'user-uuid/graph-uuid/'
              :when (<= (count (get path->encrypted path)) 950)]
          file-meta))))
  2124. (defrecord ^:large-vars/cleanup-todo
  2125. Local->RemoteSyncer [user-uuid graph-uuid base-path repo *sync-state remoteapi
  2126. ^:mutable rate *txid ^:mutable remote->local-syncer stop-chan *stopped *paused
  2127. ;; control chans
  2128. private-immediately-local->remote-chan private-recent-edited-chan]
  2129. Object
  2130. (filter-file-change-events-fn [_]
  2131. (fn [^FileChangeEvent e]
  2132. (go (and (instance? FileChangeEvent e)
  2133. (if-let [mtime (:mtime (.-stat e))]
  2134. ;; if mtime is not nil, it should be after (- now 1min)
  2135. ;; ignore events too early
  2136. (> (* 1000 mtime) (tc/to-long (t/minus (t/now) (t/minutes 1))))
  2137. true)
  2138. (or (string/starts-with? (.-dir e) base-path)
  2139. (string/starts-with? (str "file://" (.-dir e)) base-path)) ; valid path prefix
  2140. (not (ignored? e)) ;not ignored
  2141. ;; download files will also trigger file-change-events, ignore them
  2142. (not (contains? (:recent-remote->local-files @*sync-state)
  2143. (<! (<file-change-event=>recent-remote->local-file-item
  2144. graph-uuid e))))))))
  2145. (set-remote->local-syncer! [_ s] (set! remote->local-syncer s))
  2146. ILocal->RemoteSync
  2147. (setup-local->remote! [_]
  2148. (async/tap immediately-local->remote-mult private-immediately-local->remote-chan)
  2149. (async/tap recent-edited-mult private-recent-edited-chan))
  2150. (stop-local->remote! [_]
  2151. (async/untap immediately-local->remote-mult private-immediately-local->remote-chan)
  2152. (async/untap recent-edited-mult private-recent-edited-chan)
  2153. (async/close! stop-chan)
  2154. (vreset! *stopped true))
  2155. (<ratelimit [this from-chan]
  2156. (let [<fast-filter-e-fn (.filter-file-change-events-fn this)]
  2157. (util/<ratelimit
  2158. from-chan rate
  2159. :filter-fn
  2160. (fn [e]
  2161. (go
  2162. (and (rsapi-ready? rsapi graph-uuid)
  2163. (<! (<fast-filter-e-fn e))
  2164. (do
  2165. (swap! *sync-state sync-state--add-queued-local->remote-files e)
  2166. (let [v (<! (<filter-local-changes-pred e base-path graph-uuid))]
  2167. (when-not v
  2168. (swap! *sync-state sync-state--remove-queued-local->remote-files e))
  2169. v)))))
  2170. :flush-fn #(swap! *sync-state sync-state-reset-queued-local->remote-files)
  2171. :stop-ch stop-chan
  2172. :distinct-coll? true
  2173. :flush-now-ch private-immediately-local->remote-chan
  2174. :refresh-timeout-ch private-recent-edited-chan)))
  2175. (<sync-local->remote! [_ es]
  2176. (if (empty? es)
  2177. (go {:succ true})
  2178. (let [type (.-type ^FileChangeEvent (first es))
  2179. es->paths-xf (comp
  2180. (map #(relative-path %))
  2181. (remove ignored?))]
  2182. (go
  2183. (let [es* (<! (<filter-checksum-not-consistent graph-uuid es))
  2184. _ (when (not= (count es*) (count es))
  2185. (println :debug :filter-checksum-changed
  2186. (mapv relative-path (set/difference (set es) (set es*)))))
  2187. es** (filter-too-huge-files es*)
  2188. _ (when (not= (count es**) (count es*))
  2189. (println :debug :filter-too-huge-files
  2190. (mapv relative-path (set/difference (set es*) (set es**)))))
  2191. paths (cond-> (sequence es->paths-xf es**)
  2192. (not= type "unlink")
  2193. filter-upload-files-with-reserved-chars)
  2194. _ (println :sync-local->remote type paths)
  2195. r (if (empty? paths)
  2196. (go @*txid)
  2197. (case type
  2198. ("add" "change")
  2199. (<with-pause (<update-remote-files rsapi graph-uuid base-path paths @*txid) *paused)
  2200. "unlink"
  2201. (<with-pause (<delete-remote-files rsapi graph-uuid base-path paths @*txid) *paused)))
  2202. _ (swap! *sync-state sync-state--add-current-local->remote-files paths)
  2203. r* (<! r)
  2204. [succ? paused?] ((juxt number? :pause) r*)
  2205. _ (swap! *sync-state sync-state--remove-current-local->remote-files paths succ?)]
  2206. (cond
  2207. (need-sync-remote? r*)
  2208. (do (println :need-sync-remote r*)
  2209. {:need-sync-remote true})
  2210. (need-reset-local-txid? r*) ;; TODO: this cond shouldn't be true,
  2211. ;; but some potential bugs cause local-txid > remote-txid
  2212. (let [remote-graph-info-or-ex (<! (<get-remote-graph remoteapi nil graph-uuid))
  2213. remote-txid (:TXId remote-graph-info-or-ex)]
  2214. (if (or (instance? ExceptionInfo remote-graph-info-or-ex) (nil? remote-txid))
  2215. (do (put-sync-event! {:event :get-remote-graph-failed
  2216. :data {:graph-uuid graph-uuid
  2217. :exp remote-graph-info-or-ex
  2218. :epoch (tc/to-epoch (t/now))}})
  2219. {:stop true})
  2220. (do (<! (<update-graphs-txid! remote-txid graph-uuid user-uuid repo))
  2221. (reset! *txid remote-txid)
  2222. {:succ true})))
  2223. (graph-has-been-deleted? r*)
  2224. (do (println :graph-has-been-deleted r*)
  2225. {:graph-has-been-deleted true})
  2226. paused?
  2227. {:pause true}
  2228. succ? ; succ
  2229. (do
  2230. (println "sync-local->remote! update txid" r*)
  2231. ;; persist txid
  2232. (<! (<update-graphs-txid! r* graph-uuid user-uuid repo))
  2233. (reset! *txid r*)
  2234. {:succ true})
  2235. :else
  2236. (do
  2237. (println "sync-local->remote unknown:" r*)
  2238. {:unknown r*})))))))
  ;; Full local->remote sync.
  ;; Fetches remote file metadata, local file metadata and the deletion logs,
  ;; then (1) deletes local files listed in the deletion logs and (2) uploads
  ;; the local files that differ from the remote ones, batch by batch.
  ;; Returns a channel yielding {:succ true}, {:stop true}, or the first
  ;; non-successful batch result of `<sync-local->remote!`.
  ;; NOTE(review): `<sync-local->remote!` can also yield {:pause true}; that
  ;; result matches neither branch of the final `cond`, so nil is returned in
  ;; that case - confirm this is intended.
  (<sync-local->remote-all-files! [this]
    (go
      (let [remote-meta-ch (<get-remote-all-files-meta remoteapi graph-uuid)
            local-meta-ch (<get-local-all-files-meta rsapi graph-uuid base-path)
            deletion-logs-ch (<get-deletion-logs remoteapi graph-uuid @*txid)
            remote-meta-or-exp (<! remote-meta-ch)
            deletion-logs-or-exp (<! deletion-logs-ch)]
        (cond
          ;; fetching the remote metadata failed in a way that must stop sync
          (or (storage-exceed-limit? remote-meta-or-exp)
              (sync-stop-when-api-flying? remote-meta-or-exp)
              (decrypt-exp? remote-meta-or-exp))
          (do (put-sync-event! {:event :get-remote-all-files-failed
                                :data {:graph-uuid graph-uuid
                                       :exp remote-meta-or-exp
                                       :epoch (tc/to-epoch (t/now))}})
              {:stop true})

          ;; fetching the deletion logs failed
          (instance? ExceptionInfo deletion-logs-or-exp)
          (do (put-sync-event! {:event :get-deletion-logs-failed
                                :data {:graph-uuid graph-uuid
                                       :exp deletion-logs-or-exp
                                       :epoch (tc/to-epoch (t/now))}})
              {:stop true})

          :else
          (let [local-meta (<! local-meta-ch)
                {kept-local-meta :keep files-to-delete :delete}
                (filter-local-files-in-deletion-logs local-meta deletion-logs-or-exp)
                today (t/today)
                ;; [ten-days-ago today], both as epoch millis
                recent-10-days-range [(tc/to-long (t/minus today (t/days 10)))
                                      (tc/to-long today)]
                candidates (diff-file-metadata-sets kept-local-meta remote-meta-or-exp)
                uploadable (<! (<filter-too-long-filename graph-uuid candidates))
                diff-local-files (sort-by (sort-file-metadata-fn :recent-days-range recent-10-days-range)
                                          >
                                          uploadable)
                ;; FileMetadata -> "change" FileChangeEvent, minus ignored files
                ->change-event-xf
                (comp
                 (map (fn [^FileMetadata fm]
                        (->FileChangeEvent "change" base-path (.get-normalized-path fm)
                                           {:size (:size fm)} (:etag fm))))
                 (remove ignored?))
                change-events (sequence ->change-event-xf diff-local-files)
                distinct-change-events (filter-upload-files-with-reserved-chars
                                        (distinct-file-change-events change-events))
                _ (swap! *sync-state
                         (fn [s] (sync-state-reset-full-local->remote-files s distinct-change-events)))
                change-events-partitions (sequence
                                          (partition-file-change-events upload-batch-size)
                                          distinct-change-events)]
            (println "[full-sync(local->remote)]"
                     (count (flatten change-events-partitions)) "files need to sync and"
                     (count files-to-delete) "local files need to delete")
            (put-sync-event! {:event :start
                              :data {:type :full-local->remote
                                     :graph-uuid graph-uuid
                                     :full-sync? true
                                     :epoch (tc/to-epoch (t/now))}})
            ;; 1. delete local files
            (loop [pending files-to-delete]
              (when-let [f (first pending)]
                (let [relative-p (relative-path f)]
                  (when-not (<! (<local-file-not-exist? graph-uuid rsapi base-path relative-p))
                    ;; register a fake "recent remote->local delete" entry in
                    ;; *sync-state around the deletion; it is removed again
                    ;; 5 seconds later
                    (let [fake-item {:remote->local-type :delete
                                     :checksum nil
                                     :path relative-p}]
                      (swap! *sync-state sync-state--add-recent-remote->local-files
                             [fake-item])
                      (<! (<delete-local-files rsapi graph-uuid base-path [(relative-path f)]))
                      (go (<! (timeout 5000))
                          (swap! *sync-state sync-state--remove-recent-remote->local-files
                                 [fake-item])))))
                (recur (next pending))))
            ;; 2. upload local files
            (loop [batches change-events-partitions]
              (cond
                @*stopped
                {:stop true}

                (empty? batches)
                {:succ true}

                :else
                (let [{:keys [succ need-sync-remote graph-has-been-deleted unknown stop] :as result}
                      (<! (<sync-local->remote! this (first batches)))]
                  (s/assert ::sync-local->remote!-result result)
                  (cond
                    succ
                    (recur (next batches))

                    (or need-sync-remote graph-has-been-deleted unknown stop)
                    result))))))))))
  2323. ;;; ### put all stuff together
  2324. (defrecord ^:large-vars/cleanup-todo
  2325. SyncManager [graph-uuid base-path *sync-state
  2326. ^Local->RemoteSyncer local->remote-syncer ^Remote->LocalSyncer remote->local-syncer remoteapi
  2327. ^:mutable ratelimit-local-changes-chan
  2328. *txid ^:mutable state ^:mutable remote-change-chan ^:mutable *ws *stopped? *paused?
  2329. ^:mutable ops-chan
  2330. ;; control chans
  2331. private-full-sync-chan private-stop-sync-chan private-remote->local-sync-chan
  2332. private-remote->local-full-sync-chan private-pause-resume-chan]
  2333. Object
  2334. (schedule [this next-state args reason]
  2335. {:pre [(s/valid? ::state next-state)]}
  2336. (println "[SyncManager" graph-uuid "]"
  2337. (and state (name state)) "->" (and next-state (name next-state)) :reason reason :now (tc/to-string (t/now)))
  2338. (set! state next-state)
  2339. (swap! *sync-state sync-state--update-state next-state)
  2340. (go
  2341. (case state
  2342. ::need-password
  2343. (<! (.need-password this))
  2344. ::idle
  2345. (<! (.idle this))
  2346. ::local->remote
  2347. (<! (.local->remote this args))
  2348. ::remote->local
  2349. (<! (.remote->local this nil args))
  2350. ::local->remote-full-sync
  2351. (<! (.full-sync this))
  2352. ::remote->local-full-sync
  2353. (<! (.remote->local-full-sync this args))
  2354. ::pause
  2355. (<! (.pause this))
  2356. ::stop
  2357. (-stop! this))))
  2358. (start [this]
  2359. (set! ops-chan (chan (async/dropping-buffer 10)))
  2360. (set! *ws (atom nil))
  2361. (set! remote-change-chan (ws-listen! graph-uuid *ws))
  2362. (set! ratelimit-local-changes-chan (<ratelimit local->remote-syncer local-changes-revised-chan))
  2363. (setup-local->remote! local->remote-syncer)
  2364. (async/tap full-sync-mult private-full-sync-chan)
  2365. (async/tap stop-sync-mult private-stop-sync-chan)
  2366. (async/tap remote->local-sync-mult private-remote->local-sync-chan)
  2367. (async/tap remote->local-full-sync-mult private-remote->local-full-sync-chan)
  2368. (async/tap pause-resume-mult private-pause-resume-chan)
  2369. (go-loop []
  2370. (let [{:keys [stop remote->local remote->local-full-sync local->remote-full-sync local->remote resume pause]}
  2371. (async/alt!
  2372. private-stop-sync-chan {:stop true}
  2373. private-remote->local-full-sync-chan {:remote->local-full-sync true}
  2374. private-remote->local-sync-chan {:remote->local true}
  2375. private-full-sync-chan {:local->remote-full-sync true}
  2376. private-pause-resume-chan ([v] (if v {:resume true} {:pause true}))
  2377. remote-change-chan ([v] (println "remote change:" v) {:remote->local v})
  2378. ratelimit-local-changes-chan ([v]
  2379. (let [rest-v (util/drain-chan ratelimit-local-changes-chan)
  2380. vs (cons v rest-v)]
  2381. (println "local changes:" vs)
  2382. {:local->remote vs}))
  2383. (timeout (* 20 60 1000)) {:local->remote-full-sync true}
  2384. :priority true)]
  2385. (cond
  2386. stop
  2387. (do (util/drain-chan ops-chan)
  2388. (>! ops-chan {:stop true}))
  2389. remote->local-full-sync
  2390. (do (util/drain-chan ops-chan)
  2391. (>! ops-chan {:remote->local-full-sync true})
  2392. (recur))
  2393. remote->local
  2394. (let [txid
  2395. (if (true? remote->local)
  2396. {:txid (:TXId (<! (<get-remote-graph remoteapi nil graph-uuid)))}
  2397. remote->local)]
  2398. (when (some? txid)
  2399. (>! ops-chan {:remote->local txid}))
  2400. (recur))
  2401. local->remote
  2402. (do (>! ops-chan {:local->remote local->remote})
  2403. (recur))
  2404. local->remote-full-sync
  2405. (do (util/drain-chan ops-chan)
  2406. (>! ops-chan {:local->remote-full-sync true})
  2407. (recur))
  2408. resume
  2409. (do (>! ops-chan {:resume true})
  2410. (recur))
  2411. pause
  2412. (do (vreset! *paused? true)
  2413. (>! ops-chan {:pause true})
  2414. (recur)))))
  2415. (.schedule this ::need-password nil nil))
  2416. (need-password
  2417. [this]
  2418. (go
  2419. (let [next-state (<! (<loop-ensure-pwd&keys graph-uuid (state/get-current-repo) *stopped?))]
  2420. (assert (s/valid? ::state next-state) next-state)
  2421. (when (= next-state ::idle)
  2422. (<! (<ensure-set-env&keys graph-uuid *stopped?))
  2423. ;; wait seconds to receive all file change events,
  2424. ;; and then drop all of them.
  2425. ;; WHY: when opening a graph(or switching to another graph),
  2426. ;; file-watcher will send a lot of file-change-events,
  2427. ;; actually, each file corresponds to a file-change-event,
  2428. ;; we need to ignore all of them.
  2429. (<! (timeout 3000))
  2430. (println :drain-local-changes-chan-at-starting
  2431. (count (util/drain-chan local-changes-revised-chan))))
  2432. (if @*stopped?
  2433. (.schedule this ::stop nil nil)
  2434. (.schedule this next-state nil nil)))))
  2435. (pause [this]
  2436. (go (<! (<rsapi-cancel-all-requests)))
  2437. (put-sync-event! {:event :pause
  2438. :data {:graph-uuid graph-uuid
  2439. :epoch (tc/to-epoch (t/now))}})
  2440. (go-loop []
  2441. (let [{:keys [resume]} (<! ops-chan)]
  2442. (if resume
  2443. (let [{:keys [remote->local remote->local-full-sync local->remote local->remote-full-sync] :as resume-state}
  2444. (get @*resume-state graph-uuid)]
  2445. (resume-state--reset graph-uuid)
  2446. (vreset! *paused? false)
  2447. (cond
  2448. remote->local
  2449. (offer! private-remote->local-sync-chan true)
  2450. remote->local-full-sync
  2451. (offer! private-remote->local-full-sync-chan true)
  2452. local->remote
  2453. (>! ops-chan {:local->remote local->remote})
  2454. local->remote-full-sync
  2455. (offer! private-full-sync-chan true)
  2456. :else
  2457. ;; if resume-state = nil, try a remote->local to sync recent diffs
  2458. (offer! private-remote->local-sync-chan true))
  2459. (put-sync-event! {:event :resume
  2460. :data {:graph-uuid graph-uuid
  2461. :resume-state resume-state
  2462. :epoch (tc/to-epoch (t/now))}})
  2463. (<! (.schedule this ::idle nil :resume)))
  2464. (recur)))))
  2465. (idle [this]
  2466. (go
  2467. (let [{:keys [stop remote->local local->remote local->remote-full-sync remote->local-full-sync pause] :as result}
  2468. (<! ops-chan)]
  2469. (cond
  2470. stop
  2471. (<! (.schedule this ::stop nil nil))
  2472. remote->local
  2473. (<! (.schedule this ::remote->local {:remote remote->local} {:remote-changed remote->local}))
  2474. local->remote
  2475. (<! (.schedule this ::local->remote {:local local->remote} {:local-changed local->remote}))
  2476. local->remote-full-sync
  2477. (<! (.schedule this ::local->remote-full-sync nil nil))
  2478. remote->local-full-sync
  2479. (<! (.schedule this ::remote->local-full-sync nil nil))
  2480. pause
  2481. (<! (.schedule this ::pause nil nil))
  2482. :else
  2483. (do
  2484. (state/pub-event! [:instrument {:type :sync/wrong-ops-chan-when-idle
  2485. :payload {:ops-chan-result result
  2486. :state state}}])
  2487. nil)))))
  2488. (full-sync [this]
  2489. (go
  2490. (let [{:keys [succ need-sync-remote graph-has-been-deleted unknown stop] :as r}
  2491. (<! (<sync-local->remote-all-files! local->remote-syncer))]
  2492. (s/assert ::sync-local->remote-all-files!-result r)
  2493. (cond
  2494. succ
  2495. (do
  2496. (swap! *sync-state #(sync-state-reset-full-local->remote-files % []))
  2497. (put-sync-event! {:event :finished-local->remote
  2498. :data {:graph-uuid graph-uuid
  2499. :full-sync? true
  2500. :epoch (tc/to-epoch (t/now))}})
  2501. (.schedule this ::idle nil nil))
  2502. need-sync-remote
  2503. (do (util/drain-chan ops-chan)
  2504. (>! ops-chan {:remote->local true})
  2505. (>! ops-chan {:local->remote-full-sync true})
  2506. (.schedule this ::idle nil nil))
  2507. graph-has-been-deleted
  2508. (.schedule this ::stop nil :graph-has-been-deleted)
  2509. stop
  2510. (.schedule this ::stop nil nil)
  2511. unknown
  2512. (do
  2513. (state/pub-event! [:instrument {:type :sync/unknown
  2514. :event :local->remote-full-sync-failed
  2515. :graph-uuid graph-uuid
  2516. :payload {:error unknown}}])
  2517. (put-sync-event! {:event :local->remote-full-sync-failed
  2518. :data {:graph-uuid graph-uuid
  2519. :epoch (tc/to-epoch (t/now))}})
  2520. (.schedule this ::idle nil nil))))))
  2521. (remote->local-full-sync [this _]
  2522. (go
  2523. (let [{:keys [succ unknown stop pause]}
  2524. (<! (<sync-remote->local-all-files! remote->local-syncer))]
  2525. (cond
  2526. succ
  2527. (do (put-sync-event! {:event :finished-remote->local
  2528. :data {:graph-uuid graph-uuid
  2529. :full-sync? true
  2530. :epoch (tc/to-epoch (t/now))}})
  2531. (.schedule this ::idle nil nil))
  2532. stop
  2533. (.schedule this ::stop nil nil)
  2534. pause
  2535. (do (resume-state--add-remote->local-full-sync-state graph-uuid)
  2536. (.schedule this ::pause nil nil))
  2537. unknown
  2538. (do
  2539. (state/pub-event! [:instrument {:type :sync/unknown
  2540. :event :remote->local-full-sync-failed
  2541. :graph-uuid graph-uuid
  2542. :payload {:error unknown}}])
  2543. (put-sync-event! {:event :remote->local-full-sync-failed
  2544. :data {:graph-uuid graph-uuid
  2545. :exp unknown
  2546. :epoch (tc/to-epoch (t/now))}})
  2547. (let [next-state (if (string/includes? (str (ex-cause unknown)) "404 Not Found")
  2548. ;; TODO: this should never happen
  2549. ::pause
  2550. ;; if any other exception occurred, re-exec remote->local-full-sync
  2551. ::remote->local-full-sync)]
  2552. (.schedule this next-state nil nil)))))))
  2553. (remote->local [this _next-state {remote-val :remote}]
  2554. (go
  2555. (if (some-> remote-val :txid (<= @*txid))
  2556. (.schedule this ::idle nil nil)
  2557. (let [origin-txid @*txid
  2558. {:keys [succ unknown stop pause need-remote->local-full-sync] :as r}
  2559. (<! (<sync-remote->local! remote->local-syncer))]
  2560. (s/assert ::sync-remote->local!-result r)
  2561. (cond
  2562. need-remote->local-full-sync
  2563. (do (util/drain-chan ops-chan)
  2564. (>! ops-chan {:remote->local-full-sync true})
  2565. (>! ops-chan {:local->remote-full-sync true})
  2566. (.schedule this ::idle nil nil))
  2567. succ
  2568. (do (put-sync-event! {:event :finished-remote->local
  2569. :data {:graph-uuid graph-uuid
  2570. :full-sync? false
  2571. :from-txid origin-txid
  2572. :to-txid @*txid
  2573. :epoch (tc/to-epoch (t/now))}})
  2574. (.schedule this ::idle nil nil))
  2575. stop
  2576. (.schedule this ::stop nil nil)
  2577. pause
  2578. (do (resume-state--add-remote->local-state graph-uuid)
  2579. (.schedule this ::pause nil nil))
  2580. unknown
  2581. (do (prn "remote->local err" unknown)
  2582. (state/pub-event! [:instrument {:type :sync/unknown
  2583. :event :remote->local
  2584. :graph-uuid graph-uuid
  2585. :payload {:error unknown}}])
  2586. (.schedule this ::idle nil nil)))))))
  2587. (local->remote [this {local-changes :local}]
  2588. ;; local-changes:: list of FileChangeEvent
  2589. (assert (some? local-changes) local-changes)
  2590. (go
  2591. (let [distincted-local-changes (distinct-file-change-events local-changes)
  2592. _ (swap! *sync-state #(sync-state-reset-full-local->remote-files % distincted-local-changes))
  2593. change-events-partitions
  2594. (sequence (partition-file-change-events upload-batch-size) distincted-local-changes)
  2595. _ (put-sync-event! {:event :start
  2596. :data {:type :local->remote
  2597. :graph-uuid graph-uuid
  2598. :full-sync? false
  2599. :epoch (tc/to-epoch (t/now))}})
  2600. {:keys [succ need-sync-remote graph-has-been-deleted unknown stop pause]}
  2601. (loop [es-partitions change-events-partitions]
  2602. (cond
  2603. @*stopped? {:stop true}
  2604. @*paused? {:pause true}
  2605. (empty? es-partitions) {:succ true}
  2606. :else
  2607. (let [{:keys [succ need-sync-remote graph-has-been-deleted pause unknown stop] :as r}
  2608. (<! (<sync-local->remote! local->remote-syncer (first es-partitions)))]
  2609. (s/assert ::sync-local->remote!-result r)
  2610. (cond
  2611. succ
  2612. (recur (next es-partitions))
  2613. (or need-sync-remote graph-has-been-deleted unknown pause stop) r))))]
  2614. (cond
  2615. succ
  2616. (do
  2617. (swap! *sync-state #(sync-state-reset-full-local->remote-files % []))
  2618. (put-sync-event! {:event :finished-local->remote
  2619. :data {:graph-uuid graph-uuid
  2620. :full-sync? false
  2621. :file-change-events distincted-local-changes
  2622. :epoch (tc/to-epoch (t/now))}})
  2623. (.schedule this ::idle nil nil))
  2624. need-sync-remote
  2625. (do (util/drain-chan ops-chan)
  2626. (>! ops-chan {:remote->local true})
  2627. (>! ops-chan {:local->remote local-changes})
  2628. (.schedule this ::idle nil nil))
  2629. graph-has-been-deleted
  2630. (.schedule this ::stop nil :graph-has-been-deleted)
  2631. stop
  2632. (.schedule this ::stop nil nil)
  2633. pause
  2634. (do (resume-state--add-local->remote-state graph-uuid local-changes)
  2635. (.schedule this ::pause nil nil))
  2636. unknown
  2637. (do
  2638. (debug/pprint "local->remote" unknown)
  2639. (state/pub-event! [:instrument {:type :sync/unknown
  2640. :event :local->remote
  2641. :graph-uuid graph-uuid
  2642. :payload {:error unknown}}])
  2643. (.schedule this ::idle nil nil))))))
  2644. IStoppable
  2645. (-stop! [_]
  2646. (go
  2647. (when-not @*stopped?
  2648. (vreset! *stopped? true)
  2649. (ws-stop! *ws)
  2650. (offer! private-stop-sync-chan true)
  2651. (async/untap full-sync-mult private-full-sync-chan)
  2652. (async/untap stop-sync-mult private-stop-sync-chan)
  2653. (async/untap remote->local-sync-mult private-remote->local-sync-chan)
  2654. (async/untap remote->local-full-sync-mult private-remote->local-full-sync-chan)
  2655. (async/untap pause-resume-mult private-pause-resume-chan)
  2656. (when ops-chan (async/close! ops-chan))
  2657. (stop-local->remote! local->remote-syncer)
  2658. (stop-remote->local! remote->local-syncer)
  2659. (<! (<rsapi-cancel-all-requests))
  2660. (debug/pprint ["stop sync-manager, graph-uuid" graph-uuid "base-path" base-path])
  2661. (swap! *sync-state sync-state--update-state ::stop)
  2662. (loop []
  2663. (if (not= ::stop state)
  2664. (do
  2665. (<! (timeout 100))
  2666. (recur))
  2667. (reset! current-sm-graph-uuid nil))))))
  2668. IStopped?
  2669. (-stopped? [_]
  2670. @*stopped?))
  2671. (defn sync-manager [user-uuid graph-uuid base-path repo txid *sync-state]
  2672. (let [*txid (atom txid)
  2673. *stopped? (volatile! false)
  2674. *paused? (volatile! false)
  2675. remoteapi-with-stop (->RemoteAPI *stopped?)
  2676. local->remote-syncer (->Local->RemoteSyncer user-uuid graph-uuid
  2677. base-path
  2678. repo *sync-state remoteapi-with-stop
  2679. (if (mobile-util/native-platform?)
  2680. 2000
  2681. 10000)
  2682. *txid nil (chan) *stopped? *paused?
  2683. (chan 1) (chan 1))
  2684. remote->local-syncer (->Remote->LocalSyncer user-uuid graph-uuid base-path
  2685. repo *txid *sync-state remoteapi-with-stop
  2686. nil *stopped? *paused?)]
  2687. (.set-remote->local-syncer! local->remote-syncer remote->local-syncer)
  2688. (.set-local->remote-syncer! remote->local-syncer local->remote-syncer)
  2689. (swap! *sync-state sync-state--update-current-syncing-graph-uuid graph-uuid)
  2690. (->SyncManager graph-uuid base-path *sync-state local->remote-syncer remote->local-syncer remoteapi-with-stop
  2691. nil *txid nil nil nil *stopped? *paused? nil (chan 1) (chan 1) (chan 1) (chan 1) (chan 1))))
  2692. (defn sync-manager-singleton
  2693. [user-uuid graph-uuid base-path repo txid *sync-state]
  2694. (when-not @current-sm-graph-uuid
  2695. (reset! current-sm-graph-uuid graph-uuid)
  2696. (sync-manager user-uuid graph-uuid base-path repo txid *sync-state)))
;; Avoid sync reentrancy: `<sync-start` only proceeds while this is false,
;; sets it to true for the duration of its work and resets it afterwards;
;; `<sync-stop` also resets it after stopping a manager.
(defonce *sync-entered? (atom false))
  2699. (defn <sync-stop []
  2700. (go
  2701. (when-let [sm ^SyncManager (state/get-file-sync-manager (state/get-current-file-sync-graph-uuid))]
  2702. (println "[SyncManager" (:graph-uuid sm) "]" "stopping")
  2703. (state/clear-file-sync-state! (:graph-uuid sm))
  2704. (<! (-stop! sm))
  2705. (reset! *sync-entered? false)
  2706. (println "[SyncManager" (:graph-uuid sm) "]" "stopped"))
  2707. (reset! current-sm-graph-uuid nil)))
  2708. (defn <sync-local->remote-now []
  2709. (go
  2710. (when-let [_sm ^SyncManager (state/get-file-sync-manager (state/get-current-file-sync-graph-uuid))]
  2711. (offer! immediately-local->remote-chan true))))
  2712. (defn sync-need-password!
  2713. []
  2714. (when-let [sm ^SyncManager (state/get-file-sync-manager (state/get-current-file-sync-graph-uuid))]
  2715. (.need-password sm)))
  2716. (defn check-graph-belong-to-current-user
  2717. [current-user-uuid graph-user-uuid]
  2718. (cond
  2719. (nil? current-user-uuid)
  2720. false
  2721. (= current-user-uuid graph-user-uuid)
  2722. true
  2723. :else
  2724. (do (notification/show! (t :file-sync/other-user-graph) :warning false)
  2725. false)))
  2726. (defn <check-remote-graph-exists
  2727. [local-graph-uuid]
  2728. {:pre [(util/uuid-string? local-graph-uuid)]}
  2729. (go
  2730. (let [r (<! (<list-remote-graphs remoteapi))
  2731. result
  2732. (or
  2733. ;; if api call failed, assume this remote graph still exists
  2734. (instance? ExceptionInfo r)
  2735. (and
  2736. (contains? r :Graphs)
  2737. (->> (:Graphs r)
  2738. (mapv :GraphUUID)
  2739. set
  2740. (#(contains? % local-graph-uuid)))))]
  2741. (when-not result
  2742. (notification/show! (t :file-sync/graph-deleted) :warning false))
  2743. result)))
  2744. (defn sync-off?
  2745. [sync-state]
  2746. (or (nil? sync-state) (sync-state--stopped? sync-state)))
  2747. (defn graph-sync-off?
  2748. "Is sync not running for this `graph-uuid`?"
  2749. [graph-uuid]
  2750. (sync-off? (state/get-file-sync-state graph-uuid)))
  2751. (defn graph-encrypted?
  2752. []
  2753. (when-let [graph-uuid (second @graphs-txid)]
  2754. (get-pwd graph-uuid)))
  2755. (declare network-online-cursor)
  2756. (defn <sync-start
  2757. []
  2758. (go
  2759. (when (false? @*sync-entered?)
  2760. (reset! *sync-entered? true)
  2761. (let [*sync-state (atom (sync-state))
  2762. current-user-uuid (<! (user/<user-uuid))
  2763. ;; put @graph-uuid & get-current-repo together,
  2764. ;; prevent to get older repo dir and current graph-uuid.
  2765. _ (<! (p->c (persist-var/-load graphs-txid)))
  2766. [user-uuid graph-uuid txid] @graphs-txid
  2767. txid (or txid 0)
  2768. repo (state/get-current-repo)]
  2769. (when-not (instance? ExceptionInfo current-user-uuid)
  2770. (when (and repo
  2771. @network-online-cursor
  2772. user-uuid graph-uuid txid
  2773. (graph-sync-off? graph-uuid)
  2774. (user/logged-in?)
  2775. (not (config/demo-graph? repo)))
  2776. (try
  2777. (when-let [sm (sync-manager-singleton current-user-uuid graph-uuid
  2778. (config/get-repo-dir repo) repo
  2779. txid *sync-state)]
  2780. (when (check-graph-belong-to-current-user current-user-uuid user-uuid)
  2781. (if-not (<! (<check-remote-graph-exists graph-uuid)) ; remote graph has been deleted
  2782. (clear-graphs-txid! repo)
  2783. (do
  2784. (state/set-file-sync-state graph-uuid @*sync-state)
  2785. (state/set-file-sync-manager graph-uuid sm)
  2786. ;; update global state when *sync-state changes
  2787. (add-watch *sync-state ::update-global-state
  2788. (fn [_ _ _ n]
  2789. (state/set-file-sync-state graph-uuid n)))
  2790. (state/set-state! [:file-sync/graph-state :current-graph-uuid] graph-uuid)
  2791. (.start sm)
  2792. (offer! remote->local-full-sync-chan true)
  2793. (offer! full-sync-chan true)))))
  2794. (catch :default e
  2795. (prn "Sync start error: ")
  2796. (log/error :exception e)))))
  2797. (reset! *sync-entered? false)))))
  2798. (defn- restart-if-stopped!
  2799. [is-active?]
  2800. (cond
  2801. (and is-active? (graph-sync-off? (second @graphs-txid)))
  2802. (<sync-start)
  2803. :else
  2804. (offer! pause-resume-chan is-active?)))
;; Cursor over the :mobile/app-state-change key of the global app state.
(def app-state-changed-cursor (rum/cursor state/state :mobile/app-state-change))
;; Channel with a buffer of 1. The iOS background task below drains it and
;; then waits on it for at most 20s; the producer side is not in this part of
;; the file.
(def finished-local->remote-chan (chan 1))
  2807. (let [*resumed? (atom false)
  2808. *into-background? (atom false)]
  2809. (add-watch app-state-changed-cursor "sync"
  2810. (fn [_ _ _ {:keys [is-active?]}]
  2811. (cond
  2812. (mobile-util/native-android?)
  2813. ;; TODO: support background task on Android
  2814. (restart-if-stopped! is-active?)
  2815. (mobile-util/native-ios?)
  2816. (let [*task-id (atom nil)]
  2817. (if is-active?
  2818. (do
  2819. (when @*into-background?
  2820. (reset! *into-background? false)
  2821. (reset! *resumed? true))
  2822. (restart-if-stopped! is-active?))
  2823. (when (state/get-current-file-sync-graph-uuid)
  2824. (p/let [task-id (.beforeExit ^js BackgroundTask
  2825. (fn []
  2826. (reset! *resumed? false)
  2827. (reset! *into-background? true)
  2828. (go
  2829. ;; Wait for file watcher events
  2830. (<! (timeout 2000))
  2831. (util/drain-chan finished-local->remote-chan)
  2832. (<! (<sync-local->remote-now))
  2833. ;; wait at most 20s
  2834. (async/alts! [finished-local->remote-chan (timeout 20000)])
  2835. (when-not @*resumed? (offer! pause-resume-chan is-active?))
  2836. (<! (timeout 5000))
  2837. (prn "finish task: " @*task-id)
  2838. (let [opt #js {:taskId @*task-id}]
  2839. (.finish ^js BackgroundTask opt)))))]
  2840. (reset! *task-id task-id)))))
  2841. :else
  2842. nil))))
  2843. ;;; ### some add-watches
;; TODO: replace this logic by pause/resume state
  2845. (defonce network-online-cursor (rum/cursor state/state :network/online?))
  2846. (add-watch network-online-cursor "sync-manage"
  2847. (fn [_k _r o n]
  2848. (cond
  2849. (and (true? o) (false? n))
  2850. (<sync-stop)
  2851. (and (false? o) (true? n))
  2852. (<sync-start)
  2853. :else
  2854. nil)))
;; Cursor over the :auth/id-token key of the global app state.
(defonce auth-id-token-cursor (rum/cursor state/state :auth/id-token))
;; Stop sync as soon as the id token becomes nil.
(add-watch auth-id-token-cursor "sync-manage"
           (fn [_k _r _o n]
             (when (nil? n)
               (<sync-stop))))
  2860. ;;; ### some sync events handler
  2861. ;; re-exec remote->local-full-sync when it failed before
  2862. (def re-remote->local-full-sync-chan (chan 1))
  2863. (async/sub sync-events-publication :remote->local-full-sync-failed re-remote->local-full-sync-chan)
  2864. (go-loop []
  2865. (let [{{graph-uuid :graph-uuid} :data} (<! re-remote->local-full-sync-chan)
  2866. {:keys [current-syncing-graph-uuid]}
  2867. (state/get-file-sync-state graph-uuid)]
  2868. (when (= graph-uuid current-syncing-graph-uuid)
  2869. (offer! remote->local-full-sync-chan true))
  2870. (recur)))
  2871. ;; re-exec local->remote-full-sync when it failed
  2872. (def re-local->remote-full-sync-chan (chan 1))
  2873. (async/sub sync-events-publication :local->remote-full-sync-failed re-local->remote-full-sync-chan)
  2874. (go-loop []
  2875. (let [{{graph-uuid :graph-uuid} :data} (<! re-local->remote-full-sync-chan)
  2876. {:keys [current-syncing-graph-uuid]} (state/get-file-sync-state graph-uuid)]
  2877. (when (= graph-uuid current-syncing-graph-uuid)
  2878. (offer! full-sync-chan true))
  2879. (recur)))
;;; add-tap
;; REPL-only snippet (a `comment` form is never evaluated on load): registers
;; a tap that keeps the most recent value sent through `tap>` in `*x`.
(comment
  (def *x (atom nil))
  (add-tap (fn [v] (reset! *x v)))
  )