sync.cljs 148 KB


  1. (ns frontend.fs.sync
  2. "Main ns for providing file sync functionality"
  3. (:require ["@capawesome/capacitor-background-task" :refer [BackgroundTask]]
  4. ["path" :as node-path]
  5. [cljs-http.client :as http]
  6. [cljs-time.coerce :as tc]
  7. [cljs-time.core :as t]
  8. [cljs-time.format :as tf]
  9. [cljs.core.async :as async :refer [<! >! chan go go-loop offer!
  10. poll! timeout]]
  11. [cljs.core.async.impl.channels]
  12. [cljs.core.async.interop :refer [p->c]]
  13. [cljs.spec.alpha :as s]
  14. [clojure.pprint :as pp]
  15. [clojure.set :as set]
  16. [clojure.string :as string]
  17. [electron.ipc :as ipc]
  18. [frontend.config :as config]
  19. [frontend.context.i18n :refer [t]]
  20. [frontend.db :as db]
  21. [frontend.debug :as debug]
  22. [frontend.diff :as diff]
  23. [frontend.encrypt :as encrypt]
  24. [frontend.fs :as fs]
  25. [frontend.fs.capacitor-fs :as capacitor-fs]
  26. [frontend.fs.diff-merge :as diff-merge]
  27. [frontend.handler.file :as file-handler]
  28. [frontend.handler.notification :as notification]
  29. [frontend.handler.user :as user]
  30. [frontend.mobile.util :as mobile-util]
  31. [frontend.pubsub :as pubsub]
  32. [frontend.state :as state]
  33. [frontend.util :as util]
  34. [frontend.util.fs :as fs-util]
  35. [frontend.util.persist-var :as persist-var]
  36. [goog.string :as gstring]
  37. [lambdaisland.glogi :as log]
  38. [logseq.common.path :as path]
  39. [logseq.graph-parser.util :as gp-util]
  40. [medley.core :refer [dedupe-by]]
  41. [promesa.core :as p]
  42. [rum.core :as rum]))
  43. ;;; ### Commentary
  44. ;; file-sync related local files/dirs:
  45. ;; - logseq/graphs-txid.edn
  46. ;; this file contains [user-uuid graph-uuid transaction-id]
  47. ;; graph-uuid: the unique identifier of the graph on the server
  48. ;; transaction-id: sync progress of local files
  49. ;; - logseq/version-files
  50. ;; downloaded version-files
  51. ;; files included by `get-ignored-files` will not be synchronized.
  52. ;;
  53. ;; sync strategy:
  54. ;; - when toggle file-sync on,
  55. ;; trigger remote->local-full-sync first, then local->remote-full-sync
  56. ;; local->remote-full-sync will compare local-files with remote-files (by md5),
  57. ;; and upload new-added-files to remote server.
  58. ;; - if local->remote sync(normal-sync or full-sync) return :need-sync-remote,
  59. ;; then trigger a remote->local sync
  60. ;; - if remote->local sync return :need-remote->local-full-sync,
  61. ;; then we need a remote->local-full-sync,
  62. ;; which compare local-files with remote-files, sync diff-remote-files to local
  63. ;; - local->remote-full-sync will be triggered after 20mins of idle
  64. ;; - every 10s, flush local changes, and sync to remote
  65. ;; TODO: use access-token instead of id-token
  ;; TODO: applying a remote delete-diff deletes the related local file, which triggers a `FileChangeEvent`
  ;;       and thereby produces a new delete diff for the same file.
  68. ;;; ### specs
  ;; Valid values of the sync `::state`.
  (s/def ::state
    #{;; jobs done while ::starting:
      ;; - wait some seconds for file-change-events from the file-watcher
      ;; - drop redundant file-change-events
      ;; - set up states in `frontend.state`
      ::starting
      ::need-password
      ::idle
      ;; sync locally changed files
      ::local->remote
      ;; sync the latest remote transactions
      ::remote->local
      ;; local->remote full sync
      ::local->remote-full-sync
      ;; remote->local full sync
      ::remote->local-full-sync
      ;; snapshot state when switching between apps on iOS
      ::pause
      ::stop})
  ;; Building blocks of the sync-state map.
  (s/def ::path string?)
  (s/def ::time t/date?)
  ;; a rename is expressed as :delete + :update
  (s/def ::remote->local-type #{:delete :update})
  (s/def ::current-syncing-graph-uuid
    (s/or :nil nil?
          :graph-uuid string?))
  (s/def ::recent-remote->local-file-item
    (s/keys :req-un [::remote->local-type ::checksum ::path]))
  (s/def ::current-local->remote-files
    (s/coll-of ::path :kind set?))
  (s/def ::current-remote->local-files
    (s/coll-of ::path :kind set?))
  (s/def ::recent-remote->local-files
    (s/coll-of ::recent-remote->local-file-item :kind set?))
  (s/def ::history-item
    (s/keys :req-un [::path ::time]))
  (s/def ::history
    (s/coll-of ::history-item :kind seq?))

  ;; The whole sync-state map; every key below is required (unqualified).
  (s/def ::sync-state
    (s/keys :req-un [::current-syncing-graph-uuid
                     ::state
                     ::current-local->remote-files
                     ::current-remote->local-files
                     ::queued-local->remote-files
                     ;; Downloading files from remote triggers file-watcher events, which
                     ;; put unreasonable entries into ::queued-local->remote-files;
                     ;; ::recent-remote->local-files is used to filter such events out.
                     ::recent-remote->local-files
                     ::history]))
  109. ;; diff
  ;; A diff item: transaction id, transaction type and its content tuples.
  (s/def ::TXId pos-int?)
  (s/def ::TXType #{"update_files" "delete_files" "rename_file"})
  (s/def ::TXContent-to-path string?)
  (s/def ::TXContent-from-path (s/or :some string? :none nil?))
  (s/def ::TXContent-checksum (s/or :some string? :none nil?))
  ;; each content item is a [to-path from-path checksum] tuple
  (s/def ::TXContent-item
    (s/tuple ::TXContent-to-path
             ::TXContent-from-path
             ::TXContent-checksum))
  (s/def ::TXContent (s/coll-of ::TXContent-item))
  (s/def ::diff (s/keys :req-un [::TXId ::TXType ::TXContent]))

  ;; Single-entry result maps returned by the sync functions.
  (s/def ::succ-map #(= {:succ true} %))
  (s/def ::unknown-map (comp some? :unknown))
  (s/def ::stop-map #(= {:stop true} %))
  (s/def ::pause-map #(= {:pause true} %))
  (s/def ::need-sync-remote #(= {:need-sync-remote true} %))
  (s/def ::graph-has-been-deleted #(= {:graph-has-been-deleted true} %))

  (s/def ::sync-local->remote!-result
    (s/or :stop ::stop-map
          :succ ::succ-map
          :pause ::pause-map
          :need-sync-remote ::need-sync-remote
          :graph-has-been-deleted ::graph-has-been-deleted
          :unknown ::unknown-map))

  (s/def ::sync-remote->local!-result
    (s/or :succ ::succ-map
          :need-remote->local-full-sync #(= {:need-remote->local-full-sync true} %)
          :stop ::stop-map
          :pause ::pause-map
          :unknown ::unknown-map))

  (s/def ::sync-local->remote-all-files!-result
    (s/or :succ ::succ-map
          :stop ::stop-map
          :need-sync-remote ::need-sync-remote
          :graph-has-been-deleted ::graph-has-been-deleted
          :unknown ::unknown-map))
  146. ;; sync-event type
  ;; The kinds of sync events.
  (s/def ::event
    #{:created-local-version-file
      :finished-local->remote
      :finished-remote->local
      :start
      :pause
      :resume
      :exception-decrypt-failed
      :remote->local-full-sync-failed
      :local->remote-full-sync-failed
      :get-remote-graph-failed
      :get-deletion-logs-failed})

  ;; A sync event is a map with (unqualified) :event and :data keys.
  (s/def ::sync-event
    (s/keys :req-un [::event ::data]))
  ;; Batch sizes for downloads and uploads.
  ;; NOTE(review): presumably the number of files handled per batch - confirm against the callers.
  (defonce download-batch-size 100)
  (defonce upload-batch-size 20)

  ;; Atom, initially nil.
  ;; NOTE(review): judging by the name it holds the graph-uuid of the running sync manager - confirm.
  (def ^:private current-sm-graph-uuid
    (atom nil))
  163. ;;; ### configs in config.edn
  164. ;; - :file-sync/ignore-files
  (defn- get-ignored-files
    "Return the set of regexps matching paths which are not synchronized:
    the built-in patterns below plus every entry of `:file-sync/ignore-files`
    in the config, each converted with `re-pattern`."
    []
    (let [builtin-patterns #{#"logseq/graphs-txid.edn$"
                             #"logseq/pages-metadata.edn$"
                             #"logseq/version-files/"
                             #"logseq/bak/"
                             #"node_modules/"
                             ;; path starts with `.` in the root directory, e.g. .gitignore
                             #"^\.[^.]+"
                             ;; path includes `/.`, e.g. .git, .DS_store
                             #"/\."
                             ;; Emacs/Vim backup files end with `~` by default
                             #"~$"}
          configured-patterns (:file-sync/ignore-files (state/get-config))]
      (into builtin-patterns (map re-pattern) configured-patterns)))
  180. ;;; ### configs ends
  ;; WebSocket address; used as a format template that receives the graph-uuid (see `ws-listen!*`).
  (def ws-addr
    config/WS-URL)

  ;; Warning: make sure to `persist-var/-load` graphs-txid before using it.
  ;; The stored value is `[user-uuid graph-uuid txid]` (see `<update-graphs-txid!`).
  (defonce graphs-txid
    (persist-var/persist-var nil "graphs-txid"))

  (declare assert-local-txid<=remote-txid)
  (defn <update-graphs-txid!
    "Store `[user-uuid graph-uuid latest-txid]` as the `graphs-txid` value of REPO
    and persist it. In developer mode `assert-local-txid<=remote-txid` is called
    afterwards. LATEST-TXID must be a non-negative integer.
    Returns a channel (the promise converted with `p->c`)."
    [latest-txid graph-uuid user-uuid repo]
    {:pre [(int? latest-txid) (>= latest-txid 0)]}
    (p->c
     (p/let [new-value [user-uuid graph-uuid latest-txid]
             _ (persist-var/-reset-value! graphs-txid new-value repo)
             _ (persist-var/persist-save graphs-txid)]
       (when (state/developer-mode?)
         (assert-local-txid<=remote-txid)))))
  (defn clear-graphs-txid!
    "Reset the `graphs-txid` value of REPO to nil and persist it.
    Returns whatever `persist-var/persist-save` returns."
    [repo]
    (let [persisted-var graphs-txid]
      (persist-var/-reset-value! persisted-var nil repo)
      (persist-var/persist-save persisted-var)))
  (defn- ws-ping-loop
    "Keep the WebSocket WS alive: while it is neither closing (2) nor closed (3),
    send \"PING\" every 5 minutes once it is open (1); while it is not open yet,
    poll its state every second. The loop ends when WS is closing or closed."
    [ws]
    (go-loop []
      (let [ready-state (.-readyState ws)]
        (cond
          ;; closing or closed: stop pinging
          (contains? #{2 3} ready-state)
          nil

          ;; open
          (= 1 ready-state)
          (do (.send ws "PING")
              ;; aws apigateway websocket
              ;; Idle Connection Timeout: 10min
              (<! (timeout (* 5 60 1000)))
              (recur))

          ;; when connecting, wait 1s
          :else
          (do (<! (timeout 1000))
              (recur))))))
  (defn- ws-stop!
    "Mark the connection held by the atom *WS as stopped (`:stop true`) and close
    its WebSocket, if any. Does nothing when *WS is nil."
    [*ws]
    (when *ws
      (swap! *ws assoc :stop true)
      (when-let [socket (:ws @*ws)]
        (.close socket))))
  (defn- ws-listen!*
    "Open a WebSocket for GRAPH-UUID, store `{:ws <socket> :stop false}` in the
    atom *WS, start the ping loop and install the handlers:
    - onclose: unless `:stop` is true in *WS, wait 1s and reconnect by calling
      this function again with the same arguments.
    - onmessage: parse the JSON payload; when it has a `:txid`, offer it to
      REMOTE-CHANGES-CHAN. If a message is still pending on the channel, the
      pending one is kept only when its txid is strictly greater."
    [graph-uuid *ws remote-changes-chan]
    (let [ws (js/WebSocket. (util/format ws-addr graph-uuid))]
      (reset! *ws {:ws ws :stop false})
      (ws-ping-loop ws)
      ;; (set! (.-onopen ws) #(println (util/format "ws opened: graph '%s'" graph-uuid %)))
      (set! (.-onclose ws)
            (fn [_e]
              (when-not (true? (:stop @*ws))
                (go
                  ;; A timeout channel only delays when it is taken from;
                  ;; a bare `(timeout 1000)` reconnects immediately.
                  (<! (timeout 1000))
                  ;; `ws-stop!` may have been called while waiting; reconnecting
                  ;; then would reset :stop to false and leak a connection.
                  (when-not (true? (:stop @*ws))
                    (println "re-connecting graph" graph-uuid)
                    (ws-listen!* graph-uuid *ws remote-changes-chan))))))
      (set! (.-onmessage ws)
            (fn [e]
              (let [data (js->clj (js/JSON.parse (.-data e)) :keywordize-keys true)]
                (when (some? (:txid data))
                  (if-let [pending (poll! remote-changes-chan)]
                    (let [last-txid (:txid pending)
                          current-txid (:txid data)]
                      (if (> last-txid current-txid)
                        (offer! remote-changes-chan pending)
                        (offer! remote-changes-chan data)))
                    (offer! remote-changes-chan data))))))))
  (defn ws-listen!
    "Start listening for server messages of GRAPH-UUID (connection state is kept
    in the atom *WS) and return the channel they are delivered on.
    The channel has a sliding buffer of size 1."
    [graph-uuid *ws]
    (let [changes-ch (chan (async/sliding-buffer 1))]
      (ws-listen!* graph-uuid *ws changes-ch)
      changes-ch))
  (defn- get-json-body
    "Normalize a response BODY:
    - a truthy non-string BODY is returned as is,
    - a blank BODY (nil included) yields `true` (not nil),
    - anything else is parsed as JSON and converted with keywordized keys."
    [body]
    (cond
      (and (not (string? body)) body)
      body

      (string/blank? body)
      true

      :else
      (js->clj (js/JSON.parse body) :keywordize-keys true)))
  (defn- get-resp-json-body
    "Return the `:body` of RESP normalized by `get-json-body`."
    [resp]
    (get-json-body (:body resp)))
  (defn- <request-once
    "POST BODY (serialized as JSON) to the file-sync endpoint API-NAME, using
    TOKEN as the oauth token. Returns a channel yielding
    `{:resp <http response> :api-name API-NAME :body BODY}`."
    [api-name body token]
    (go
      (let [url (str "https://" config/API-DOMAIN "/file-sync/" api-name)
            resp-ch (http/post url
                               {:oauth-token token
                                :body (js/JSON.stringify (clj->js body))
                                :with-credentials? false})]
        {:resp (<! resp-ch)
         :api-name api-name
         :body body})))
  256. ;; For debug
  (def *on-flying-request
    "Names of the requests which have not finished yet (see `<request`)."
    (atom #{}))

  ;; API names for which `<request*` honours its `*stop` flag.
  (def stoppable-apis
    #{"get_all_files"})
  (defn- <request*
    "Call API-NAME with BODY and TOKEN; returns a channel yielding the http response.
    A response with status 401 and message \"Unauthorized\" is retried after
    `(min 60000 (* 1000 retry-count))` ms; once RETRY-COUNT is greater than 5
    an error is thrown instead.
    *stop: volatile var; when it is true and API-NAME is one of
           `stoppable-apis`, no request is made and :stop is returned."
    ([api-name body token *stop]
     (<request* api-name body token 0 *stop))
    ([api-name body token retry-count *stop]
     (go
       (if (and *stop @*stop (contains? stoppable-apis api-name))
         :stop
         (let [{:keys [resp]} (<! (<request-once api-name body token))
               unauthorized? (and (= 401 (:status resp))
                                  (= "Unauthorized" (:message (get-json-body (:body resp)))))]
           (cond
             (not unauthorized?)
             resp

             (> retry-count 5)
             (throw (js/Error. :file-sync-request))

             :else
             (let [delay-ms (min 60000 (* 1000 retry-count))]
               (println "will retry after" delay-ms "ms")
               (<! (timeout delay-ms))
               (<! (<request* api-name body token (inc retry-count) *stop)))))))))
  (defn <request
    "Same as `<request*` (ARGS are passed through), but the request is tracked in
    `*on-flying-request` under the name `<api-name><timestamp>` while it is running.
    Returns a channel yielding the result of `<request*`."
    [api-name & args]
    (let [request-id (str api-name (.now js/Date))]
      (go
        (swap! *on-flying-request conj request-id)
        (let [result (<! (apply <request* api-name args))]
          (swap! *on-flying-request disj request-id)
          result))))
  (defn- remove-dir-prefix
    "Remove the leading DIR from PATH (DIR is matched literally), and after that
    one leading \"/\" if there is one."
    [dir path]
    (let [prefix-re (js/RegExp. (str "^" (gstring/regExpEscape dir)))
          stripped (string/replace path prefix-re "")]
      (cond-> stripped
        (string/starts-with? stripped "/") (subs 1))))
  (defn- remove-user-graph-uuid-prefix
    "<user-uuid>/<graph-uuid>/path -> path
    The prefix is only removed when PATH has more than two segments and the first
    two segments are 36 characters long each; otherwise PATH is returned as is."
    [path]
    (let [[user-segment graph-segment :as segments] (string/split path "/")]
      (if (and (< 2 (count segments))
               (= 36 (count user-segment))
               (= 36 (count graph-segment)))
        (util/string-join-path (drop 2 segments))
        path)))
  (defprotocol IRelativePath
    "Things which know their relative path."
    (-relative-path [this] "return the relative path of THIS"))
  (defn relative-path
    "Return the relative path of O:
    - an `IRelativePath` implementation: its `-relative-path`,
    - a string starting with the current repo dir (a full path): O without the
      leading `<repo-dir>/`,
    - any other string: O without its `<user-uuid>/<graph-uuid>/` prefix.
    Throws a js/Error for unsupported types."
    [o]
    (let [repo-dir (config/get-repo-dir (state/get-current-repo))]
      (cond
        (implements? IRelativePath o)
        (-relative-path o)

        ;; full path
        (and (string? o) (string/starts-with? o repo-dir))
        ;; Strip the leading prefix only: `string/replace` with a string pattern
        ;; replaces every occurrence, which would also cut `<repo-dir>/` out of
        ;; the middle of a path.
        (let [prefix (str repo-dir "/")]
          (if (string/starts-with? o prefix)
            (subs o (count prefix))
            o))

        (string? o)
        (remove-user-graph-uuid-prefix o)

        :else
        (throw (js/Error. (str "unsupported type " (str o)))))))
  (defprotocol IChecksum
    "Things which carry a checksum."
    (-checksum [this] "return the checksum of THIS"))

  (defprotocol IStoppable
    "Things which can be stopped."
    (-stop! [this] "stop THIS"))

  (defprotocol IStopped?
    "Things which can tell whether they are stopped."
    (-stopped? [this] "whether THIS is stopped"))
  320. ;from-path, to-path is relative path
  ;; One file change of a transaction.
  ;; - `renamed?` is true when from-path differs from to-path.
  ;; - Equality and hash only look at from-path, to-path, updated? and deleted?;
  ;;   txid and checksum are ignored.
  ;; - Comparison (`compare`) orders by txid.
  (deftype FileTxn [from-path to-path updated? deleted? txid checksum]
    Object
    (renamed? [_]
      (not= from-path to-path))

    IRelativePath
    (-relative-path [_] (remove-user-graph-uuid-prefix to-path))

    IEquiv
    (-equiv [_ ^FileTxn other]
      ;; Guard the type first: `(= filetxn nil)` would otherwise throw while
      ;; reading a field of nil instead of returning false.
      (and (instance? FileTxn other)
           (= from-path (.-from-path other))
           (= to-path (.-to-path other))
           (= updated? (.-updated? other))
           (= deleted? (.-deleted? other))))

    IHash
    (-hash [_] (hash [from-path to-path updated? deleted?]))

    IComparable
    (-compare [_ ^FileTxn other]
      (compare txid (.-txid other)))

    IPrintWithWriter
    (-pr-writer [coll w _opts]
      (write-all w "#FileTxn[\"" from-path "\" -> \"" to-path
                 "\" (updated? " updated? ", renamed? " (.renamed? coll) ", deleted? " deleted?
                 ", txid " txid ", checksum " checksum ")]")))
  (defn- assert-filetxns
    "Return true when every update-type `FileTxn` in FILETXNS has a checksum."
    [filetxns]
    (every?
     (fn [^FileTxn filetxn]
       (or (not (.-updated? filetxn))
           (some? (-checksum filetxn))))
     filetxns))
  (defn- diff->filetxns
    "Convert a diff (see `<get-diff`) into a seq of `FileTxn`s.
    - \"update_files\": items without a first or a last element are dropped;
      the last element is used as checksum.
    - \"delete_files\": items without a first element are dropped.
    - \"rename_file\": items without a first or second element are dropped;
      the second element is the from-path, the first one the to-path.
    Any other TXType throws (no matching `case` clause)."
    [{:keys [TXId TXType TXContent]}]
    {:post [(assert-filetxns %)]}
    (let [update? (= "update_files" TXType)
          delete? (= "delete_files" TXType)
          no-first? (fn [item] (empty? (first item)))
          update-xf (comp
                     (remove (fn [item]
                               (or (no-first? item)
                                   (empty? (last item)))))
                     (map (fn [item]
                            (->FileTxn (first item) (first item) update? delete? TXId (last item)))))
          delete-xf (comp
                     (remove no-first?)
                     (map (fn [item]
                            (->FileTxn (first item) (first item) update? delete? TXId nil))))
          rename-xf (comp
                     (remove (fn [item]
                               (or (no-first? item)
                                   (empty? (second item)))))
                     (map (fn [item]
                            (->FileTxn (second item) (first item) false false TXId nil))))
          xf (case TXType
               "delete_files" delete-xf
               "update_files" update-xf
               "rename_file" rename-xf)]
      (sequence xf TXContent)))
  (defn- distinct-update-filetxns-xf
    "transducer.
    Drop an update or delete `FileTxn` when an equal filetxn has been seen before;
    every filetxn which passes is remembered."
    [rf]
    (let [seen (volatile! #{})]
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result ^FileTxn filetxn]
         (let [update-or-delete? (or (.-updated? filetxn) (.-deleted? filetxn))
               duplicate? (and update-or-delete?
                               (contains? @seen filetxn))]
           (if duplicate?
             result
             (do (vswap! seen conj filetxn)
                 (rf result filetxn))))))))
  (defn- remove-deleted-filetxns-xf
    "transducer.
    Remove update & rename filetxns if they are deleted later (in a filetxn with
    a greater txid); meant to run over filetxns in reversed txid order.
    Implementation: the to-path of every passed filetxn is remembered; a filetxn
    whose to-path is already remembered is dropped, and when the dropped one is
    a rename, its from-path is remembered in place of its to-path."
    [rf]
    (let [seen-paths (volatile! #{})]
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result ^FileTxn filetxn]
         (let [to-path (.-to-path filetxn)
               from-path (.-from-path filetxn)
               seen? (contains? @seen-paths to-path)]
           (cond
             (not seen?)
             (do (vswap! seen-paths conj to-path)
                 (rf result filetxn))

             ;; dropped rename: keep tracking the file under its old path
             (not= to-path from-path)
             (do (vswap! seen-paths #(-> % (disj to-path) (conj from-path)))
                 result)

             :else
             result))))))
  (defn- partition-filetxns
    "return transducer.
    Consecutive filetxns are grouped by their `updated?` flag; a group of
    update filetxns is split into partitions of at most N items, every other
    filetxn (delete, rename) gets a partition of its own."
    [n]
    (let [updated? (fn [^FileTxn filetxn] (.-updated? filetxn))
          split-group (fn [group]
                        (if (some-> (first group) updated?)
                          (partition-all n group)
                          (map list group)))]
      (comp
       (partition-by updated?)
       (mapcat split-group))))
  (defn- contains-path?
    "Return true when one of REGEXPS matches PATH (`re-find`),
    nil when none does, and false when REGEXPS is empty."
    [regexps path]
    (cond
      (empty? regexps) false
      (some #(re-find % path) regexps) true
      :else nil))
  (defn ignored?
    "Whether the file is ignored when syncing, i.e. whether its relative path
    matches one of the regexps of `get-ignored-files`."
    [path]
    (boolean
     (contains-path? (get-ignored-files) (relative-path path))))
  (defn- filter-download-files-with-reserved-chars
    "Skip downloading file paths with reserved chars.
    Non-deleted FILES whose relative path includes reserved chars are removed
    from the result; when there are any, a `:ui/notify-skipped-downloading-files`
    event is published and their paths are printed."
    [files]
    (let [reserved? (fn [file]
                      (and (not (.-deleted? ^js file))
                           (fs-util/include-reserved-chars? (-relative-path file))))
          skipped-paths (map -relative-path (filter reserved? files))]
      (when (seq skipped-paths)
        (state/pub-event! [:ui/notify-skipped-downloading-files skipped-paths])
        (prn "Skipped downloading those file paths with reserved chars: " skipped-paths))
      (remove reserved? files)))
  (defn- filter-upload-files-with-reserved-chars
    "Remove uploading file paths with reserved chars.
    PATHS are either strings or `IRelativePath` items (decided by the first
    element). When some are removed, a `:ui/notify-outdated-filename-format`
    event is published and the removed paths are printed.
    Returns the remaining PATHS as a vector."
    [paths]
    (let [strings? (string? (first paths))
          reserved? (if strings?
                      fs-util/include-reserved-chars?
                      (fn [item] (fs-util/include-reserved-chars? (-relative-path item))))
          reserved (filter reserved? paths)]
      (when (seq reserved)
        (let [reported-paths (cond->> reserved
                               (not strings?) (map -relative-path))]
          (when (seq reported-paths)
            (state/pub-event! [:ui/notify-outdated-filename-format reported-paths]))
          (prn "Skipped uploading those file paths with reserved chars: " reported-paths)))
      (vec (remove reserved? paths))))
  (defn- diffs->filetxns
    "transducer.
    1. diff -> `FileTxn`s, see also `<get-diff`
    2. remove ignored filetxns
    3. remove redundant update & delete filetxns
    4. remove update or rename filetxns if they are deleted in later filetxns.
    NOTE: this xf should be applied to the reversed diffs sequence (sorted by txid)"
    []
    (let [expand-diffs (mapcat diff->filetxns)
          drop-ignored (remove ignored?)]
      (comp
       expand-diffs
       drop-ignored
       distinct-update-filetxns-xf
       remove-deleted-filetxns-xf)))
  (defn- diffs->partitioned-filetxns
    "return transducer: diffs -> partitions of filetxns.
    Each partition contains filetxns of the same type:
    at most N items for the update type,
    exactly 1 item for the delete & rename types."
    [n]
    (let [to-filetxns (diffs->filetxns)
          to-partitions (partition-filetxns n)]
      (comp to-filetxns to-partitions)))
  (defn- filepath+checksum->diff
    "Build an \"update_files\" diff with TXId `(inc index)` for one file; its
    to-path is `user-uuid/graph-uuid/relative-path` joined by
    `util/string-join-path`, its from-path is nil."
    [index {:keys [relative-path checksum user-uuid graph-uuid]}]
    {:post [(s/valid? ::diff %)]}
    (let [to-path (util/string-join-path [user-uuid graph-uuid relative-path])]
      {:TXId (inc index)
       :TXType "update_files"
       :TXContent [[to-path nil checksum]]}))
  (defn filepath+checksum-coll->partitioned-filetxns
    "transducer over `[filepath checksum]` pairs.
    1. filepath+checksum -> diff (TXId = position in the input + 1)
    2. diffs->partitioned-filetxns (which also filters by config)"
    [n graph-uuid user-uuid]
    (comp
     (map-indexed
      (fn [index pair]
        (filepath+checksum->diff index
                                 {:relative-path (first pair)
                                  :user-uuid user-uuid
                                  :graph-uuid graph-uuid
                                  :checksum (second pair)})))
     (diffs->partitioned-filetxns n)))
  ;; Metadata of a local or remote (`remote?`) file.
  ;; - `get-normalized-path` lazily computes and caches the path without a leading
  ;;   "/" and, for remote files, without the `<user-uuid>/<graph-uuid>/` prefix.
  ;; - Equality: same normalized path and same etag.
  ;; - Keyword lookup is supported for :size :etag :path :encrypted-path
  ;;   :last-modified :remote? :txid.
  ;; NOTE(review): `-hash` uses the raw `path` while `-equiv` uses the normalized
  ;; one, so two equal items whose raw paths differ hash differently - confirm
  ;; whether callers rely on that.
  (deftype FileMetadata [size etag path encrypted-path last-modified remote? txid ^:mutable normalized-path]
    Object
    (get-normalized-path [_]
      (assert (string? path) path)
      (when-not normalized-path
        (set! normalized-path
              (cond-> path
                (string/starts-with? path "/") (string/replace-first "/" "")
                remote? (remove-user-graph-uuid-prefix))))
      normalized-path)

    IRelativePath
    (-relative-path [_] path)

    IEquiv
    (-equiv [o ^FileMetadata other]
      ;; Guard the type first: comparing with nil or a value of another type
      ;; would otherwise throw on the `.get-normalized-path` call.
      (and (instance? FileMetadata other)
           (= (.get-normalized-path o) (.get-normalized-path other))
           (= etag (.-etag other))))

    IHash
    (-hash [_] (hash {:etag etag :path path}))

    ILookup
    (-lookup [o k] (-lookup o k nil))
    (-lookup [_ k not-found]
      (case k
        :size size
        :etag etag
        :path path
        :encrypted-path encrypted-path
        :last-modified last-modified
        :remote? remote?
        :txid txid
        not-found))

    IPrintWithWriter
    (-pr-writer [_ w _opts]
      (write-all w (str {:size size :etag etag :path path :remote? remote? :txid txid :last-modified last-modified}))))
  (def ^:private higher-priority-remote-files
    "When diffing all remote files against the local files, the following remote
    files always need to be downloaded (when the checksum does not match), even
    if the local file's last-modified > the remote file's last-modified.
    These files are created automatically when a graph is created, and we don't
    want them to overwrite the related remote files."
    #{"pages/contents.md"
      "pages/contents.org"
      "logseq/metadata.edn"})

  (def ^:private ignore-default-value-files
    "When a new local graph is created, some files are created with it
    (config.edn, custom.css). The related remote files win if these files still
    have their default template value."
    #{"logseq/config.edn"
      "logseq/custom.css"})

  ;; md5 of empty content
  (def ^:private empty-custom-css-md5
    "d41d8cd98f00b204e9800998ecf8427e")
  (defn diff-file-metadata-sets
    "Find the `FileMetadata`s that exist in s1 and do not exist in s2,
    compared by path (case-insensitively) + checksum + last-modified.
    An item of s1 is left out of the result when s2 has an item with the same
    path and either
    - the same checksum, or
    - a greater last-modified, unless the path is one of
      `higher-priority-remote-files`, or one of `ignore-default-value-files`
      whose s2 checksum is the default config.edn content or an empty custom.css.
    So if s1.path = s2.path & s1.checksum <> s2.checksum & s1.last-modified >= s2.last-modified
    (or one of the exceptions applies), the item is kept in the result.
    Returns a set."
    [s1 s2]
    (let [lower-case-path-of (fn [item] (some-> (:path item) string/lower-case))
          ;; Index s2 once by lower-cased path: every s1 item is then compared
          ;; with the s2 items of the same path only, instead of scanning (and
          ;; lower-casing) the whole of s2 for each s1 item.
          s2-by-path (group-by lower-case-path-of s2)]
      (reduce
       (fn [result item]
         (let [path (:path item)
               checksum (:etag item)
               last-modified (:last-modified item)
               same-path-items (get s2-by-path (lower-case-path-of item))]
           (if (some
                #(cond
                   (= checksum (:etag %))
                   true
                   (>= last-modified (:last-modified %))
                   false
                   ;; these special files have higher priority in s1
                   (contains? higher-priority-remote-files path)
                   false
                   ;; higher priority in s1 when config.edn=default value or empty custom.css
                   (and (contains? ignore-default-value-files path)
                        (#{config/config-default-content-md5 empty-custom-css-md5} (:etag %)))
                   false
                   (< last-modified (:last-modified %))
                   true)
                same-path-items)
             result
             (conj result item))))
       #{} s1)))
  ;; REPL example for `diff-file-metadata-sets`.
  (comment
    ;; `->FileMetadata` takes 8 positional fields:
    ;; size etag path encrypted-path last-modified remote? txid normalized-path
    (defn map->FileMetadata [m]
      (apply ->FileMetadata
             ((juxt :size :etag :path :encrypted-path :last-modified :remote? :txid (constantly nil)) m)))

    ;; paths must be strings: comparing `FileMetadata`s asserts `(string? path)`
    (assert
     (=
      #{(map->FileMetadata {:size 1 :etag 2 :path "2" :encrypted-path 2 :last-modified 2})}
      (diff-file-metadata-sets
       (into #{}
             (map map->FileMetadata)
             [{:size 1 :etag 1 :path "1" :encrypted-path 1 :last-modified 1}
              {:size 1 :etag 2 :path "2" :encrypted-path 2 :last-modified 2}])
       (into #{}
             (map map->FileMetadata)
             [{:size 1 :etag 1 :path "1" :encrypted-path 1 :last-modified 1}
              {:size 1 :etag 1 :path "2" :encrypted-path 2 :last-modified 1}])))))
  ;; The checksum of a `FileMetadata` is its etag, the one of a `FileTxn` its checksum field.
  (extend-protocol IChecksum
    FileMetadata
    (-checksum [file-metadata]
      (.-etag file-metadata))

    FileTxn
    (-checksum [filetxn]
      (.-checksum filetxn)))
  (defn- sort-file-metadata-fn
    "Return a function computing the sort key of a `FileMetadata`:
    - a journal file (`yyyy_MM_dd.md` below the journals directory) whose day
      lies within :recent-days-range -> that day in ms
    - a path including \"logseq/\" -> 9999
    - a path including \"content.\" -> 10000
    - a path contained in :favorite-pages -> the length of the path
    - anything else -> the negated file size
    :recent-days-range : [<min-inst-ms> <max-inst-ms>]"
    [& {:keys [recent-days-range favorite-pages]}]
    {:pre [(or (nil? recent-days-range)
               (every? number? recent-days-range))]}
    (let [favorite-pages* (set favorite-pages)
          ;; day of the journal file PATH in ms, nil when it cannot be parsed
          parse-journal-day (fn [path journal-dir]
                              (try
                                (let [day-str (-> path
                                                  (string/replace-first journal-dir "")
                                                  (string/replace-first ".md" ""))]
                                  (tc/to-long
                                   (tf/parse (tf/formatter "yyyy_MM_dd") day-str)))
                                (catch :default _)))]
      (fn [^FileMetadata item]
        (let [path (relative-path item)
              journal-dir (node-path/join (config/get-journals-directory) node-path/sep)
              journal-day (when (string/starts-with? path journal-dir)
                            (parse-journal-day path journal-dir))
              recent-journal? (and recent-days-range
                                   journal-day
                                   (<= (first recent-days-range)
                                       ^number journal-day
                                       (second recent-days-range)))]
          (cond
            recent-journal?
            journal-day

            (string/includes? path "logseq/")
            9999

            (string/includes? path "content.")
            10000

            (contains? favorite-pages* path)
            (count path)

            :else
            (- (.-size item)))))))
  632. ;;; ### path-normalize
  (def path-normalize
    "Alias of `gp-util/path-normalize`."
    gp-util/path-normalize)
  635. ;;; ### APIs
  636. ;; `RSAPI` call apis through rsapi package, supports operations on files
  (defprotocol IRSAPI
    "File operations provided through the rsapi package."
    (rsapi-ready? [this graph-uuid]
      "return true when rsapi is ready")
    (<key-gen [this]
      "generate public+private keys")
    (<set-env [this graph-uuid prod? private-key public-key]
      "set environment")
    (<get-local-files-meta [this graph-uuid base-path filepaths]
      "get local files' metadata")
    (<get-local-all-files-meta [this graph-uuid base-path]
      "get all local files' metadata")
    (<rename-local-file [this graph-uuid base-path from to])
    (<update-local-files [this graph-uuid base-path filepaths]
      "remote -> local")
    (<fetch-remote-files [this graph-uuid base-path filepaths]
      "remote -> local version-db")
    (<download-version-files [this graph-uuid base-path filepaths])
    (<delete-local-files [this graph-uuid base-path filepaths])
    (<update-remote-files [this graph-uuid base-path filepaths local-txid]
      "local -> remote, return err or txid")
    (<delete-remote-files [this graph-uuid base-path filepaths local-txid]
      "return err or txid")
    (<encrypt-fnames [this graph-uuid fnames])
    (<decrypt-fnames [this graph-uuid fnames])
    (<cancel-all-requests [this])
    (<add-new-version [this repo path content]))
  (defprotocol IRemoteAPI
    "Operations on remote graphs."
    (<user-info [this]
      "user info")
    (<get-remote-all-files-meta [this graph-uuid]
      "get all remote files' metadata")
    (<get-remote-files-meta [this graph-uuid filepaths]
      "get remote files' metadata")
    (<get-remote-graph [this graph-name-opt graph-uuid-opt]
      "get graph info by GRAPH-NAME-OPT or GRAPH-UUID-OPT")
    (<get-remote-txid [this graph-uuid]
      "get remote graph's txid")
    (<get-remote-file-versions [this graph-uuid filepath]
      "get file's version list")
    (<list-remote-graphs [this]
      "list all remote graphs")
    (<get-deletion-logs [this graph-uuid from-txid]
      "get deletion logs from FROM-TXID")
    (<get-diff [this graph-uuid from-txid]
      "get diff from FROM-TXID, return [txns, latest-txid, min-txid]")
    (<create-graph [this graph-name]
      "create graph")
    (<delete-graph [this graph-uuid]
      "delete graph")
    (<get-graph-salt [this graph-uuid]
      "return httpcode 410 when salt expired")
    (<create-graph-salt [this graph-uuid]
      "return httpcode 409 when salt already exists and not expired yet")
    (<get-graph-encrypt-keys [this graph-uuid])
    (<upload-graph-encrypt-keys [this graph-uuid public-key encrypted-private-key]))
  (defprotocol IRemoteControlAPI
    "api functions provided for outside the sync process"
    (<delete-remote-files-control [this graph-uuid filepaths]))
  (defprotocol IToken
    "Things which can provide a token."
    (<get-token [this]))
  (defn <case-different-local-file-exist?
    "e.g. filepath=\"pages/Foo.md\"
    found-filepath=\"pages/foo.md\"
    it happens on macos (case-insensitive fs)
    Returns a channel yielding the path reported for FILEPATH by
    `<get-local-files-meta` when it differs from FILEPATH, else nil."
    [graph-uuid irsapi base-path filepath]
    (go
      (let [metas (<! (<get-local-files-meta irsapi graph-uuid base-path [filepath]))
            found-path (some-> metas first :path)]
        (when (and (some? found-path)
                   (not= found-path filepath))
          found-path))))
  (defn <local-file-not-exist?
    "Returns a channel yielding true when no local metadata is found for
    FILEPATH, or when the first found path differs from FILEPATH
    (e.g. filepath=\"pages/Foo.md\", found-filepath=\"pages/foo.md\";
    it happens on macos, case-insensitive fs)."
    [graph-uuid irsapi base-path filepath]
    (go
      (let [metas (<! (<get-local-files-meta irsapi graph-uuid base-path [filepath]))]
        (if (empty? metas)
          ;; not found at all
          true
          ;; found, but possibly under a path which differs from FILEPATH
          (not= filepath (:path (first metas)))))))
  (defn- <retry-rsapi
    "Call F (a function returning a channel) and yield its result.
    When the result is an `ExceptionInfo` its cause is logged; when that cause
    mentions \"operation timed out\", F is called again, up to 3 more times."
    [f]
    (go-loop [retries-left 3]
      (let [result (<! (f))
            failed? (instance? ExceptionInfo result)]
        (when failed?
          (js/console.error "rsapi error:" (str (ex-cause result))))
        (if (and failed?
                 (string/index-of (str (ex-cause result)) "operation timed out")
                 (> retries-left 0))
          (do
            (print (str "retry(" retries-left ") ..."))
            (recur (dec retries-left)))
          result))))
  710. (declare <rsapi-cancel-all-requests)
  (defn- <build-local-file-metadatas
    "Convert R (path -> metadata entries, after `js->clj`) into a set of local
    `FileMetadata`s (remote? = false, no txid), yielded on the returned channel.
    Paths are normalized with `path-normalize`; when normalizing changes a path,
    its encrypted name is computed again with `<encrypt-fnames`, otherwise the
    \"encryptedFname\" of the metadata is used.
    Processing stops at the first entry without a path or metadata."
    [this graph-uuid r]
    (go-loop [entries (js->clj r)
              file-metadatas #{}]
      (let [[path metadata] (first entries)]
        (if-not (and path metadata)
          ;; finish
          file-metadatas
          (let [normalized-path (path-normalize path)
                encrypted-fname (if (not= path normalized-path)
                                  (first (<! (<encrypt-fnames this graph-uuid [normalized-path])))
                                  (get metadata "encryptedFname"))
                file-metadata (->FileMetadata (get metadata "size")
                                              (get metadata "md5")
                                              normalized-path
                                              encrypted-fname
                                              (get metadata "mtime")
                                              false nil nil)]
            (recur (next entries)
                   (conj file-metadatas file-metadata)))))))
  ;; RSAPI: IToken/IRSAPI implementation that forwards every operation to `ipc/ipc`
  ;; (used when `(util/electron?)` is true, see `rsapi`).
  ;; The three mutable fields remember the graph-uuid and key pair last passed to `<set-env`.
  726. (deftype RSAPI [^:mutable graph-uuid' ^:mutable private-key' ^:mutable public-key']
  727. IToken
  ;; Obtains the token through `user/<wrap-ensure-id&access-token`.
  728. (<get-token [_this]
  729. (user/<wrap-ensure-id&access-token
  730. (state/get-auth-id-token)))
  731. IRSAPI
  ;; Truthy only when `<set-env` was called for this very graph-uuid with both keys present.
  732. (rsapi-ready? [_ graph-uuid] (and (= graph-uuid graph-uuid') private-key' public-key'))
  733. (<key-gen [_] (go (js->clj (<! (p->c (ipc/ipc "key-gen")))
  734. :keywordize-keys true)))
  ;; Records graph-uuid/keys on the instance, then forwards them over IPC ("prod" or "dev" env).
  735. (<set-env [_ graph-uuid prod? private-key public-key]
  736. (when (not-empty private-key)
  737. (print (util/format "[%s] setting sync age-encryption passphrase..." graph-uuid)))
  738. (set! graph-uuid' graph-uuid)
  739. (set! private-key' private-key)
  740. (set! public-key' public-key)
  741. (p->c (ipc/ipc "set-env" graph-uuid (if prod? "prod" "dev") private-key public-key)))
  ;; Returns the ExceptionInfo unchanged on failure, otherwise the result of
  ;; `<build-local-file-metadatas` applied to the IPC response.
  742. (<get-local-all-files-meta [this graph-uuid base-path]
  743. (go
  744. (let [r (<! (<retry-rsapi #(p->c (ipc/ipc "get-local-all-files-meta" graph-uuid base-path))))]
  745. (if (instance? ExceptionInfo r)
  746. r
  747. (<! (<build-local-file-metadatas this graph-uuid r))))))
  ;; Unlike the "all files" variant, an exception here is treated as a programming error (assert).
  748. (<get-local-files-meta [this graph-uuid base-path filepaths]
  749. (go
  750. (let [r (<! (<retry-rsapi #(p->c (ipc/ipc "get-local-files-meta" graph-uuid base-path filepaths))))]
  751. (assert (not (instance? ExceptionInfo r)) "get-local-files-meta shouldn't return exception")
  752. (<! (<build-local-file-metadatas this graph-uuid r)))))
  ;; Both paths are passed through `path-normalize` before being sent.
  753. (<rename-local-file [_ graph-uuid base-path from to]
  754. (<retry-rsapi #(p->c (ipc/ipc "rename-local-file" graph-uuid base-path
  755. (path-normalize from)
  756. (path-normalize to)))))
  ;; Cancels all in-flight rsapi requests first; not wrapped in `<retry-rsapi`
  ;; and `filepaths` are sent without normalization.
  757. (<update-local-files [this graph-uuid base-path filepaths]
  758. (println "update-local-files" graph-uuid base-path filepaths)
  759. (go
  760. (<! (<rsapi-cancel-all-requests))
  761. (let [token (<! (<get-token this))]
  762. (<! (p->c (ipc/ipc "update-local-files" graph-uuid base-path filepaths token))))))
  ;; Same shape as `<update-local-files`: cancel pending requests, fetch a token, single IPC call.
  763. (<fetch-remote-files [this graph-uuid base-path filepaths]
  764. (go
  765. (<! (<rsapi-cancel-all-requests))
  766. (let [token (<! (<get-token this))]
  767. (<! (p->c (ipc/ipc "fetch-remote-files" graph-uuid base-path filepaths token))))))
  768. (<download-version-files [this graph-uuid base-path filepaths]
  769. (go
  770. (let [token (<! (<get-token this))
  771. r (<! (<retry-rsapi
  772. #(p->c (ipc/ipc "download-version-files" graph-uuid base-path filepaths token))))]
  773. r)))
  ;; Paths are normalized once, outside the go block; the log line prints the raw `filepaths`.
  774. (<delete-local-files [_ graph-uuid base-path filepaths]
  775. (let [normalized-filepaths (mapv path-normalize filepaths)]
  776. (go
  777. (println "delete-local-files" filepaths)
  778. (let [r (<! (<retry-rsapi #(p->c (ipc/ipc "delete-local-files" graph-uuid base-path normalized-filepaths))))]
  779. r))))
  ;; Cancels pending requests before uploading; the upload itself goes through `<retry-rsapi`.
  780. (<update-remote-files [this graph-uuid base-path filepaths local-txid]
  781. (let [normalized-filepaths (mapv path-normalize filepaths)]
  782. (go
  783. (<! (<rsapi-cancel-all-requests))
  784. (let [token (<! (<get-token this))]
  785. (<! (<retry-rsapi
  786. #(p->c (ipc/ipc "update-remote-files" graph-uuid base-path normalized-filepaths local-txid token))))))))
  ;; Note: unlike `<update-remote-files`, pending requests are NOT cancelled here.
  787. (<delete-remote-files [this graph-uuid base-path filepaths local-txid]
  788. (let [normalized-filepaths (mapv path-normalize filepaths)]
  789. (go
  790. (let [token (<! (<get-token this))]
  791. (<!
  792. (<retry-rsapi
  793. #(p->c (ipc/ipc "delete-remote-files" graph-uuid base-path normalized-filepaths local-txid token))))))))
  ;; NOTE(review): no ExceptionInfo check here, the raw IPC result is handed to `js->clj`.
  794. (<encrypt-fnames [_ graph-uuid fnames] (go (js->clj (<! (p->c (ipc/ipc "encrypt-fnames" graph-uuid fnames))))))
  ;; A failed decrypt is re-wrapped as an ex-info with message "decrypt-failed" carrying the
  ;; requested fnames and the original cause.
  795. (<decrypt-fnames [_ graph-uuid fnames] (go
  796. (let [r (<! (p->c (ipc/ipc "decrypt-fnames" graph-uuid fnames)))]
  797. (if (instance? ExceptionInfo r)
  798. (ex-info "decrypt-failed" {:fnames fnames} (ex-cause r))
  799. (js->clj r)))))
  800. (<cancel-all-requests [_]
  801. (p->c (ipc/ipc "cancel-all-requests")))
  ;; Stores `content` as a version file under the repo's local dir via the "addVersionFile" IPC call.
  802. (<add-new-version [_this repo path content]
  803. (p->c (ipc/ipc "addVersionFile" (config/get-local-dir repo) path content))))
  ;; CapacitorAPI: IToken/IRSAPI implementation that calls methods on `mobile-util/file-sync`
  ;; (used on native iOS/Android, see `rsapi`). Arguments are passed as a single JS object
  ;; built with `clj->js`; results come back as promises converted with `p->c`.
  804. (deftype ^:large-vars/cleanup-todo CapacitorAPI [^:mutable graph-uuid' ^:mutable private-key ^:mutable public-key']
  805. IToken
  806. (<get-token [_this]
  807. (user/<wrap-ensure-id&access-token
  808. (state/get-auth-id-token)))
  809. IRSAPI
  ;; Truthy only when `<set-env` was called for this graph-uuid with both keys present.
  810. (rsapi-ready? [_ graph-uuid] (and (= graph-uuid graph-uuid') private-key public-key'))
  811. (<key-gen [_]
  812. (go (let [r (<! (p->c (.keygen mobile-util/file-sync #js {})))]
  813. (-> r
  814. (js->clj :keywordize-keys true)))))
  ;; Records graph-uuid/keys on the instance, then forwards them to the plugin.
  815. (<set-env [_ graph-uuid prod? secret-key public-key]
  816. (set! graph-uuid' graph-uuid)
  817. (set! private-key secret-key)
  818. (set! public-key' public-key)
  819. (p->c (.setEnv mobile-util/file-sync (clj->js {:graphUUID graph-uuid
  820. :env (if prod? "prod" "dev")
  821. :secretKey secret-key
  822. :publicKey public-key}))))
  ;; Returns the ExceptionInfo unchanged on failure; otherwise builds metadata from `(.-result r)`.
  823. (<get-local-all-files-meta [this graph-uuid base-path]
  824. (go
  825. (let [r (<! (p->c (.getLocalAllFilesMeta mobile-util/file-sync (clj->js {:graphUUID graph-uuid
  826. :basePath base-path}))))]
  827. (if (instance? ExceptionInfo r)
  828. r
  829. (<! (<build-local-file-metadatas this graph-uuid (.-result r)))))))
  ;; An exception here is treated as a programming error (assert).
  830. (<get-local-files-meta [this graph-uuid base-path filepaths]
  831. (go
  832. (let [r (<! (p->c (.getLocalFilesMeta mobile-util/file-sync
  833. (clj->js {:graphUUID graph-uuid
  834. :basePath base-path
  835. :filePaths filepaths}))))]
  836. (assert (not (instance? ExceptionInfo r)) "get-local-files-meta shouldn't return exception")
  837. (<! (<build-local-file-metadatas this graph-uuid (.-result r))))))
  838. (<rename-local-file [_ graph-uuid base-path from to]
  839. (p->c (.renameLocalFile mobile-util/file-sync
  840. (clj->js {:graphUUID graph-uuid
  841. :basePath base-path
  842. :from (path-normalize from)
  843. :to (path-normalize to)}))))
  ;; Paths are normalized here before being sent.
  844. (<update-local-files [this graph-uuid base-path filepaths]
  845. (go
  846. (let [token (<! (<get-token this))
  847. filepaths' (map path-normalize filepaths)]
  848. (<! (p->c (.updateLocalFiles mobile-util/file-sync (clj->js {:graphUUID graph-uuid
  849. :basePath base-path
  850. :filePaths filepaths'
  851. :token token})))))))
  ;; Returns `(.-value r)` converted to Clojure data.
  ;; NOTE(review): `r` is not checked for ExceptionInfo before `.-value` is read.
  852. (<fetch-remote-files [this graph-uuid base-path filepaths]
  853. (go
  854. (let [token (<! (<get-token this))
  855. r (<! (<retry-rsapi
  856. #(p->c (.fetchRemoteFiles mobile-util/file-sync
  857. (clj->js {:graphUUID graph-uuid
  858. :basePath base-path
  859. :filePaths filepaths
  860. :token token})))))]
  861. (js->clj (.-value r)))))
  862. (<download-version-files [this graph-uuid base-path filepaths]
  863. (go
  864. (let [token (<! (<get-token this))
  865. r (<! (<retry-rsapi
  866. #(p->c (.updateLocalVersionFiles mobile-util/file-sync
  867. (clj->js {:graphUUID graph-uuid
  868. :basePath base-path
  869. :filePaths filepaths
  870. :token token})))))]
  871. r)))
  872. (<delete-local-files [_ graph-uuid base-path filepaths]
  873. (let [normalized-filepaths (mapv path-normalize filepaths)]
  874. (go
  875. (let [r (<! (<retry-rsapi #(p->c (.deleteLocalFiles mobile-util/file-sync
  876. (clj->js {:graphUUID graph-uuid
  877. :basePath base-path
  878. :filePaths normalized-filepaths})))))]
  879. r))))
  ;; On success returns the "txid" entry of the plugin response; on failure the ExceptionInfo.
  ;; Always sends `:fnameEncryption true`.
  880. (<update-remote-files [this graph-uuid base-path filepaths local-txid]
  881. (let [normalized-filepaths (mapv path-normalize filepaths)]
  882. (go
  883. (let [token (<! (<get-token this))
  884. r (<! (p->c (.updateRemoteFiles mobile-util/file-sync
  885. (clj->js {:graphUUID graph-uuid
  886. :basePath base-path
  887. :filePaths normalized-filepaths
  888. :txid local-txid
  889. :token token
  890. :fnameEncryption true}))))]
  891. (if (instance? ExceptionInfo r)
  892. r
  893. (get (js->clj r) "txid"))))))
  ;; On success returns the "txid" entry of the plugin response; on failure the ExceptionInfo.
  894. (<delete-remote-files [this graph-uuid base-path filepaths local-txid]
  895. (let [normalized-filepaths (mapv path-normalize filepaths)]
  896. (go
  897. (let [token (<! (<get-token this))
  898. r (<! (p->c (.deleteRemoteFiles mobile-util/file-sync
  899. (clj->js {:graphUUID graph-uuid
  900. :basePath base-path
  901. :filePaths normalized-filepaths
  902. :txid local-txid
  903. :token token}))))]
  904. (if (instance? ExceptionInfo r)
  905. r
  906. (get (js->clj r) "value"))))))
  ;; NOTE(review): on failure this yields `(.-cause r)` (the underlying error), whereas
  ;; `<decrypt-fnames` below yields a fresh ex-info; confirm callers expect that difference.
  907. (<encrypt-fnames [_ graph-uuid fnames]
  908. (go
  909. (let [r (<! (p->c (.encryptFnames mobile-util/file-sync
  910. (clj->js {:graphUUID graph-uuid
  911. :filePaths fnames}))))]
  912. (if (instance? ExceptionInfo r)
  913. (.-cause r)
  914. (get (js->clj r) "value")))))
  ;; A failed decrypt is re-wrapped as an ex-info with message "decrypt-failed".
  915. (<decrypt-fnames [_ graph-uuid fnames]
  916. (go (let [r (<! (p->c (.decryptFnames mobile-util/file-sync
  917. (clj->js {:graphUUID graph-uuid
  918. :filePaths fnames}))))]
  919. (if (instance? ExceptionInfo r)
  920. (ex-info "decrypt-failed" {:fnames fnames} (ex-cause r))
  921. (get (js->clj r) "value")))))
  922. (<cancel-all-requests [_]
  923. (p->c (.cancelAllRequests mobile-util/file-sync)))
  ;; Version files are written through `capacitor-fs/backup-file` into :version-file-dir.
  924. (<add-new-version [_this repo path content]
  925. (p->c (capacitor-fs/backup-file repo :version-file-dir path content))))
  (def rsapi
    "Platform-specific IRSAPI instance: an `RSAPI` on Electron, a `CapacitorAPI`
  on native iOS/Android, and nil on every other platform."
    (cond
      (util/electron?)
      (->RSAPI nil nil nil)

      (or (mobile-util/native-ios?)
          (mobile-util/native-android?))
      (->CapacitorAPI nil nil nil)

      :else
      nil))
  (defn add-new-version-file
    "Stores `content` as a new version file for `path` in `repo` by delegating to
  `<add-new-version` on the platform `rsapi`. Returns whatever that call returns."
    [repo path content]
    (let [api rsapi]
      (<add-new-version api repo path content)))
  (defn <rsapi-cancel-all-requests
    "Returns a channel that completes once `<cancel-all-requests` has finished on
  the platform `rsapi`. When no rsapi is available the channel yields nil immediately."
    []
    (go
      (if rsapi
        (<! (<cancel-all-requests rsapi))
        nil)))
  942. ;;; ### remote & rs api exceptions
  (defn sync-stop-when-api-flying?
    "True when the ex-data of `exp` has `:err` equal to `:stop`, false when `:err`
  is something else, and nil when there is no ex-data or no `:err`."
    [exp]
    (when-some [data (ex-data exp)]
      (when-some [err (:err data)]
        (= err :stop))))
  (defn storage-exceed-limit?
    "Inspects the `:err` response stored in the ex-data of `exp`.
  Returns true when its status is 403 and its body message is \"storage-limit\",
  false for any other response, and nil when there is no ex-data or no `:err`."
    [exp]
    (when-some [data (ex-data exp)]
      (when-some [resp (:err data)]
        (let [status (:status resp)
              msg    (:message (:body resp))]
          (and (= 403 status)
               (= msg "storage-limit"))))))
  (defn graph-count-exceed-limit?
    "Inspects the `:err` response stored in the ex-data of `exp`.
  Returns true when its status is 403 and its body message is
  \"graph-count-exceed-limit\", false for any other response, and nil when there
  is no ex-data or no `:err`."
    [exp]
    (when-some [data (ex-data exp)]
      (when-some [resp (:err data)]
        (let [status (:status resp)
              msg    (:message (:body resp))]
          (and (= 403 status)
               (= msg "graph-count-exceed-limit"))))))
  (defn decrypt-exp?
    "True when `exp` is an exception whose message is \"decrypt-failed\" (the message
  used by the `<decrypt-fnames` implementations), false for any other message,
  and nil when `exp` is nil or has no message."
    [exp]
    ;; The previous form threaded the message into a `#(...)` literal, which
    ;; `some->` rewrites into `(fn* (ex-message exp) [p1] ...)` instead of calling
    ;; the function, so the message was never compared. Thread into `=` directly.
    (some-> exp ex-message (= "decrypt-failed")))
  961. ;;; remote api exceptions ends
  962. ;;; ### sync events
  (defn- put-sync-event!
    "Asynchronously puts `val` onto `pubsub/sync-events-ch`.
  Returns the result of `async/put!`."
    [val]
    (let [events-ch pubsub/sync-events-ch]
      (async/put! events-ch val)))
  ;; Buffered (size 1) channel used to signal `debug-print-sync-events-loop` to stop;
  ;; written by `stop-debug-print-sync-events-loop`.
  966. (def ^:private debug-print-sync-events-loop-stop-chan (chan 1))
  ;; Debug helper: subscribes to the given sync-event topics on `pubsub/sync-events-pub`
  ;; and pretty-prints every event until a value arrives on the stop channel.
  ;; The zero-arity form subscribes to a fixed default list of topics.
  967. (defn debug-print-sync-events-loop
  968. ([] (debug-print-sync-events-loop [:created-local-version-file
  969. :finished-local->remote
  970. :finished-remote->local
  971. :pause
  972. :resume
  973. :exception-decrypt-failed
  974. :remote->local-full-sync-failed
  975. :local->remote-full-sync-failed]))
  976. ([topics]
  ;; Drop any stale stop signal left over from a previous run before starting.
  977. (util/drain-chan debug-print-sync-events-loop-stop-chan)
  ;; One buffered channel per topic, all merged into `out-ch` through a mix.
  978. (let [topic&chs (map (juxt identity #(chan 10)) topics)
  979. out-ch (chan 10)
  980. out-mix (async/mix out-ch)]
  981. (doseq [[topic ch] topic&chs]
  982. (async/sub pubsub/sync-events-pub topic ch)
  983. (async/admix out-mix ch))
  984. (go-loop []
  985. (let [{:keys [val stop]}
  986. (async/alt!
  987. debug-print-sync-events-loop-stop-chan {:stop true}
  988. out-ch ([v] {:val v}))]
  989. (cond
  ;; Stop requested: detach every topic channel from the mix and from the pub, then exit.
  990. stop (do (async/unmix-all out-mix)
  991. (doseq [[topic ch] topic&chs]
  992. (async/unsub pubsub/sync-events-pub topic ch)))
  ;; Event received: print it and keep looping. A falsy `val` ends the loop
  ;; without running the cleanup above.
  993. val (do (pp/pprint [:debug :sync-event val])
  994. (recur))))))))
  (defn stop-debug-print-sync-events-loop
    "Signals `debug-print-sync-events-loop` to stop by offering `true` to its stop
  channel. Non-blocking; returns the result of `offer!`."
    []
    (let [stop-ch debug-print-sync-events-loop-stop-chan]
      (offer! stop-ch true)))
  998. ;;; sync events ends
  (defn- fire-file-sync-storage-exceed-limit-event!
    "Publishes `[:file-sync/storage-exceed-limit]` when `exp` satisfies
  `storage-exceed-limit?`. Returns true if the event was published, nil otherwise."
    [exp]
    (if (storage-exceed-limit? exp)
      (do
        (state/pub-event! [:file-sync/storage-exceed-limit])
        true)
      nil))
  (defn- fire-file-sync-graph-count-exceed-limit-event!
    "Publishes `[:file-sync/graph-count-exceed-limit]` when `exp` satisfies
  `graph-count-exceed-limit?`. Returns true if the event was published, nil otherwise."
    [exp]
    (if (graph-count-exceed-limit? exp)
      (do
        (state/pub-event! [:file-sync/graph-count-exceed-limit])
        true)
      nil))
  ;; RemoteAPI: client for the remote sync API. `*stopped?` is passed through to the
  ;; free function `<request` on every call.
  1009. (deftype RemoteAPI [*stopped?]
  1010. Object
  ;; Performs one API call with a freshly obtained token.
  ;; Returns (on a channel) the parsed JSON body for an unexceptional HTTP status;
  ;; otherwise an ex-info "request failed" is RETURNED (not thrown) after the
  ;; storage-limit / graph-count-limit events have been given a chance to fire.
  1011. (<request [this api-name body]
  1012. (go
  ;; `<request` in call position here is the namespace-level function, not this method.
  1013. (let [resp (<! (<request api-name body (<! (<get-token this)) *stopped?))]
  1014. (if (http/unexceptional-status? (:status resp))
  1015. (get-resp-json-body resp)
  1016. (let [exp (ex-info "request failed"
  1017. {:err resp
  1018. :body (:body resp)
  1019. :api-name api-name
  1020. :request-body body})]
  1021. (fire-file-sync-storage-exceed-limit-event! exp)
  1022. (fire-file-sync-graph-count-exceed-limit-event! exp)
  1023. exp)))))
  1024. ;; for test
  ;; Requires `files` to be a map and `txid` a number (checked by :pre).
  1025. (update-files [this graph-uuid txid files]
  1026. {:pre [(map? files)
  1027. (number? txid)]}
  1028. (.<request this "update_files" {:GraphUUID graph-uuid :TXId txid :Files files}))
  1029. IToken
  1030. (<get-token [_this]
  1031. (user/<wrap-ensure-id&access-token
  1032. (state/get-auth-id-token))))
  (defn- filter-files-with-unnormalized-path
    "Removes from `file-meta-list` every entry whose decrypted path is not already
  in normalized form (i.e. `path-normalize` would change it).

  - `file-meta-list`: collection of maps carrying `:encrypted-path`
  - `encrypted-path->path-map`: map of encrypted path -> decrypted path

  Returns a vector. Each dropped path is logged with `println`."
    [file-meta-list encrypted-path->path-map]
    ;; Walk the map entries directly instead of inverting the map: inverting loses
    ;; entries when two encrypted paths decrypt to the same path, and the old
    ;; transient-based loop ignored the return value of `conj!` and stopped at
    ;; the first nil path.
    (let [encrypted-paths-to-drop
          (into #{}
                (keep (fn [[encrypted-path raw-path]]
                        (when (and (some? raw-path)
                                   (not= (path-normalize raw-path) raw-path))
                          (println :filter-files-with-unnormalized-path raw-path)
                          encrypted-path)))
                encrypted-path->path-map)]
      (filterv #(not (contains? encrypted-paths-to-drop (:encrypted-path %)))
               file-meta-list)))
  (defn- filter-case-different-same-files
    "Among files whose decrypted paths differ only by letter case, keep a single
  entry per lower-cased path: the one with the greatest `:last-modified`
  (the earliest seen wins on ties).

  `encrypted-path->path-map` must contain the `:encrypted-path` of every entry
  (asserted). Returns the surviving entries, or nil when there are none."
    [file-meta-list encrypted-path->path-map]
    (loop [winners   (transient {})
           remaining file-meta-list]
      (let [file-meta (first remaining)
            more      (next remaining)]
        (if-not file-meta
          (vals (persistent! winners))
          (let [origin-path (get encrypted-path->path-map (:encrypted-path file-meta))
                _           (assert (some? origin-path) file-meta)
                lowered     (string/lower-case origin-path)
                mtime       (:last-modified file-meta)
                ;; mtime of the entry currently winning for this lower-cased path
                mtime-seen  (:last-modified (get winners lowered))]
            (cond
              ;; nothing recorded yet for this path, or the current entry is newer
              (or (nil? mtime-seen) (> mtime mtime-seen))
              (recur (conj! winners [lowered file-meta]) more)

              ;; the recorded entry is at least as recent: keep it
              (<= mtime mtime-seen)
              (recur winners more)

              ;; only reachable when the two mtimes are not comparable
              :else
              (do (println :debug-filter-case-different-same-files:unreachable file-meta lowered)
                  (recur winners more))))))))
  ;; IRemoteAPI implementation for RemoteAPI.
  ;; Every method runs inside `user/<wrap-ensure-id&access-token` and goes through
  ;; `.<request`; failures are returned as ExceptionInfo values rather than thrown.
  (extend-type RemoteAPI
    IRemoteAPI
    (<user-info [this]
      (user/<wrap-ensure-id&access-token
       (<! (.<request this "user_info" {}))))

    ;; Pages through "get_all_files" until no continuation token is returned, decrypts
    ;; the object keys and returns a set of FileMetadata, or the first ExceptionInfo hit.
    (<get-remote-all-files-meta [this graph-uuid]
      (user/<wrap-ensure-id&access-token
       (let [;; Accumulate pages in a loop binding; the previous version mutated
             ;; transients in place and ignored the return value of `conj!`.
             objs-or-exp
             (<!
              (go-loop [continuation-token nil
                        objs-acc []]
                (let [r (<! (.<request this "get_all_files"
                                       (into
                                        {}
                                        (remove (comp nil? second)
                                                {:GraphUUID graph-uuid :ContinuationToken continuation-token}))))]
                  (if (instance? ExceptionInfo r)
                    r
                    (let [next-continuation-token (:NextContinuationToken r)
                          objs-acc* (into objs-acc (:Objects r))]
                      (if (empty? next-continuation-token)
                        objs-acc*
                        (recur next-continuation-token objs-acc*)))))))]
         (if (instance? ExceptionInfo objs-or-exp)
           objs-or-exp
           (let [file-meta-list* (into #{}
                                       (map
                                        #(hash-map :checksum (:checksum %)
                                                   :encrypted-path (remove-user-graph-uuid-prefix (:Key %))
                                                   :size (:Size %)
                                                   :last-modified (:LastModified %)
                                                   :txid (:Txid %)))
                                       objs-or-exp)
                 encrypted-path-list* (mapv (comp remove-user-graph-uuid-prefix :Key) objs-or-exp)
                 path-list-or-exp (<! (<decrypt-fnames rsapi graph-uuid encrypted-path-list*))]
             (if (instance? ExceptionInfo path-list-or-exp)
               path-list-or-exp
               (let [encrypted-path->path-map (zipmap encrypted-path-list* path-list-or-exp)]
                 (set
                  (mapv
                   #(->FileMetadata (:size %)
                                    (:checksum %)
                                    (get encrypted-path->path-map (:encrypted-path %))
                                    (:encrypted-path %)
                                    (:last-modified %)
                                    true
                                    (:txid %)
                                    nil)
                   (-> file-meta-list*
                       (filter-files-with-unnormalized-path encrypted-path->path-map)
                       (filter-case-different-same-files encrypted-path->path-map)))))))))))

    ;; Returns a set of FileMetadata for `filepaths`; entries the server flags with
    ;; "filepath too long" are skipped.
    (<get-remote-files-meta [this graph-uuid filepaths]
      {:pre [(coll? filepaths)]}
      (user/<wrap-ensure-id&access-token
       (let [encrypted-paths* (<! (<encrypt-fnames rsapi graph-uuid filepaths))
             r (<! (.<request this "get_files_meta" {:GraphUUID graph-uuid :Files encrypted-paths*}))]
         (if (instance? ExceptionInfo r)
           r
           (let [encrypted-paths (mapv :FilePath r)
                 paths-or-exp (<! (<decrypt-fnames rsapi graph-uuid encrypted-paths))]
             (if (instance? ExceptionInfo paths-or-exp)
               paths-or-exp
               (let [encrypted-path->path-map (zipmap encrypted-paths paths-or-exp)]
                 (into #{}
                       (comp
                        (filter #(not= "filepath too long" (:Error %)))
                        (map #(->FileMetadata (:Size %)
                                              (:Checksum %)
                                              (some->> (get encrypted-path->path-map (:FilePath %))
                                                       path-normalize)
                                              (:FilePath %)
                                              (:LastModified %)
                                              true
                                              (:Txid %)
                                              nil)))
                       r))))))))

    ;; At least one of graph-name-opt / graph-uuid-opt must be given (checked by :pre).
    (<get-remote-graph [this graph-name-opt graph-uuid-opt]
      {:pre [(or graph-name-opt graph-uuid-opt)]}
      (user/<wrap-ensure-id&access-token
       (<! (.<request this "get_graph" (cond-> {}
                                         (seq graph-name-opt)
                                         (assoc :GraphName graph-name-opt)
                                         (seq graph-uuid-opt)
                                         (assoc :GraphUUID graph-uuid-opt))))))

    (<get-remote-txid [this graph-uuid]
      (user/<wrap-ensure-id&access-token
       (<! (.<request this "get_txid" {:GraphUUID graph-uuid}))))

    (<get-remote-file-versions [this graph-uuid filepath]
      (user/<wrap-ensure-id&access-token
       (let [encrypted-path (first (<! (<encrypt-fnames rsapi graph-uuid [filepath])))]
         (<! (.<request this "get_file_version_list" {:GraphUUID graph-uuid :File encrypted-path})))))

    (<list-remote-graphs [this]
      (user/<wrap-ensure-id&access-token
       (<! (.<request this "list_graphs"))))

    ;; Returns the deletion transactions since `from-txid` with their :paths decrypted,
    ;; or an ExceptionInfo when the request or the decryption fails.
    (<get-deletion-logs [this graph-uuid from-txid]
      (user/<wrap-ensure-id&access-token
       (let [r (<! (.<request this "get_deletion_log_v20221212" {:GraphUUID graph-uuid :FromTXId from-txid}))]
         (if (instance? ExceptionInfo r)
           r
           (let [txns-with-encrypted-paths (mapv (fn [txn]
                                                   (assoc txn :paths
                                                          (mapv remove-user-graph-uuid-prefix (:paths txn))))
                                                 (:Transactions r))
                 encrypted-paths (mapcat :paths txns-with-encrypted-paths)
                 paths-or-exp (<! (<decrypt-fnames rsapi graph-uuid encrypted-paths))]
             ;; A decrypt failure used to be handed to `zipmap`, which throws inside
             ;; the go block; return it like every other failure instead.
             (if (instance? ExceptionInfo paths-or-exp)
               paths-or-exp
               (let [encrypted-path->path-map (zipmap encrypted-paths paths-or-exp)]
                 (mapv
                  (fn [txn]
                    (assoc txn :paths (mapv #(get encrypted-path->path-map %) (:paths txn))))
                  txns-with-encrypted-paths))))))))

    ;; Returns [txns max-txid min-txid] for the transactions since `from-txid`
    ;; (sorted by :TXId, paths decrypted), or an ExceptionInfo on failure.
    (<get-diff [this graph-uuid from-txid]
      ;; TODO: path in transactions should be relative path(now s3 key, which includes graph-uuid and user-uuid)
      (user/<wrap-ensure-id&access-token
       (let [r (<! (.<request this "get_diff" {:GraphUUID graph-uuid :FromTXId from-txid}))]
         (if (instance? ExceptionInfo r)
           r
           (let [txns-with-encrypted-paths (sort-by :TXId (:Transactions r))
                 txns-with-encrypted-paths*
                 (mapv
                  (fn [txn]
                    (assoc txn :TXContent
                           (mapv
                            (fn [[to-path from-path checksum]]
                              [(remove-user-graph-uuid-prefix to-path)
                               (some-> from-path remove-user-graph-uuid-prefix)
                               checksum])
                            (:TXContent txn))))
                  txns-with-encrypted-paths)
                 ;; only paths starting with "e." are sent for decryption
                 encrypted-paths
                 (mapcat
                  (fn [txn]
                    (remove
                     #(or (nil? %) (not (string/starts-with? % "e.")))
                     (mapcat
                      (fn [[to-path from-path _checksum]] [to-path from-path])
                      (:TXContent txn))))
                  txns-with-encrypted-paths*)
                 paths-or-exp (<! (<decrypt-fnames rsapi graph-uuid encrypted-paths))]
             ;; Same guard as in <get-deletion-logs: never zipmap over an exception.
             (if (instance? ExceptionInfo paths-or-exp)
               paths-or-exp
               (let [encrypted-path->path-map (zipmap encrypted-paths paths-or-exp)
                     txns
                     (mapv
                      (fn [txn]
                        (assoc
                         txn :TXContent
                         (mapv
                          (fn [[to-path from-path checksum]]
                            ;; a to-path missing from the map is kept as-is
                            [(get encrypted-path->path-map to-path to-path)
                             (some->> from-path (get encrypted-path->path-map))
                             checksum])
                          (:TXContent txn))))
                      txns-with-encrypted-paths*)]
                 [txns
                  (:TXId (last txns))
                  (:TXId (first txns))])))))))

    (<create-graph [this graph-name]
      (user/<wrap-ensure-id&access-token
       (<! (.<request this "create_graph" {:GraphName graph-name}))))

    (<delete-graph [this graph-uuid]
      (user/<wrap-ensure-id&access-token
       (<! (.<request this "delete_graph" {:GraphUUID graph-uuid}))))

    (<get-graph-salt [this graph-uuid]
      (user/<wrap-ensure-id&access-token
       (<! (.<request this "get_graph_salt" {:GraphUUID graph-uuid}))))

    (<create-graph-salt [this graph-uuid]
      (user/<wrap-ensure-id&access-token
       (<! (.<request this "create_graph_salt" {:GraphUUID graph-uuid}))))

    (<get-graph-encrypt-keys [this graph-uuid]
      (user/<wrap-ensure-id&access-token
       (<! (.<request this "get_graph_encrypt_keys" {:GraphUUID graph-uuid}))))

    (<upload-graph-encrypt-keys [this graph-uuid public-key encrypted-private-key]
      (user/<wrap-ensure-id&access-token
       (<! (.<request this "upload_graph_encrypt_keys" {:GraphUUID graph-uuid
                                                        :public-key public-key
                                                        :encrypted-private-key encrypted-private-key})))))
  ;; IRemoteControlAPI implementation for RemoteAPI.
  (extend-type RemoteAPI
    IRemoteControlAPI
    ;; Encrypts `filepaths`, then issues one "delete_files" request per batch of at
    ;; most 20 files, re-reading the remote txid before every batch.
    ;; The individual responses are discarded; yields nil.
    (<delete-remote-files-control [this graph-uuid filepaths]
      (user/<wrap-ensure-id&access-token
       (let [encrypted-fnames (<! (<encrypt-fnames rsapi graph-uuid filepaths))
             batches (partition-all 20 encrypted-fnames)]
         (doseq [batch batches]
           (let [txid-resp (<! (<get-remote-txid this graph-uuid))
                 current-txid (:TXId txid-resp)]
             (<! (.<request this "delete_files" {:GraphUUID graph-uuid :TXId current-txid :Files batch}))))))))
  ;; REPL scratch: example call of `<delete-remote-files-control` (never evaluated at load time).
  1265. (comment
  1266. (declare remoteapi)
  1267. (<delete-remote-files-control remoteapi (second @graphs-txid) ["pages/aa.md"])
  1268. )
  ;; Shared RemoteAPI instance, created with a nil `*stopped?`.
  1269. (def remoteapi (->RemoteAPI nil))
  ;; graph-uuid -> salt response cache used by `<get-graph-salt-memoize`;
  ;; entries written through `update-graph-salt-cache` have keys :value and :expired-at.
  1270. (def ^:private *get-graph-salt-memoize-cache (atom {}))
  (defn update-graph-salt-cache
    "Stores `v` as the cached salt for `graph-uuid`.
  `v` must be a map with exactly the keys :value and :expired-at (checked by :pre).
  Returns the new cache contents."
    [graph-uuid v]
    {:pre [(map? v)
           (= #{:value :expired-at} (set (keys v)))]}
    (swap! *get-graph-salt-memoize-cache assoc graph-uuid v))
  (defn <get-graph-salt-memoize
    "Returns a channel yielding the salt for `graph-uuid`.

  The cached entry is served while the current time (epoch millis) is before its
  :expired-at; otherwise the salt is fetched with `<get-graph-salt` and the cache
  is refreshed. A failed fetch (ExceptionInfo) is returned to the caller but is
  not written to the cache."
    [remoteapi graph-uuid]
    (go
      (let [cached (get @*get-graph-salt-memoize-cache graph-uuid)
            expired-at (:expired-at cached)
            now (tc/to-long (t/now))]
        ;; Guard against a cache miss explicitly instead of relying on `(< now nil)`.
        (if (and (some? expired-at) (< now expired-at))
          cached
          (let [r (<! (<get-graph-salt remoteapi graph-uuid))]
            ;; An exception has no :expired-at, so caching it would never produce a hit.
            (when-not (instance? ExceptionInfo r)
              (swap! *get-graph-salt-memoize-cache conj [graph-uuid r]))
            r)))))
  ;; graph-uuid -> encrypt-keys response cache used by `<get-graph-encrypt-keys-memoize`;
  ;; entries written through `update-graph-encrypt-keys-cache` have keys
  ;; :public-key and :encrypted-private-key.
  1285. (def ^:private *get-graph-encrypt-keys-memoize-cache (atom {}))
  (defn update-graph-encrypt-keys-cache
    "Stores `v` as the cached encrypt keys for `graph-uuid`.
  `v` must be a map with exactly the keys :public-key and :encrypted-private-key
  (checked by :pre). Returns the new cache contents."
    [graph-uuid v]
    {:pre [(map? v)
           (= #{:public-key :encrypted-private-key} (set (keys v)))]}
    (swap! *get-graph-encrypt-keys-memoize-cache assoc graph-uuid v))
  (defn <get-graph-encrypt-keys-memoize
    "Returns a channel yielding the encrypt keys for `graph-uuid`.
  A cached entry is served when present; otherwise the keys are fetched with
  `<get-graph-encrypt-keys` and cached only when the response carries both
  :public-key and :encrypted-private-key. The fetched response is returned either way."
    [remoteapi graph-uuid]
    (go
      (if-let [cached (get @*get-graph-encrypt-keys-memoize-cache graph-uuid)]
        cached
        (let [fetched (<! (<get-graph-encrypt-keys remoteapi graph-uuid))
              {:keys [public-key encrypted-private-key]} fetched]
          (when (and public-key encrypted-private-key)
            (swap! *get-graph-encrypt-keys-memoize-cache assoc graph-uuid fetched))
          fetched))))
  (defn- is-journals-or-pages?
    "True when the relative path of `filetxn` lies under the configured journals
  directory or the configured pages directory."
    [filetxn]
    (let [rel-path (relative-path filetxn)
          ;; `dir` joined with the platform separator, so only real children match
          under-dir? (fn [dir]
                       (string/starts-with? rel-path (node-path/join dir node-path/sep)))]
      (or (under-dir? (config/get-journals-directory))
          (under-dir? (config/get-pages-directory)))))
  (defn- need-add-version-file?
    "Decides whether a version file should be created before applying `filetxn`.

  Intended rules:
  1. for an 'update' filetxn: the file already exists locally (same page name) and
     the diff against the db content has deletions
  2. for a 'delete' filetxn: origin remote content and local content differ
     - TODO: we need to store origin remote content md5 in server db
  3. version files are only created for files under 'journals/' and 'pages/'

  Currently renamed and deleted filetxns always yield false. For an updated
  filetxn the result is truthy when `origin-db-content` is non-empty and the file
  is either missing on disk or its on-disk content removes something from
  `origin-db-content`. Returns a channel."
    [^FileTxn filetxn origin-db-content]
    (go
      (cond
        (or (.renamed? filetxn)
            (.-deleted? filetxn))
        false

        (.-updated? filetxn)
        (let [rpath (relative-path filetxn)
              repo (state/get-current-repo)
              repo-dir (config/get-repo-dir repo)
              ;; resolves to the file content, or nil when the file does not exist
              read-if-exists (p/then (fs/file-exists? repo-dir rpath)
                                     (fn [exists?]
                                       (when exists?
                                         (fs/read-file repo-dir rpath))))
              content (<! (p->c read-if-exists))]
          (and (seq origin-db-content)
               (or (nil? content)
                   (some :removed (diff/diff origin-db-content content))))))))
  (defn- <with-pause
    "Waits for a value from `ch` while polling the `*paused` atom once per second.
  Returns a channel yielding `{:pause true}` as soon as `*paused` is observed
  truthy, otherwise the (truthy) value taken from `ch`. A falsy value from `ch`
  (including a closed channel) ends the wait with nil."
    [ch *paused]
    (go-loop []
      (if @*paused
        {:pause true}
        (let [outcome (async/alt! ch ([v] {:val v})
                                  (timeout 1000) {:timeout true})
              received (:val outcome)
              timed-out? (:timeout outcome)]
          (cond
            received received
            timed-out? (recur))))))
  (defn- assert-local-txid<=remote-txid
    "Sanity check: when a local txid is recorded in `graphs-txid`, fetches the
  remote txid for the same graph and asserts local <= remote.
  Returns the go channel, or nil when there is no local txid."
    []
    (when-let [local-txid (last @graphs-txid)]
      (go
        (let [graph-uuid (second @graphs-txid)
              resp (<! (<get-remote-txid remoteapi graph-uuid))
              remote-txid (:TXId resp)]
          (assert (<= local-txid remote-txid)
                  [@graphs-txid local-txid remote-txid])))))
  (defn- get-local-files-checksum
    "Returns a channel yielding a map of path -> etag for the local files at
  `relative-paths`, as reported by `<get-local-files-meta`."
    [graph-uuid base-path relative-paths]
    (go
      (let [metas (<! (<get-local-files-meta rsapi graph-uuid base-path relative-paths))]
        (reduce (fn [acc ^FileMetadata m]
                  (assoc acc (.-path m) (.-etag m)))
                {}
                metas))))
  ;; Forward declarations of sync-state helpers referenced before their definitions.
  1350. (declare sync-state--add-current-local->remote-files
  1351. sync-state--add-current-remote->local-files
  1352. sync-state--remove-current-local->remote-files
  1353. sync-state--remove-current-remote->local-files
  1354. sync-state--add-recent-remote->local-files
  1355. sync-state--remove-recent-remote->local-files
  1356. sync-state--stopped?)
  (defn- filetxns=>recent-remote->local-files
    "Converts `filetxns` into a set of recent remote->local file items, each a map
  `{:remote->local-type :update|:delete, :checksum ..., :path ...}`.

  - updated txn -> one :update item
  - deleted txn -> one :delete item
  - renamed txn -> an :update item for the destination and a :delete item
    (checksum nil) for the source path
  Txns that are none of the above contribute nothing."
    [filetxns]
    (into #{}
          (mapcat
           (fn [^FileTxn e]
             (cond
               (.-updated? e)
               [{:remote->local-type :update
                 :checksum (-checksum e)
                 :path (relative-path e)}]

               (.-deleted? e)
               [{:remote->local-type :delete
                 :checksum (-checksum e)
                 :path (relative-path e)}]

               (.renamed? e)
               [{:remote->local-type :update
                 :checksum (-checksum e)
                 :path (relative-path e)}
                {:remote->local-type :delete
                 :checksum nil
                 :path (.-from-path e)}]

               :else
               [])))
          filetxns))
  (defn- <apply-remote-deletion
    "Apply remote deletion, if the file is not deleted locally, delete it locally.
  if the file is changed locally, leave the changed part.
  To replace <delete-local-files

  For every path in `relative-paths`:
  - no base version under logseq/version-files/base: delete the local file
  - base == current content: delete the local file and its base version
  - otherwise: three-way-merge against an empty incoming content and write the
    merged result back instead of deleting

  Returns a channel yielding the array of per-file results, or an ExceptionInfo
  when any of the per-file promises rejects."
    [graph-uuid base-path relative-paths]
    (go
      ;; Take from the `p->c` channel here. Previously the go block returned the
      ;; channel itself, so callers received a channel value and could never
      ;; observe an ExceptionInfo.
      (<! (p->c (p/all (->> relative-paths
                            (map (fn [rpath]
                                   (prn ::handle-remote-deletion rpath)
                                   (p/let [base-file (path/path-join "logseq/version-files/base" rpath)
                                           current-change-file rpath
                                           format (gp-util/get-format current-change-file)
                                           repo (state/get-current-repo)
                                           repo-dir (config/get-repo-dir repo)
                                           base-exists? (fs/file-exists? repo-dir base-file)]
                                     (if base-exists?
                                       (p/let [base-content (fs/read-file repo-dir base-file)
                                               ;; an unreadable/missing current file is treated as nil content
                                               current-content (-> (fs/read-file repo-dir current-change-file)
                                                                   (p/catch (fn [_] nil)))]
                                         (if (= base-content current-content)
                                           ;; base-content == current-content, delete current-change-file
                                           ;; NOTE(review): <delete-local-files returns a core.async channel,
                                           ;; which promesa presumably does not await - confirm ordering is fine.
                                           (p/do!
                                            (<delete-local-files rsapi graph-uuid base-path [rpath])
                                            (fs/unlink! repo (path/path-join repo-dir base-file) {}))
                                           ;; base-content != current-content, merge, do not delete
                                           (p/let [merged-content (diff-merge/three-way-merge base-content "" current-content format)]
                                             (fs/write-file! repo repo-dir current-change-file merged-content {:skip-compare? true})
                                             (file-handler/alter-file repo current-change-file merged-content {:re-render-root? true
                                                                                                              :from-disk? true
                                                                                                              :fs/event :fs/remote-file-change}))))
                                       ;; no base-version, use legacy approach, delete it
                                       (<delete-local-files rsapi graph-uuid base-path [rpath])))))))))))
  (defn- <fetch-remote-and-update-local-files
    "Fetches `relative-paths` from the remote with `<fetch-remote-files` and applies
  each fetched file to the local graph.

  For every fetched path (incoming copy under logseq/version-files/incoming):
  - base version exists and base == current: copy incoming over both the current
    file and the base version
  - base version exists and base != current: three-way-merge and, when the
    merged content is non-empty, write it to the current file
  - no base version: merge with an empty base; when the merge equals the
    incoming content copy incoming over current and base, otherwise write the
    merged content

  Returns a channel yielding the array of per-file results, or an ExceptionInfo
  when the fetch fails or any per-file promise rejects."
    [graph-uuid base-path relative-paths]
    (go
      (let [fetched-file-rpaths (<! (<fetch-remote-files rsapi graph-uuid base-path relative-paths))]
        (if (instance? ExceptionInfo fetched-file-rpaths)
          ;; The fetch itself failed: surface the error instead of mapping over it.
          fetched-file-rpaths
          ;; Take from the `p->c` channel here. Previously the go block returned the
          ;; channel itself, so callers received a channel value, not the outcome.
          (<! (p->c (p/all (->> fetched-file-rpaths
                                (map (fn [rpath]
                                       (p/let [incoming-file (path/path-join "logseq/version-files/incoming" rpath)
                                               base-file (path/path-join "logseq/version-files/base" rpath)
                                               current-change-file rpath
                                               format (gp-util/get-format current-change-file)
                                               repo (state/get-current-repo)
                                               repo-dir (config/get-repo-dir repo)
                                               base-exists? (fs/file-exists? repo-dir base-file)
                                               _ (prn ::base-ex base-exists?)]
                                         (cond
                                           base-exists?
                                           (p/let [base-content (fs/read-file repo-dir base-file)
                                                   ;; an unreadable/missing current file is treated as nil content
                                                   current-content (-> (fs/read-file repo-dir current-change-file)
                                                                       (p/catch (fn [_] nil)))]
                                             (if (= base-content current-content)
                                               (do
                                                 (prn "base=current, write directly")
                                                 (p/do!
                                                  (fs/copy! repo
                                                            (path/path-join repo-dir incoming-file)
                                                            (path/path-join repo-dir current-change-file))
                                                  (fs/copy! repo
                                                            (path/path-join repo-dir incoming-file)
                                                            (path/path-join repo-dir base-file))))
                                               (do
                                                 (prn "base!=current, should do a 3-way merge")
                                                 (prn ::cur
                                                      current-content)
                                                 (p/let [current-content (or current-content "")
                                                         incoming-content (fs/read-file repo-dir incoming-file)
                                                         merged-content (diff-merge/three-way-merge base-content incoming-content current-content format)]
                                                   (prn ::merged-content merged-content)
                                                   (when (seq merged-content)
                                                     (p/do!
                                                      (fs/write-file! repo repo-dir current-change-file merged-content {:skip-compare? true})
                                                      (file-handler/alter-file repo current-change-file merged-content {:re-render-root? true
                                                                                                                       :from-disk? true
                                                                                                                       :fs/event :fs/remote-file-change})))
                                                   ;; now, let fs watcher handle the rest uploading
                                                   (comment fs/copy! repo-url
                                                            (path/path-join repo-dir incoming-file)
                                                            (path/path-join repo-dir current-change-file))))))

                                           :else
                                           (do
                                             (prn "no base, use empty content as base, avoid loosing data")
                                             (p/let [current-content (-> (fs/read-file repo-dir current-change-file)
                                                                         (p/catch (fn [_] nil)))
                                                     current-content (or current-content "")
                                                     incoming-content (fs/read-file repo-dir incoming-file)
                                                     ;; NOTE(review): argument order (current, incoming) differs from the
                                                     ;; base-exists branch (incoming, current) - confirm this is intended.
                                                     merged-content (diff-merge/three-way-merge "" current-content incoming-content format)]
                                               (if (= incoming-content merged-content)
                                                 (p/do!
                                                  (fs/copy! repo
                                                            (path/path-join repo-dir incoming-file)
                                                            (path/path-join repo-dir current-change-file))
                                                  (fs/copy! repo
                                                            (path/path-join repo-dir incoming-file)
                                                            (path/path-join repo-dir base-file)))
                                                 ;; else
                                                 (p/do!
                                                  (fs/write-file! repo repo-dir current-change-file merged-content {:skip-compare? true})
                                                  (file-handler/alter-file repo current-change-file merged-content {:re-render-root? true
                                                                                                                   :from-disk? true
                                                                                                                   :fs/event :fs/remote-file-change}))))))))))))))))))
  ;; Applies a batch of remote file transactions to the local graph.
  ;; The batch is dispatched on the kind of its FIRST element; rename and delete
  ;; batches must contain exactly one filetxn (asserted). Returns a go channel.
  1492. (defn- apply-filetxns
  1493. [*sync-state graph-uuid base-path filetxns *paused]
  1494. (go
  1495. (cond
  ;; --- rename: a single local rename from from-path to to-path
  1496. (.renamed? (first filetxns))
  1497. (let [^FileTxn filetxn (first filetxns)
  1498. from-path (.-from-path filetxn)
  1499. to-path (.-to-path filetxn)]
  1500. (assert (= 1 (count filetxns)))
  1501. (<! (<rename-local-file rsapi graph-uuid base-path
  1502. (relative-path from-path)
  1503. (relative-path to-path))))
  ;; --- update
  1504. (.-updated? (first filetxns))
  1505. (let [repo (state/get-current-repo)
  ;; Snapshot the current db content of journals/pages files BEFORE they are
  ;; overwritten, so a version file can be created afterwards if needed.
  1506. txn->db-content-vec (->> filetxns
  1507. (mapv
  1508. #(when (is-journals-or-pages? %)
  1509. [% (db/get-file repo (relative-path %))]))
  1510. (remove nil?))]
  ;; Remove local files whose path differs from an incoming path only by case.
  ;; Each such deletion is registered as a recent remote->local item and
  ;; unregistered again 5 seconds later.
  1511. (doseq [relative-p (map relative-path filetxns)]
  1512. (when-some [relative-p*
  1513. (<! (<case-different-local-file-exist? graph-uuid rsapi base-path relative-p))]
  1514. (let [recent-remote->local-file-item {:remote->local-type :delete
  1515. :checksum nil
  1516. :path relative-p*}]
  1517. (println :debug "found case-different-same-local-file" relative-p relative-p*)
  1518. (swap! *sync-state sync-state--add-recent-remote->local-files
  1519. [recent-remote->local-file-item])
  1520. (<! (<delete-local-files rsapi graph-uuid base-path [relative-p*]))
  1521. (go (<! (timeout 5000))
  1522. (swap! *sync-state sync-state--remove-recent-remote->local-files
  1523. [recent-remote->local-file-item])))))
  ;; Fetch and apply the remote files; `<with-pause` yields {:pause true}
  ;; instead of the result when `*paused` becomes truthy first.
  1524. (let [update-local-files-ch (<fetch-remote-and-update-local-files graph-uuid base-path (map relative-path filetxns))
  1525. r (<! (<with-pause update-local-files-ch *paused))]
  ;; Create version files from the snapshots taken above and announce each one.
  1526. (doseq [[filetxn origin-db-content] txn->db-content-vec]
  1527. (when (<! (need-add-version-file? filetxn origin-db-content))
  1528. (<! (<add-new-version rsapi repo (relative-path filetxn) origin-db-content))
  1529. (put-sync-event! {:event :created-local-version-file
  1530. :data {:graph-uuid graph-uuid
  1531. :repo repo
  1532. :path (relative-path filetxn)
  1533. :epoch (tc/to-epoch (t/now))}})))
  1534. r))
  ;; --- delete
  1535. (.-deleted? (first filetxns))
  1536. (let [filetxn (first filetxns)]
  1537. (assert (= 1 (count filetxns)))
  1538. (if (<! (<local-file-not-exist? graph-uuid rsapi base-path (relative-path filetxn)))
  1539. ;; not exist, ignore
  1540. true
  1541. (let [r (<! (<apply-remote-deletion graph-uuid base-path [(relative-path filetxn)]))]
  ;; A "No such file or directory" failure means the file is already gone: treat as success.
  1542. (if (and (instance? ExceptionInfo r)
  1543. (string/index-of (str (ex-cause r)) "No such file or directory"))
  1544. true
  1545. r)))))))
  1546. (declare sync-state-reset-full-remote->local-files)
  1547. (defn apply-filetxns-partitions
  1548. "won't call <update-graphs-txid! when *txid is nil"
  1549. [*sync-state user-uuid graph-uuid base-path filetxns-partitions repo *txid *stopped *paused full-sync?]
  1550. (assert (some? *sync-state))
  1551. (go-loop [filetxns-partitions* filetxns-partitions]
  1552. (cond
  1553. @*stopped {:stop true}
  1554. @*paused {:pause true}
  1555. :else
  1556. (when (seq filetxns-partitions*)
  1557. (let [filetxns (first filetxns-partitions*)
  1558. paths (map relative-path filetxns)
  1559. recent-remote->local-file-items (filetxns=>recent-remote->local-files filetxns)
  1560. _ (when-not full-sync?
  1561. (swap! *sync-state #(sync-state-reset-full-remote->local-files % recent-remote->local-file-items)))
  1562. ;; update recent-remote->local-files
  1563. _ (swap! *sync-state sync-state--add-recent-remote->local-files
  1564. recent-remote->local-file-items)
  1565. _ (swap! *sync-state sync-state--add-current-remote->local-files paths)
  1566. r (<! (apply-filetxns *sync-state graph-uuid base-path filetxns *paused))
  1567. _ (swap! *sync-state sync-state--remove-current-remote->local-files paths
  1568. (not (instance? ExceptionInfo r)))]
  1569. ;; remove these recent-remote->local-file-items 5s later
  1570. (go (<! (timeout 5000))
  1571. (swap! *sync-state sync-state--remove-recent-remote->local-files
  1572. recent-remote->local-file-items))
  1573. (cond
  1574. (instance? ExceptionInfo r) r
  1575. @*paused {:pause true}
  1576. :else
  1577. (let [latest-txid (apply max (and *txid @*txid) (map #(.-txid ^FileTxn %) filetxns))]
  1578. ;; update local-txid
  1579. (when (and *txid (number? latest-txid))
  1580. (reset! *txid latest-txid)
  1581. (<! (<update-graphs-txid! latest-txid graph-uuid user-uuid repo)))
  1582. (recur (next filetxns-partitions*)))))))))
  1583. (defmulti need-sync-remote? (fn [v] (cond
  1584. (= :max v)
  1585. :max
  1586. (and (vector? v) (number? (first v)))
  1587. :txid
  1588. (instance? ExceptionInfo v)
  1589. :exceptional-response
  1590. (instance? cljs.core.async.impl.channels/ManyToManyChannel v)
  1591. :chan)))
  1592. (defmethod need-sync-remote? :max [_] true)
  1593. (defmethod need-sync-remote? :txid [[txid remote->local-syncer]]
  1594. (let [remote-txid txid
  1595. local-txid (.-txid remote->local-syncer)]
  1596. (or (nil? local-txid)
  1597. (> remote-txid local-txid))))
  1598. (defmethod need-sync-remote? :exceptional-response [resp]
  1599. (let [data (ex-data resp)
  1600. cause (ex-cause resp)]
  1601. (or
  1602. (and (= (:error data) :promise-error)
  1603. (when-let [r (re-find #"(\d+), txid_to_validate = (\d+)" (str cause))]
  1604. (> (nth r 1) (nth r 2))))
  1605. (= 409 (get-in data [:err :status])))))
  1606. (defmethod need-sync-remote? :chan [c]
  1607. (go (need-sync-remote? (<! c))))
  1608. (defmethod need-sync-remote? :default [_] false)
  1609. (defn- need-reset-local-txid?
  1610. [r]
  1611. (when-let [cause (ex-cause r)]
  1612. (when-let [r (re-find #"(\d+), txid_to_validate = (\d+)" (str cause))]
  1613. (< (nth r 1) (nth r 2)))))
  1614. (defn- graph-has-been-deleted?
  1615. [r]
  1616. (some->> (ex-cause r) str (re-find #"graph-not-exist")))
  1617. (defn- stop-sync-by-rsapi-response?
  1618. [r]
  1619. (some->> (ex-cause r) str (re-find #"Request is not yet valid")))
  1620. ;; type = "change" | "add" | "unlink"
  1621. (deftype FileChangeEvent [type dir path stat checksum]
  1622. IRelativePath
  1623. (-relative-path [_] (remove-dir-prefix dir path))
  1624. IEquiv
  1625. (-equiv [_ ^FileChangeEvent other]
  1626. (and (= dir (.-dir other))
  1627. (= type (.-type other))
  1628. (= path (.-path other))
  1629. (= checksum (.-checksum other))))
  1630. IHash
  1631. (-hash [_]
  1632. (hash {:dir dir
  1633. :type type
  1634. :path path
  1635. :checksum checksum}))
  1636. ILookup
  1637. (-lookup [o k] (-lookup o k nil))
  1638. (-lookup [_ k not-found]
  1639. (case k
  1640. :type type
  1641. :dir dir
  1642. :path path
  1643. :stat stat
  1644. :checksum checksum
  1645. not-found))
  1646. IPrintWithWriter
  1647. (-pr-writer [_ w _opts]
  1648. (write-all w (str {:type type :base-path dir :path path :size (:size stat) :checksum checksum}))))
  1649. (defn- <file-change-event=>recent-remote->local-file-item
  1650. "return nil when related local files not found"
  1651. [graph-uuid ^FileChangeEvent e]
  1652. (go
  1653. (let [tp (case (.-type e)
  1654. ("add" "change") :update
  1655. "unlink" :delete)
  1656. path (relative-path e)]
  1657. (when-let [path-etag-entry (first (<! (get-local-files-checksum graph-uuid (.-dir e) [path])))]
  1658. {:remote->local-type tp
  1659. :checksum (if (= tp :delete) nil
  1660. (val path-etag-entry))
  1661. :path path}))))
  1662. (defn- distinct-file-change-events-xf
  1663. "transducer.
  1664. distinct `FileChangeEvent`s by their path, keep the first one."
  1665. [rf]
  1666. (let [seen (volatile! #{})]
  1667. (fn
  1668. ([] (rf))
  1669. ([result] (rf result))
  1670. ([result ^FileChangeEvent e]
  1671. (if (contains? @seen (.-path e))
  1672. result
  1673. (do (vswap! seen conj (.-path e))
  1674. (rf result e)))))))
  1675. (defn- distinct-file-change-events
  1676. "distinct `FileChangeEvent`s by their path, keep the last one."
  1677. [es]
  1678. (transduce distinct-file-change-events-xf conj '() (reverse es)))
  1679. (defn- partition-file-change-events
  1680. "return transducer.
  1681. partition `FileChangeEvent`s, at most N file-change-events in each partition.
  1682. only one type in a partition."
  1683. [n]
  1684. (comp
  1685. (partition-by (fn [^FileChangeEvent e]
  1686. (case (.-type e)
  1687. ("add" "change") :add-or-change
  1688. "unlink" :unlink)))
  1689. (map #(partition-all n %))
  1690. cat))
  1691. (declare sync-state--valid-to-accept-filewatcher-event?)
  1692. (defonce local-changes-chan (chan (async/dropping-buffer 1000)))
  1693. (defn file-watch-handler
  1694. "file-watcher callback"
  1695. [type {:keys [dir path _content stat] :as _payload}]
  1696. (when-let [current-graph (state/get-current-repo)]
  1697. (when (string/ends-with? current-graph dir)
  1698. (when-let [sync-state (state/get-file-sync-state (state/get-current-file-sync-graph-uuid))]
  1699. (when (sync-state--valid-to-accept-filewatcher-event? sync-state)
  1700. (when (or (:mtime stat) (= type "unlink"))
  1701. (go
  1702. (let [path (path-normalize path)
  1703. files-meta (and (not= "unlink" type)
  1704. (<! (<get-local-files-meta
  1705. rsapi (:current-syncing-graph-uuid sync-state) dir [path])))
  1706. checksum (and (coll? files-meta) (some-> files-meta first :etag))]
  1707. (>! local-changes-chan (->FileChangeEvent type dir path stat checksum))))))))))
  1708. (defn local-changes-revised-chan-builder
  1709. "return chan"
  1710. [local-changes-chan rename-page-event-chan]
  1711. (let [*rename-events (atom #{})
  1712. ch (chan 1000)]
  1713. (go-loop []
  1714. (let [{:keys [rename-event local-change]}
  1715. (async/alt!
  1716. rename-page-event-chan ([v] {:rename-event v}) ;; {:repo X :old-path X :new-path}
  1717. local-changes-chan ([v] {:local-change v}))]
  1718. (cond
  1719. rename-event
  1720. (let [repo-dir (config/get-repo-dir (:repo rename-event))
  1721. remove-dir-prefix-fn #(remove-dir-prefix repo-dir %)
  1722. rename-event* (-> rename-event
  1723. (update :old-path remove-dir-prefix-fn)
  1724. (update :new-path remove-dir-prefix-fn))
  1725. k1 [:old-path (:old-path rename-event*) repo-dir]
  1726. k2 [:new-path (:new-path rename-event*) repo-dir]]
  1727. (swap! *rename-events conj k1 k2)
  1728. ;; remove rename-events after 2s
  1729. (go (<! (timeout 3000))
  1730. (swap! *rename-events disj k1 k2))
  1731. ;; add 2 simulated file-watcher events
  1732. (>! ch (->FileChangeEvent "unlink" repo-dir (:old-path rename-event*) nil nil))
  1733. (>! ch (->FileChangeEvent "add" repo-dir (:new-path rename-event*)
  1734. {:mtime (tc/to-long (t/now))
  1735. :size 1 ; add a fake size
  1736. } "fake-checksum"))
  1737. (recur))
  1738. local-change
  1739. (cond
  1740. (and (= "change" (.-type local-change))
  1741. (or (contains? @*rename-events [:old-path (.-path local-change) (.-dir local-change)])
  1742. (contains? @*rename-events [:new-path (.-path local-change) (.-dir local-change)])))
  1743. (do (println :debug "ignore" local-change)
  1744. ;; ignore
  1745. (recur))
  1746. (and (= "add" (.-type local-change))
  1747. (contains? @*rename-events [:new-path (.-path local-change) (.-dir local-change)]))
  1748. ;; ignore
  1749. (do (println :debug "ignore" local-change)
  1750. (recur))
  1751. (and (= "unlink" (.-type local-change))
  1752. (contains? @*rename-events [:old-path (.-path local-change) (.-dir local-change)]))
  1753. (do (println :debug "ignore" local-change)
  1754. (recur))
  1755. :else
  1756. (do (>! ch local-change)
  1757. (recur))))))
  1758. ch))
;; Channel built once from `local-changes-chan` and the file-rename event chan
;; by `local-changes-revised-chan-builder`.
(defonce local-changes-revised-chan
  (local-changes-revised-chan-builder local-changes-chan (state/get-file-rename-event-chan)))

;;; ### encryption

(def pwd-map
  "graph-uuid->{:pwd xxx :public-key xxx :private-key xxx}"
  (atom {}))

;; graph-uuid -> channel, filled lazily by `get-graph-pwd-changed-chan`.
(defonce *pwd-map-changed-chan
  (atom {}))
  1767. (defn- get-graph-pwd-changed-chan
  1768. [graph-uuid]
  1769. (if-let [result (get @*pwd-map-changed-chan graph-uuid)]
  1770. result
  1771. (let [c (chan (async/sliding-buffer 1))]
  1772. (swap! *pwd-map-changed-chan assoc graph-uuid c)
  1773. c)))
  1774. (defn- <encrypt-content
  1775. [content key*]
  1776. (p->c (encrypt/encrypt-with-passphrase key* content)))
  1777. (defn- decrypt-content
  1778. [encrypted-content key*]
  1779. (go
  1780. (let [r (<! (p->c (encrypt/decrypt-with-passphrase key* encrypted-content)))]
  1781. (when-not (instance? ExceptionInfo r) r))))
  1782. (defn- local-storage-pwd-path
  1783. [graph-uuid]
  1784. (str "encrypted-pwd/" graph-uuid))
  1785. (defn- persist-pwd!
  1786. [pwd graph-uuid]
  1787. {:pre [(string? pwd)]}
  1788. (js/localStorage.setItem (local-storage-pwd-path graph-uuid) pwd))
  1789. (defn- remove-pwd!
  1790. [graph-uuid]
  1791. (js/localStorage.removeItem (local-storage-pwd-path graph-uuid)))
  1792. (defn get-pwd
  1793. [graph-uuid]
  1794. (js/localStorage.getItem (local-storage-pwd-path graph-uuid)))
  1795. (defn remove-all-pwd!
  1796. []
  1797. (doseq [k (filter #(string/starts-with? % "encrypted-pwd/") (js->clj (js-keys js/localStorage)))]
  1798. (js/localStorage.removeItem k))
  1799. (reset! pwd-map {}))
  1800. (defn encrypt+persist-pwd!
  1801. "- persist encrypted pwd at local-storage"
  1802. [pwd graph-uuid]
  1803. (go
  1804. (let [[value expired-at gone?]
  1805. ((juxt :value :expired-at #(-> % ex-data :err :status (= 410)))
  1806. (<! (<get-graph-salt-memoize remoteapi graph-uuid)))
  1807. [salt-value _expired-at]
  1808. (if gone?
  1809. (let [r (<! (<create-graph-salt remoteapi graph-uuid))]
  1810. (update-graph-salt-cache graph-uuid r)
  1811. ((juxt :value :expired-at) r))
  1812. [value expired-at])
  1813. encrypted-pwd (<! (<encrypt-content pwd salt-value))]
  1814. (persist-pwd! encrypted-pwd graph-uuid))))
  1815. (defn restore-pwd!
  1816. "restore pwd from persisted encrypted-pwd, update `pwd-map`"
  1817. [graph-uuid]
  1818. (go
  1819. (let [encrypted-pwd (get-pwd graph-uuid)]
  1820. (if (nil? encrypted-pwd)
  1821. {:restore-pwd-failed true}
  1822. (let [[salt-value _expired-at gone?]
  1823. ((juxt :value :expired-at #(-> % ex-data :err :status (= 410)))
  1824. (<! (<get-graph-salt-memoize remoteapi graph-uuid)))]
  1825. (if (or gone? (empty? salt-value))
  1826. {:restore-pwd-failed "expired salt"}
  1827. (let [pwd (<! (decrypt-content encrypted-pwd salt-value))]
  1828. (if (nil? pwd)
  1829. {:restore-pwd-failed (str "decrypt-pwd failed, salt: " salt-value)}
  1830. (swap! pwd-map assoc-in [graph-uuid :pwd] pwd)))))))))
  1831. (defn- set-keys&notify
  1832. [graph-uuid public-key private-key]
  1833. (swap! pwd-map assoc-in [graph-uuid :public-key] public-key)
  1834. (swap! pwd-map assoc-in [graph-uuid :private-key] private-key)
  1835. (offer! (get-graph-pwd-changed-chan graph-uuid) true))
  1836. (defn- <set-graph-encryption-keys!
  1837. [graph-uuid pwd public-key encrypted-private-key]
  1838. (go
  1839. (let [private-key (when (and pwd encrypted-private-key)
  1840. (<! (decrypt-content encrypted-private-key pwd)))]
  1841. (when (and private-key (string/starts-with? private-key "AGE-SECRET-KEY"))
  1842. (set-keys&notify graph-uuid public-key private-key)))))
;; Messages of the form {:graph-uuid .. :value true} are offered here after the
;; user entered a password (see `<ensure-pwd-exists!`); only the latest is kept.
(def <restored-pwd (chan (async/sliding-buffer 1)))
;; Publication of `<restored-pwd`, with :graph-uuid as the topic.
(def <restored-pwd-pub (async/pub <restored-pwd :graph-uuid))
  1845. (defn- <ensure-pwd-exists!
  1846. "return password or nil when restore pwd from localstorage failed"
  1847. [repo graph-uuid init-graph-keys]
  1848. (go
  1849. (let [{:keys [restore-pwd-failed]} (<! (restore-pwd! graph-uuid))
  1850. pwd (get-in @pwd-map [graph-uuid :pwd])]
  1851. (if restore-pwd-failed
  1852. (do (state/pub-event! [:modal/remote-encryption-input-pw-dialog repo
  1853. (state/get-remote-graph-info-by-uuid graph-uuid)
  1854. :input-pwd-remote
  1855. {:GraphUUID graph-uuid
  1856. :init-graph-keys init-graph-keys
  1857. :after-input-password (fn [pwd]
  1858. (when pwd
  1859. (swap! pwd-map assoc-in [graph-uuid :pwd] pwd)
  1860. (offer! <restored-pwd {:graph-uuid graph-uuid :value true})))}])
  1861. nil)
  1862. pwd))))
  1863. (defn clear-pwd!
  1864. "- clear pwd in `pwd-map`
  1865. - remove encrypted-pwd in local-storage"
  1866. [graph-uuid]
  1867. (swap! pwd-map dissoc graph-uuid)
  1868. (remove-pwd! graph-uuid))
  1869. (defn- <loop-ensure-pwd&keys
  1870. [graph-uuid repo *stopped?]
  1871. (let [<restored-pwd-sub-chan (chan 1)]
  1872. (async/sub <restored-pwd-pub graph-uuid <restored-pwd-sub-chan)
  1873. (go-loop []
  1874. (if @*stopped?
  1875. ::stop
  1876. (let [{:keys [public-key encrypted-private-key] :as r}
  1877. (<! (<get-graph-encrypt-keys-memoize remoteapi graph-uuid))
  1878. init-graph-keys (some-> (ex-data r) :err :status (= 404))
  1879. pwd (<! (<ensure-pwd-exists! repo graph-uuid init-graph-keys))]
  1880. (cond
  1881. (not pwd)
  1882. (do (println :debug "waiting password...")
  1883. (<! <restored-pwd-sub-chan) ;loop to wait password
  1884. (println :debug "waiting password...DONE" graph-uuid)
  1885. (recur))
  1886. init-graph-keys
  1887. ;; when public+private keys not stored at server
  1888. ;; generate a new key pair and upload them
  1889. (let [next-state
  1890. (let [{public-key :publicKey private-key :secretKey}
  1891. (<! (<key-gen rsapi))
  1892. _ (assert (and public-key private-key) (str :public-key public-key :private-key private-key))
  1893. encrypted-private-key (<! (<encrypt-content private-key pwd))
  1894. _ (assert (string? encrypted-private-key)
  1895. {:encrypted-private-key encrypted-private-key
  1896. :private-key private-key
  1897. :pwd pwd})
  1898. upload-r (<! (<upload-graph-encrypt-keys remoteapi graph-uuid public-key encrypted-private-key))]
  1899. (if (instance? ExceptionInfo upload-r)
  1900. (do (js/console.log "upload-graph-encrypt-keys err" upload-r)
  1901. ::stop)
  1902. (do (update-graph-encrypt-keys-cache graph-uuid {:public-key public-key
  1903. :encrypted-private-key encrypted-private-key})
  1904. :recur)))]
  1905. (if (= :recur next-state)
  1906. (recur)
  1907. next-state))
  1908. :else
  1909. ;; pwd, public-key, encrypted-private-key all exist
  1910. (do (assert (and pwd public-key encrypted-private-key) {:encrypted-private-key encrypted-private-key
  1911. :public-key public-key
  1912. :pwd pwd})
  1913. (<! (<set-graph-encryption-keys! graph-uuid pwd public-key encrypted-private-key))
  1914. (if (get-in @pwd-map [graph-uuid :private-key])
  1915. (do (when (state/modal-opened?)
  1916. (state/set-state! [:ui/loading? :set-graph-password] false)
  1917. (state/close-modal!))
  1918. ::idle)
  1919. ;; bad pwd
  1920. (do (when (state/modal-opened?)
  1921. (when (state/sub [:ui/loading? :set-graph-password])
  1922. (state/set-state! [:file-sync/set-remote-graph-password-result]
  1923. {:fail "Incorrect password. Please try again"}))
  1924. (state/set-state! [:ui/loading? :set-graph-password] false))
  1925. (clear-pwd! graph-uuid)
  1926. (recur))))))))))
  1927. (defn- <set-env&keys
  1928. [prod? graph-uuid]
  1929. (let [{:keys [private-key public-key]} (get @pwd-map graph-uuid)]
  1930. (assert (and private-key public-key) (pr-str :private-key private-key :public-key public-key
  1931. :pwd-map @pwd-map))
  1932. (<set-env rsapi graph-uuid prod? private-key public-key)))
  1933. (defn- <ensure-set-env&keys
  1934. [graph-uuid *stopped?]
  1935. (go-loop []
  1936. (let [{:keys [change timeout]}
  1937. (async/alt! (get-graph-pwd-changed-chan graph-uuid) {:change true}
  1938. (timeout 10000) {:timeout true})]
  1939. (cond
  1940. @*stopped? nil
  1941. change (<! (<set-env&keys config/FILE-SYNC-PROD? graph-uuid))
  1942. timeout (recur)))))
;;; ### chans to control sync process
;; Every control channel below is paired with a `mult` of itself.

(def full-sync-chan
  "offer `true` to this chan will trigger a local->remote full sync"
  (chan 1))
(def full-sync-mult (async/mult full-sync-chan))

(def remote->local-sync-chan
  "offer `true` to this chan will trigger a remote->local sync"
  (chan 1))
(def remote->local-sync-mult (async/mult remote->local-sync-chan))

(def remote->local-full-sync-chan
  "offer `true` to this chan will trigger a remote->local full sync"
  (chan 1))
(def remote->local-full-sync-mult (async/mult remote->local-full-sync-chan))

;; NOTE: unlike the other control chans this one is unbuffered.
(def immediately-local->remote-chan
  "Immediately trigger upload of files in waiting queue"
  (chan))
(def immediately-local->remote-mult (async/mult immediately-local->remote-chan))

(def pause-resume-chan
  "false -> pause, true -> resume.
  see also `*resume-state`"
  (chan 1))
(def pause-resume-mult (async/mult pause-resume-chan))

(def recent-edited-chan
  "Triggered when there is content editing"
  (chan 1))
(def recent-edited-mult (async/mult recent-edited-chan))

;; Cursor on :editor/last-input-time of the app state; every change offers
;; `true` to `recent-edited-chan` (the offer is dropped when the chan is full).
(def last-input-time-cursor (rum/cursor state/state :editor/last-input-time))
(add-watch last-input-time-cursor "sync"
           (fn [_ _ _ _]
             (offer! recent-edited-chan true)))
  1973. ;;; ### sync state
  1974. (def *resume-state
  1975. "key: graph-uuid"
  1976. (atom {}))
  1977. (defn resume-state--add-remote->local-state
  1978. [graph-uuid]
  1979. (swap! *resume-state assoc graph-uuid {:remote->local true}))
  1980. (defn resume-state--add-remote->local-full-sync-state
  1981. [graph-uuid]
  1982. (swap! *resume-state assoc graph-uuid {:remote->local-full-sync true}))
  1983. (defn resume-state--add-local->remote-state
  1984. [graph-uuid local-changes]
  1985. (swap! *resume-state assoc graph-uuid {:local->remote local-changes}))
  1986. ;; (defn resume-state--add-local->remote-full-sync-state
  1987. ;; [graph-uuid]
  1988. ;; (swap! *resume-state assoc graph-uuid {:local->remote-full-sync true}))
  1989. (defn resume-state--reset
  1990. [graph-uuid]
  1991. (swap! *resume-state dissoc graph-uuid))
(defn sync-state
  "create a new sync-state"
  []
  ;; the returned map must satisfy the ::sync-state spec
  {:post [(s/valid? ::sync-state %)]}
  {:current-syncing-graph-uuid nil
   :state ::starting
   ;; file collections start out empty and are maintained by the
   ;; `sync-state--*` / `sync-state-reset-*` functions
   :full-local->remote-files #{}
   :current-local->remote-files #{}
   :full-remote->local-files #{}
   :current-remote->local-files #{}
   :queued-local->remote-files #{}
   :recent-remote->local-files #{}
   ;; {:path .. :time ..} items, see `add-history-items`
   :history '()})
  2005. (defn- sync-state--update-current-syncing-graph-uuid
  2006. [sync-state graph-uuid]
  2007. {:pre [(s/valid? ::sync-state sync-state)]
  2008. :post [(s/valid? ::sync-state %)]}
  2009. (assoc sync-state :current-syncing-graph-uuid graph-uuid))
  2010. (defn- sync-state--update-state
  2011. [sync-state next-state]
  2012. {:pre [(s/valid? ::state next-state)]
  2013. :post [(s/valid? ::sync-state %)]}
  2014. (assoc sync-state :state next-state))
  2015. (defn sync-state--add-current-remote->local-files
  2016. [sync-state paths]
  2017. {:post [(s/valid? ::sync-state %)]}
  2018. (update sync-state :current-remote->local-files into paths))
  2019. (defn sync-state--add-current-local->remote-files
  2020. [sync-state paths]
  2021. {:post [(s/valid? ::sync-state %)]}
  2022. (update sync-state :current-local->remote-files into paths))
  2023. (defn sync-state--add-queued-local->remote-files
  2024. [sync-state event]
  2025. {:post [(s/valid? ::sync-state %)]}
  2026. (update sync-state :queued-local->remote-files
  2027. (fn [o event]
  2028. (->> (concat o [event])
  2029. (util/distinct-by-last-wins (fn [e] (.-path e))))) event))
  2030. (defn sync-state--remove-queued-local->remote-files
  2031. [sync-state event]
  2032. {:post [(s/valid? ::sync-state %)]}
  2033. (update sync-state :queued-local->remote-files
  2034. (fn [o event]
  2035. (remove #{event} o)) event))
  2036. (defn sync-state-reset-queued-local->remote-files
  2037. [sync-state]
  2038. {:post [(s/valid? ::sync-state %)]}
  2039. (assoc sync-state :queued-local->remote-files nil))
  2040. (defn sync-state--add-recent-remote->local-files
  2041. [sync-state items]
  2042. {:pre [(s/valid? (s/coll-of ::recent-remote->local-file-item) items)]
  2043. :post [(s/valid? ::sync-state %)]}
  2044. (update sync-state :recent-remote->local-files (partial apply conj) items))
  2045. (defn sync-state--remove-recent-remote->local-files
  2046. [sync-state items]
  2047. {:post [(s/valid? ::sync-state %)]}
  2048. (update sync-state :recent-remote->local-files set/difference items))
  2049. (defn sync-state-reset-full-local->remote-files
  2050. [sync-state events]
  2051. {:post [(s/valid? ::sync-state %)]}
  2052. (assoc sync-state :full-local->remote-files events))
  2053. (defn sync-state-reset-full-remote->local-files
  2054. [sync-state events]
  2055. {:post [(s/valid? ::sync-state %)]}
  2056. (assoc sync-state :full-remote->local-files events))
  2057. (defn- add-history-items
  2058. [history paths now]
  2059. (sequence
  2060. (comp
  2061. ;; only reserve the latest one of same-path-items
  2062. (dedupe-by :path)
  2063. ;; reserve the latest 20 history items
  2064. (take 20))
  2065. (into (filter (fn [o]
  2066. (not (contains? (set paths) (:path o)))) history)
  2067. (map (fn [path] {:path path :time now}) paths))))
  2068. (defn sync-state--remove-current-remote->local-files
  2069. [sync-state paths add-history?]
  2070. {:post [(s/valid? ::sync-state %)]}
  2071. (let [now (t/now)]
  2072. (cond-> sync-state
  2073. true (update :current-remote->local-files set/difference paths)
  2074. add-history? (update :history add-history-items paths now))))
  2075. (defn sync-state--remove-current-local->remote-files
  2076. [sync-state paths add-history?]
  2077. {:post [(s/valid? ::sync-state %)]}
  2078. (let [now (t/now)]
  2079. (cond-> sync-state
  2080. true (update :current-local->remote-files set/difference paths)
  2081. add-history? (update :history add-history-items paths now))))
  2082. (defn sync-state--stopped?
  2083. "Graph syncing is stopped"
  2084. [sync-state]
  2085. {:pre [(s/valid? ::sync-state sync-state)]}
  2086. (= ::stop (:state sync-state)))
  2087. (defn sync-state--valid-to-accept-filewatcher-event?
  2088. [sync-state]
  2089. {:pre [(s/valid? ::sync-state sync-state)]}
  2090. (contains? #{::idle ::local->remote ::remote->local ::local->remote-full-sync ::remote->local-full-sync}
  2091. (:state sync-state)))
;;; ### remote->local syncer & local->remote syncer

;; Operations of a remote->local syncer; `Remote->LocalSyncer` implements them.
(defprotocol IRemote->LocalSync
  (stop-remote->local! [this])
  (<sync-remote->local! [this] "return ExceptionInfo when error occurs")
  (<sync-remote->local-all-files! [this] "sync all files, return ExceptionInfo when error occurs"))
;; Operations of a local->remote syncer (local file changes are sent to the remote).
(defprotocol ILocal->RemoteSync
  (setup-local->remote! [this])
  (stop-local->remote! [this])
  (<ratelimit [this from-chan] "get watched local file-change events from FROM-CHAN,
  return chan returning events with rate limited")
  (<sync-local->remote! [this es] "es is a sequence of `FileChangeEvent`, all items have same type.")
  (<sync-local->remote-all-files! [this] "compare all local files to remote ones, sync when not equal.
  if local-txid != remote-txid, return {:need-sync-remote true}"))
;; Syncer that applies remote changes to the local graph.
;; `*txid` holds the local transaction id; `*stopped` and `*paused` are
;; dereferenced to abort or suspend a running sync.
(defrecord ^:large-vars/cleanup-todo
  Remote->LocalSyncer [user-uuid graph-uuid base-path repo *txid *txid-for-get-deletion-log *sync-state remoteapi
                       ^:mutable local->remote-syncer *stopped *paused]
  Object
  (set-local->remote-syncer! [_ s] (set! local->remote-syncer s))
  ;; Download the files in `relative-filepath+checksum-coll`, then store
  ;; `latest-txid` as the local txid.
  ;; Returns a channel yielding {:succ true}, {:stop true}, {:pause true}
  ;; or {:unknown <ExceptionInfo>}.
  (sync-files-remote->local!
    [_ relative-filepath+checksum-coll latest-txid]
    (go
      (let [partitioned-filetxns
            (sequence (filepath+checksum-coll->partitioned-filetxns
                       download-batch-size graph-uuid user-uuid)
                      relative-filepath+checksum-coll)
            r
            (if (empty? (flatten partitioned-filetxns))
              {:succ true}
              (do
                (put-sync-event! {:event :start
                                  :data {:type :full-remote->local
                                         :graph-uuid graph-uuid
                                         :full-sync? true
                                         :epoch (tc/to-epoch (t/now))}})
                ;; *txid is passed as nil: the txid is not updated per
                ;; partition, only once at the end (below)
                (<! (apply-filetxns-partitions
                     *sync-state user-uuid graph-uuid base-path partitioned-filetxns repo
                     nil *stopped *paused true))))]
        (cond
          (instance? ExceptionInfo r) {:unknown r}
          @*stopped {:stop true}
          @*paused {:pause true}
          :else
          (do
            (swap! *sync-state #(sync-state-reset-full-remote->local-files % []))
            (<! (<update-graphs-txid! latest-txid graph-uuid user-uuid repo))
            (reset! *txid latest-txid)
            {:succ true})))))

  IRemote->LocalSync
  (stop-remote->local! [_] (vreset! *stopped true))
  ;; Incremental sync: fetch the remote diff since @*txid and apply it.
  ;; Returns a channel yielding {:succ true}, {:stop true}, {:pause true},
  ;; {:need-remote->local-full-sync true} or {:unknown <ExceptionInfo>}.
  (<sync-remote->local! [_]
    (go
      (let [r
            (let [diff-r (<! (<get-diff remoteapi graph-uuid @*txid))]
              (if (instance? ExceptionInfo diff-r)
                diff-r
                (let [[diff-txns latest-txid min-txid] diff-r]
                  (if (> (dec min-txid) @*txid) ;; min-txid-1 > @*txid, need to remote->local-full-sync
                    (do (println "min-txid" min-txid "request-txid" @*txid)
                        {:need-remote->local-full-sync true})

                    (when (pos-int? latest-txid)
                      (let [filtered-diff-txns (-> (transduce (diffs->filetxns) conj '() (reverse diff-txns))
                                                   filter-download-files-with-reserved-chars)
                            partitioned-filetxns (transduce (partition-filetxns download-batch-size)
                                                            (completing (fn [r i] (conj r (reverse i)))) ;reverse
                                                            '()
                                                            filtered-diff-txns)]
                        (put-sync-event! {:event :start
                                          :data {:type :remote->local
                                                 :graph-uuid graph-uuid
                                                 :full-sync? false
                                                 :epoch (tc/to-epoch (t/now))}})
                        (if (empty? (flatten partitioned-filetxns))
                          ;; nothing to download: only move the local txid forward
                          (do
                            (swap! *sync-state #(sync-state-reset-full-remote->local-files % []))
                            (<! (<update-graphs-txid! latest-txid graph-uuid user-uuid repo))
                            (reset! *txid latest-txid)
                            {:succ true})
                          (<! (apply-filetxns-partitions
                               *sync-state user-uuid graph-uuid base-path
                               partitioned-filetxns repo *txid *stopped *paused false)))))))))]
        (cond
          (instance? ExceptionInfo r) {:unknown r}
          @*stopped {:stop true}
          @*paused {:pause true}
          (:need-remote->local-full-sync r) r
          :else {:succ true}))))

  ;; Full sync: diff the metadata of all remote files against all local files
  ;; and download the differing ones via `sync-files-remote->local!`.
  (<sync-remote->local-all-files! [this]
    (go
      (let [remote-all-files-meta-c (<get-remote-all-files-meta remoteapi graph-uuid)
            local-all-files-meta-c (<get-local-all-files-meta rsapi graph-uuid base-path)
            remote-all-files-meta-or-exp (<! remote-all-files-meta-c)]
        (if (or (storage-exceed-limit? remote-all-files-meta-or-exp)
                (sync-stop-when-api-flying? remote-all-files-meta-or-exp)
                (decrypt-exp? remote-all-files-meta-or-exp))
          ;; NOTE(review): all three conditions are reported with the
          ;; :exception-decrypt-failed event; confirm this is intended
          (do (put-sync-event! {:event :exception-decrypt-failed
                                :data {:graph-uuid graph-uuid
                                       :exp remote-all-files-meta-or-exp
                                       :epoch (tc/to-epoch (t/now))}})
              {:stop true})
          (let [remote-all-files-meta remote-all-files-meta-or-exp
                local-all-files-meta (<! local-all-files-meta-c)
                {diff-remote-files :result elapsed-time :time}
                (util/with-time (diff-file-metadata-sets remote-all-files-meta local-all-files-meta))
                _ (println ::diff-file-metadata-sets-elapsed-time elapsed-time "ms")
                ;; [ten days before today, today] as long timestamps
                recent-10-days-range ((juxt #(tc/to-long (t/minus % (t/days 10))) #(tc/to-long %)) (t/today))
                sorted-diff-remote-files
                (sort-by
                 (sort-file-metadata-fn :recent-days-range recent-10-days-range) > diff-remote-files)
                remote-txid-or-ex (<! (<get-remote-txid remoteapi graph-uuid))
                latest-txid (:TXId remote-txid-or-ex)]
            (if (or (instance? ExceptionInfo remote-txid-or-ex) (nil? latest-txid))
              (do (put-sync-event! {:event :get-remote-graph-failed
                                    :data {:graph-uuid graph-uuid
                                           :exp remote-txid-or-ex
                                           :epoch (tc/to-epoch (t/now))}})
                  {:stop true})
              (do (println "[full-sync(remote->local)]" (count sorted-diff-remote-files) "files need to sync")
                  (let [filtered-files (filter-download-files-with-reserved-chars sorted-diff-remote-files)]
                    ;; the sync state records the unfiltered list, while only
                    ;; the filtered files are downloaded
                    (swap! *sync-state #(sync-state-reset-full-remote->local-files % sorted-diff-remote-files))
                    (<! (.sync-files-remote->local!
                         this (map (juxt relative-path -checksum)
                                   filtered-files)
                         latest-txid)))))))))))
  (defn- <file-changed?
    "Returns a channel yielding true when the local metadata of
    `file-path-without-base-path` differs from the remote metadata.
    Yields false when the remote lookup produced an ExceptionInfo."
    [graph-uuid file-path-without-base-path base-path]
    {:pre [(string? file-path-without-base-path)]}
    (go
      (let [paths [file-path-without-base-path]
            remote-result (<! (<get-remote-files-meta remoteapi graph-uuid paths))
            local-result (<! (<get-local-files-meta rsapi graph-uuid base-path paths))
            local-meta (first local-result)]
        ;; a failed remote lookup is reported as "not changed"
        (and (not (instance? ExceptionInfo remote-result))
             (not= (first remote-result) local-meta)))))
  (defn- <filter-local-changes-pred
    "Predicate (as a channel) deciding whether a local-change event is kept:
    - 'unlink': kept only when the file is really absent from the local dir
    - 'add' | 'change': kept only when the file exists locally and its
      metadata differs from the remote file"
    [^FileChangeEvent e basepath graph-uuid]
    (go
      (let [r-path (relative-path e)
            <missing-locally? #(<local-file-not-exist? graph-uuid rsapi basepath r-path)]
        (case (.-type e)
          "unlink"
          ;; keep the event when the file is not found
          (<! (<missing-locally?))

          ("add" "change")
          ;; the file must exist locally AND differ from the remote one
          (if (<! (<missing-locally?))
            false
            (<! (<file-changed? graph-uuid r-path basepath)))))))
  (defn- <filter-checksum-not-consistent
    "Drops FileChangeEvents whose recorded checksum no longer matches the
    checksum computed now. Events with a nil or \"fake-checksum\" checksum are
    kept as long as the file still has local metadata. When the first event is
    an 'unlink', `es` is returned untouched."
    [graph-uuid es]
    {:pre [(or (nil? es) (coll? es))
           (every? #(instance? FileChangeEvent %) es)]}
    (go
      (cond
        (empty? es)
        nil

        (= "unlink" (.-type ^FileChangeEvent (first es)))
        es

        :else
        (let [base-path (.-dir (first es))
              files-meta (<! (<get-local-files-meta
                              rsapi graph-uuid base-path (mapv relative-path es)))
              path->current-etag (when (coll? files-meta)
                                   (into {} (map (juxt :path :etag)) files-meta))
              path->origin-checksum (into {}
                                          (map (juxt relative-path #(.-checksum ^FileChangeEvent %)))
                                          es)
              path->event (into {} (map (juxt relative-path identity)) es)
              consistent? (fn [origin current]
                            (boolean (or (nil? origin)
                                         (= "fake-checksum" origin)
                                         (= origin current))))
              ;; paths present on one side only keep their raw value, which is
              ;; never `true`, so they are dropped by the filter below
              verdicts (merge-with consistent? path->origin-checksum path->current-etag)
              kept-paths (into [] (comp (filter (comp true? val)) (map key)) verdicts)]
          (vals (select-keys path->event kept-paths))))))
  ;; Size ceiling in bytes used by `filter-too-huge-files-aux`:
  ;; 100 * 1000 * 1024 = 102,400,000 bytes (roughly 100MB).
  (def ^:private file-size-limit
    (* 100 1000 1024))
  (defn- filter-too-huge-files-aux
    "True when event `e` may be synced with respect to its size:
    'unlink' events always pass; other events pass only when their stat
    carries a :size strictly below `file-size-limit`."
    [e]
    {:post [(boolean? %)]}
    (or (= "unlink" (.-type ^FileChangeEvent e))
        (if-some [size (:size (.-stat e))]
          (< size file-size-limit)
          ;; no size information -> reject
          false)))
  (defn- filter-too-huge-files
    "Returns a vector of the events in `es` accepted by
    `filter-too-huge-files-aux`, i.e. without files of `file-size-limit`
    bytes or more."
    [es]
    {:pre [(or (nil? es) (coll? es))
           (every? #(instance? FileChangeEvent %) es)]}
    (into [] (filter filter-too-huge-files-aux) es))
  (defn- filter-local-files-in-deletion-logs
    "Splits `local-all-files-meta` into files to keep and files to delete
    locally, based on the remote `deletion-logs`.

    - `deletion-logs`: coll of maps with :paths, :epoch (multiplied by 1000
      before being compared with :last-modified) and :TXId.
    - `remote-all-files-meta`: coll of file metadata with :path and :txid.

    A deletion-log entry is ignored when the remote file with the same path
    has a :txid greater than the log's :TXId. A local file is marked for
    deletion when a remaining log entry for its :path is newer than the
    file's :last-modified.

    Returns {:keep #{...} :delete #{...}}."
    [local-all-files-meta deletion-logs remote-all-files-meta]
    (let [;; path -> {:epoch .. :TXId ..}; a later log wins for a repeated path
          path->deletion-log (into {}
                                   (mapcat
                                    (fn [log]
                                      (let [log-meta (select-keys log [:epoch :TXId])]
                                        (map (fn [path] [path log-meta]) (:paths log)))))
                                   deletion-logs)
          path->remote-meta (into {} (map (juxt :path identity)) remote-all-files-meta)
          superseded? (fn [[path {deletion-txid :TXId}]]
                        (let [remote-txid (:txid (get path->remote-meta path))]
                          (and remote-txid deletion-txid
                               (> remote-txid deletion-txid))))
          effective-deletion-logs (into {} (remove superseded?) path->deletion-log)
          deleted-after-last-edit? (fn [f]
                                     (let [epoch-long (some-> (get effective-deletion-logs (:path f))
                                                              :epoch
                                                              (* 1000))]
                                       (and epoch-long (> epoch-long (:last-modified f)))))]
      ;; persistent sets + reduce: transients must not be mutated in place
      ;; with their return value discarded
      (reduce (fn [acc f]
                (update acc (if (deleted-after-last-edit? f) :delete :keep) conj f))
              {:keep #{} :delete #{}}
              local-all-files-meta)))
  (defn- <filter-too-long-filename
    "Returns a channel yielding the entries of `local-files-meta` (one per
    distinct :path) whose encrypted file name is at most 950 characters long."
    [graph-uuid local-files-meta]
    (go
      (let [paths (mapv :path local-files-meta)
            encrypted (<! (<encrypt-fnames rsapi graph-uuid paths))
            path->encrypted (zipmap paths encrypted)
            path->meta (into {} (map (juxt :path identity)) local-files-meta)
            ;; 950 = (- 1024 36 36 2)
            ;; 1024 - length of 'user-uuid/graph-uuid/'
            short-enough? (fn [path]
                            (<= (count (get path->encrypted path)) 950))]
        (sequence
         (comp (filter (comp short-enough? key))
               (map val))
         path->meta))))
  ;; Local->RemoteSyncer pushes local file changes of one graph to the server.
  ;;
  ;; Fields read by the methods below:
  ;; - *sync-state: atom holding the sync state shown to the rest of the app
  ;; - rate: passed as the rate argument of `util/<ratelimit`
  ;; - *txid: atom with the local transaction id, updated after each upload
  ;; - *txid-for-get-deletion-log: atom with the txid used to fetch deletion logs
  ;; - stop-chan / *stopped / *paused: stop and pause signalling
  ;; - private-*-chan: taps on the shared control mults
  (defrecord ^:large-vars/cleanup-todo
    Local->RemoteSyncer [user-uuid graph-uuid base-path repo *sync-state remoteapi
                         ^:mutable rate *txid *txid-for-get-deletion-log
                         ^:mutable remote->local-syncer stop-chan *stopped *paused
                         ;; control chans
                         private-immediately-local->remote-chan private-recent-edited-chan]
    Object
    ;; Returns a fn taking a FileChangeEvent and returning a channel whose value
    ;; is truthy only when the event passes all cheap checks below.
    (filter-file-change-events-fn [_]
      (fn [^FileChangeEvent e]
        (go (and (instance? FileChangeEvent e)
                 (if-let [mtime (:mtime (.-stat e))]
                   ;; if mtime is not nil, it should be after (- now 1min)
                   ;; ignore events too early
                   (> (* 1000 mtime) (tc/to-long (t/minus (t/now) (t/minutes 1))))
                   true)
                 (or (string/starts-with? (.-dir e) base-path)
                     (string/starts-with? (str "file://" (.-dir e)) base-path)) ; valid path prefix
                 (not (ignored? e))     ;not ignored
                 ;; download files will also trigger file-change-events, ignore them
                 (if (= "unlink" (:type e))
                   true
                   (when-some [recent-remote->local-file-item
                               (<! (<file-change-event=>recent-remote->local-file-item
                                    graph-uuid e))]
                     (not (contains? (:recent-remote->local-files @*sync-state) recent-remote->local-file-item))))))))

    (set-remote->local-syncer! [_ s] (set! remote->local-syncer s))

    ILocal->RemoteSync
    ;; Taps this syncer's private control channels onto the shared mults.
    (setup-local->remote! [_]
      (async/tap immediately-local->remote-mult private-immediately-local->remote-chan)
      (async/tap recent-edited-mult private-recent-edited-chan))

    ;; Untaps the control channels, closes `stop-chan` and marks the syncer stopped.
    (stop-local->remote! [_]
      (async/untap immediately-local->remote-mult private-immediately-local->remote-chan)
      (async/untap recent-edited-mult private-recent-edited-chan)
      (async/close! stop-chan)
      (vreset! *stopped true))

    ;; Wraps `from-chan` with `util/<ratelimit`. An event is accepted only when
    ;; rsapi is ready, the fast filter passes and `<filter-local-changes-pred`
    ;; passes; accepted events are tracked as queued files in `*sync-state`.
    (<ratelimit [this from-chan]
      (let [<fast-filter-e-fn (.filter-file-change-events-fn this)]
        (util/<ratelimit
         from-chan rate
         :filter-fn
         (fn [e]
           (go
             (and (rsapi-ready? rsapi graph-uuid)
                  (<! (<fast-filter-e-fn e))
                  (do
                    (swap! *sync-state sync-state--add-queued-local->remote-files e)
                    (let [v (<! (<filter-local-changes-pred e base-path graph-uuid))]
                      ;; rejected by the slow filter: remove it from the queue again
                      (when-not v
                        (swap! *sync-state sync-state--remove-queued-local->remote-files e))
                      v)))))
         :flush-fn #(swap! *sync-state sync-state-reset-queued-local->remote-files)
         :stop-ch stop-chan
         :distinct-coll? true
         :flush-now-ch private-immediately-local->remote-chan
         :refresh-timeout-ch private-recent-edited-chan)))

    ;; Uploads (or deletes remotely) one batch of FileChangeEvents `es`; the
    ;; operation is chosen from the type of the first event.
    ;; Returns a channel yielding exactly one of:
    ;; {:succ true} | {:need-sync-remote true} | {:graph-has-been-deleted true}
    ;; | {:stop true} | {:pause true} | {:unknown r}
    (<sync-local->remote! [_ es]
      (if (empty? es)
        (go {:succ true})
        (let [type (.-type ^FileChangeEvent (first es))
              es->paths-xf (comp
                            (map #(relative-path %))
                            (remove ignored?))]
          (go
            (let [es* (<! (<filter-checksum-not-consistent graph-uuid es))
                  _ (when (not= (count es*) (count es))
                      (println :debug :filter-checksum-changed
                               (mapv relative-path (set/difference (set es) (set es*)))))
                  es** (filter-too-huge-files es*)
                  _ (when (not= (count es**) (count es*))
                      (println :debug :filter-too-huge-files
                               (mapv relative-path (set/difference (set es*) (set es**)))))
                  paths (cond-> (sequence es->paths-xf es**)
                          (not= type "unlink")
                          filter-upload-files-with-reserved-chars)
                  _ (println :sync-local->remote type paths)
                  ;; nothing left to send: behave as if the txid is unchanged
                  r (if (empty? paths)
                      (go @*txid)
                      (case type
                        ("add" "change")
                        (<with-pause (<update-remote-files rsapi graph-uuid base-path paths @*txid) *paused)

                        "unlink"
                        (<with-pause (<delete-remote-files rsapi graph-uuid base-path paths @*txid) *paused)))
                  _ (swap! *sync-state sync-state--add-current-local->remote-files paths)
                  r* (<! r)
                  ;; a number is treated as the new txid, i.e. success
                  [succ? paused?] ((juxt number? :pause) r*)
                  _ (swap! *sync-state sync-state--remove-current-local->remote-files paths succ?)]
              (cond
                (need-sync-remote? r*)
                (do (println :need-sync-remote r*)
                    {:need-sync-remote true})

                (need-reset-local-txid? r*) ;; TODO: this cond shouldn't be true,
                ;; but some potential bugs cause local-txid > remote-txid
                (let [remote-txid-or-ex (<! (<get-remote-txid remoteapi graph-uuid))
                      remote-txid (:TXId remote-txid-or-ex)]
                  (if (or (instance? ExceptionInfo remote-txid-or-ex) (nil? remote-txid))
                    (do (put-sync-event! {:event :get-remote-graph-failed
                                          :data {:graph-uuid graph-uuid
                                                 :exp remote-txid-or-ex
                                                 :epoch (tc/to-epoch (t/now))}})
                        {:stop true})
                    (do (<! (<update-graphs-txid! remote-txid graph-uuid user-uuid repo))
                        (reset! *txid remote-txid)
                        {:succ true})))

                (graph-has-been-deleted? r*)
                (do (println :graph-has-been-deleted r*)
                    {:graph-has-been-deleted true})

                (stop-sync-by-rsapi-response? r*)
                (do (println :stop-sync-caused-by-rsapi-err-response r*)
                    (notification/show! (t :file-sync/rsapi-cannot-upload-err) :warning false)
                    {:stop true})

                paused?
                {:pause true}

                succ?                   ; succ
                (do
                  (println "sync-local->remote! update txid" r*)
                  ;; persist txid
                  (<! (<update-graphs-txid! r* graph-uuid user-uuid repo))
                  (reset! *txid r*)
                  {:succ true})

                :else
                (do
                  (println "sync-local->remote unknown:" r*)
                  {:unknown r*})))))))

    ;; Full local->remote sync:
    ;; 1. fetches remote metadata, local metadata and deletion logs
    ;; 2. deletes local files that the deletion logs mark as removed
    ;; 3. uploads the remaining local differences batch by batch
    ;; Returns a channel yielding the first non-success batch result, or
    ;; {:succ true} when every batch succeeded, or {:stop true}.
    (<sync-local->remote-all-files! [this]
      (go
        (let [remote-all-files-meta-c (<get-remote-all-files-meta remoteapi graph-uuid)
              local-all-files-meta-c (<get-local-all-files-meta rsapi graph-uuid base-path)
              deletion-logs-c (<get-deletion-logs remoteapi graph-uuid @*txid-for-get-deletion-log)
              remote-all-files-meta-or-exp (<! remote-all-files-meta-c)
              deletion-logs-or-exp (<! deletion-logs-c)]
          (cond
            (or (storage-exceed-limit? remote-all-files-meta-or-exp)
                (sync-stop-when-api-flying? remote-all-files-meta-or-exp)
                (decrypt-exp? remote-all-files-meta-or-exp))
            (do (put-sync-event! {:event :get-remote-all-files-failed
                                  :data {:graph-uuid graph-uuid
                                         :exp remote-all-files-meta-or-exp
                                         :epoch (tc/to-epoch (t/now))}})
                {:stop true})

            (instance? ExceptionInfo deletion-logs-or-exp)
            (do (put-sync-event! {:event :get-deletion-logs-failed
                                  :data {:graph-uuid graph-uuid
                                         :exp deletion-logs-or-exp
                                         :epoch (tc/to-epoch (t/now))}})
                {:stop true})

            :else
            (let [remote-all-files-meta remote-all-files-meta-or-exp
                  local-all-files-meta (<! local-all-files-meta-c)
                  {local-all-files-meta :keep delete-local-files :delete}
                  (filter-local-files-in-deletion-logs local-all-files-meta deletion-logs-or-exp remote-all-files-meta)
                  recent-10-days-range ((juxt #(tc/to-long (t/minus % (t/days 10))) #(tc/to-long %)) (t/today))
                  diff-local-files (->> (diff-file-metadata-sets local-all-files-meta remote-all-files-meta)
                                        (<filter-too-long-filename graph-uuid)
                                        <!
                                        (sort-by (sort-file-metadata-fn :recent-days-range recent-10-days-range) >))
                  change-events
                  (sequence
                   (comp
                    ;; convert to FileChangeEvent
                    (map #(->FileChangeEvent "change" base-path (.get-normalized-path ^FileMetadata %)
                                             {:size (:size %)} (:etag %)))
                    (remove ignored?))
                   diff-local-files)
                  distinct-change-events (-> (distinct-file-change-events change-events)
                                             filter-upload-files-with-reserved-chars)
                  _ (swap! *sync-state #(sync-state-reset-full-local->remote-files % distinct-change-events))
                  change-events-partitions
                  (sequence
                   ;; partition FileChangeEvents
                   (partition-file-change-events upload-batch-size)
                   distinct-change-events)]
              (println "[full-sync(local->remote)]"
                       (count (flatten change-events-partitions)) "files need to sync and"
                       (count delete-local-files) "local files need to delete")
              (put-sync-event! {:event :start
                                :data {:type :full-local->remote
                                       :graph-uuid graph-uuid
                                       :full-sync? true
                                       :epoch (tc/to-epoch (t/now))}})
              ;; 1. delete local files
              (loop [[f & fs] delete-local-files]
                (when f
                  (let [relative-p (relative-path f)]
                    (when-not (<! (<local-file-not-exist? graph-uuid rsapi base-path relative-p))
                      ;; register the deletion as a recent remote->local item so the
                      ;; resulting file-change event is not synced back; it is
                      ;; unregistered again after 5 seconds
                      (let [fake-recent-remote->local-file-item {:remote->local-type :delete
                                                                 :checksum nil
                                                                 :path relative-p}]
                        (swap! *sync-state sync-state--add-recent-remote->local-files
                               [fake-recent-remote->local-file-item])
                        (<! (<delete-local-files rsapi graph-uuid base-path [relative-p]))
                        (go (<! (timeout 5000))
                            (swap! *sync-state sync-state--remove-recent-remote->local-files
                                   [fake-recent-remote->local-file-item])))))
                  (recur fs)))

              ;; 2. upload local files
              (let [r (loop [es-partitions change-events-partitions]
                        (cond
                          @*stopped
                          {:stop true}

                          (empty? es-partitions)
                          {:succ true}

                          :else
                          (let [{:keys [succ] :as r}
                                (<! (<sync-local->remote! this (first es-partitions)))]
                            (s/assert ::sync-local->remote!-result r)
                            (if succ
                              (recur (next es-partitions))
                              ;; hand back every non-success result as is, including
                              ;; {:pause true}; previously a pause result fell through
                              ;; the cond and this method yielded nil
                              r))))]
                ;; update *txid-for-get-deletion-log
                (reset! *txid-for-get-deletion-log @*txid)
                r)))))))
  2542. ;;; ### put all stuff together
  ;; SyncManager drives file sync for one graph as a small state machine.
  ;;
  ;; `start` spawns a go-loop that turns every trigger (control channels,
  ;; websocket pushes, rate-limited local changes, timers) into an op map on
  ;; `ops-chan`. `schedule` switches to a state and runs the matching method;
  ;; each method is expected to end by scheduling the next state.
  (defrecord ^:large-vars/cleanup-todo
    SyncManager [user-uuid graph-uuid base-path *sync-state
                 ^Local->RemoteSyncer local->remote-syncer ^Remote->LocalSyncer remote->local-syncer remoteapi
                 ^:mutable ratelimit-local-changes-chan
                 *txid *txid-for-get-deletion-log
                 ^:mutable state ^:mutable remote-change-chan ^:mutable *ws *stopped? *paused?
                 ^:mutable ops-chan ^:mutable app-awake-from-sleep-chan
                 ;; control chans
                 private-full-sync-chan private-remote->local-sync-chan
                 private-remote->local-full-sync-chan private-pause-resume-chan]
    Object
    ;; Records `next-state` (locally and in `*sync-state`) and runs the method
    ;; for that state inside a go block; returns that go block's channel.
    (schedule [this next-state args reason]
      {:pre [(s/valid? ::state next-state)]}
      (println (str "[SyncManager " graph-uuid "]")
               (and state (name state)) "->" (and next-state (name next-state)) :reason reason :local-txid @*txid :now (tc/to-string (t/now)))
      (set! state next-state)
      (swap! *sync-state sync-state--update-state next-state)
      (go
        (case state
          ::need-password
          (<! (.need-password this))
          ::idle
          (<! (.idle this))
          ::local->remote
          (<! (.local->remote this args))
          ::remote->local
          (<! (.remote->local this nil args))
          ::local->remote-full-sync
          (<! (.full-sync this))
          ::remote->local-full-sync
          (<! (.remote->local-full-sync this args))
          ::pause
          (<! (.pause this))
          ::stop
          (-stop! this))))

    ;; Creates the runtime channels, taps the control mults, starts the
    ;; dispatcher go-loop and enters the ::need-password state.
    (start [this]
      (set! ops-chan (chan (async/dropping-buffer 10)))
      (set! app-awake-from-sleep-chan (chan (async/sliding-buffer 1)))
      (set! *ws (atom nil))
      (set! remote-change-chan (ws-listen! graph-uuid *ws))
      (set! ratelimit-local-changes-chan (<ratelimit local->remote-syncer local-changes-revised-chan))
      (setup-local->remote! local->remote-syncer)
      (async/tap full-sync-mult private-full-sync-chan)
      (async/tap remote->local-sync-mult private-remote->local-sync-chan)
      (async/tap remote->local-full-sync-mult private-remote->local-full-sync-chan)
      (async/tap pause-resume-mult private-pause-resume-chan)
      (async/tap pubsub/app-wake-up-from-sleep-mult app-awake-from-sleep-chan)
      (go-loop []
        (let [{:keys [remote->local remote->local-full-sync local->remote-full-sync local->remote resume pause stop]}
              (async/alt!
                private-remote->local-full-sync-chan {:remote->local-full-sync true}
                private-remote->local-sync-chan {:remote->local true}
                private-full-sync-chan {:local->remote-full-sync true}
                private-pause-resume-chan ([v] (if v {:resume true} {:pause true}))
                remote-change-chan ([v] (println "remote change:" v) {:remote->local v})
                ratelimit-local-changes-chan ([v]
                                              ;; nil means the rate-limited channel was closed
                                              (if (nil? v)
                                                {:stop true}
                                                (let [rest-v (util/drain-chan ratelimit-local-changes-chan)
                                                      vs (cons v rest-v)]
                                                  (println "local changes:" vs)
                                                  {:local->remote vs})))
                app-awake-from-sleep-chan {:remote->local true}
                ;; periodic syncs: full local->remote after 20 min without any
                ;; other trigger, remote->local after 10 min
                (timeout (* 20 60 1000)) {:local->remote-full-sync true}
                (timeout (* 10 60 1000)) {:remote->local true}
                :priority true)]
          (cond
            stop
            nil

            remote->local-full-sync
            (do (util/drain-chan ops-chan)
                (>! ops-chan {:remote->local-full-sync true})
                (recur))
            remote->local
            (let [txid
                  (if (true? remote->local)
                    {:txid (:TXId (<! (<get-remote-txid remoteapi graph-uuid)))}
                    remote->local)]
              (when (some? txid)
                (>! ops-chan {:remote->local txid}))
              (recur))
            local->remote
            (do (>! ops-chan {:local->remote local->remote})
                (recur))
            local->remote-full-sync
            (do (util/drain-chan ops-chan)
                (>! ops-chan {:local->remote-full-sync true})
                (recur))
            resume
            (do (>! ops-chan {:resume true})
                (recur))
            pause
            (do (vreset! *paused? true)
                (>! ops-chan {:pause true})
                (recur)))))
      (.schedule this ::need-password nil nil))

    ;; Waits for `<loop-ensure-pwd&keys` to produce the next state, then
    ;; schedules it (or ::stop when the manager was stopped meanwhile).
    (need-password
      [this]
      (go
        (let [next-state (<! (<loop-ensure-pwd&keys graph-uuid (state/get-current-repo) *stopped?))]
          (assert (s/valid? ::state next-state) next-state)
          (when (= next-state ::idle)
            (<! (<ensure-set-env&keys graph-uuid *stopped?)))
          (if @*stopped?
            (.schedule this ::stop nil nil)
            (.schedule this next-state nil nil)))))

    ;; Cancels in-flight rsapi requests and waits on `ops-chan` for a resume op
    ;; (or for the channel to close). On resume, re-triggers whatever was
    ;; recorded in `*resume-state` for this graph and goes back to ::idle.
    (pause [this]
      (go (<! (<rsapi-cancel-all-requests)))
      (put-sync-event! {:event :pause
                        :data {:graph-uuid graph-uuid
                               :epoch (tc/to-epoch (t/now))}})
      (go-loop []
        (let [{:keys [resume] :as result} (<! ops-chan)]
          (cond
            resume
            (let [{:keys [remote->local remote->local-full-sync local->remote local->remote-full-sync] :as resume-state}
                  (get @*resume-state graph-uuid)]
              (resume-state--reset graph-uuid)
              (vreset! *paused? false)
              (cond
                remote->local
                (offer! private-remote->local-sync-chan true)
                remote->local-full-sync
                (offer! private-remote->local-full-sync-chan true)
                local->remote
                (>! ops-chan {:local->remote local->remote})
                local->remote-full-sync
                (offer! private-full-sync-chan true)
                :else
                ;; if resume-state = nil, try a remote->local to sync recent diffs
                (offer! private-remote->local-sync-chan true))
              (put-sync-event! {:event :resume
                                :data {:graph-uuid graph-uuid
                                       :resume-state resume-state
                                       :epoch (tc/to-epoch (t/now))}})
              (<! (.schedule this ::idle nil :resume)))

            ;; ops-chan closed
            (nil? result)
            (<! (.schedule this ::stop nil nil))

            ;; any other op is dropped while paused
            :else
            (recur)))))

    ;; Takes one op from `ops-chan` and schedules the matching state.
    (idle [this]
      (go
        (let [{:keys [stop remote->local local->remote local->remote-full-sync remote->local-full-sync pause resume] :as result}
              (<! ops-chan)]
          (cond
            (or stop (nil? result))
            (<! (.schedule this ::stop nil nil))
            remote->local
            (<! (.schedule this ::remote->local {:remote remote->local} {:remote-changed remote->local}))
            local->remote
            (<! (.schedule this ::local->remote {:local local->remote} {:local-changed local->remote}))
            local->remote-full-sync
            (<! (.schedule this ::local->remote-full-sync nil nil))
            remote->local-full-sync
            (<! (.schedule this ::remote->local-full-sync nil nil))
            pause
            (<! (.schedule this ::pause nil nil))
            resume
            (<! (.schedule this ::idle nil nil))
            :else
            (do
              (state/pub-event! [:capture-error {:error (js/Error. "sync/wrong-ops-chan-when-idle")
                                                 :payload {:type :sync/wrong-ops-chan-when-idle
                                                           :ops-chan-result result
                                                           :user-id user-uuid
                                                           :graph-id graph-uuid}}])
              (<! (.schedule this ::idle nil nil)))))))

    ;; Runs a full local->remote sync and schedules the next state from its
    ;; result. Every outcome schedules a state, so the manager keeps consuming
    ;; `ops-chan` afterwards.
    (full-sync [this]
      (go
        (let [{:keys [succ need-sync-remote graph-has-been-deleted unknown stop pause] :as r}
              (<! (<sync-local->remote-all-files! local->remote-syncer))
              ;; a nil result while the manager is paused is treated as a pause
              paused? (or pause (and (nil? r) @*paused?))]
          (when (some? r)
            (s/assert ::sync-local->remote-all-files!-result r))
          (cond
            succ
            (do
              (swap! *sync-state #(sync-state-reset-full-local->remote-files % []))
              (put-sync-event! {:event :finished-local->remote
                                :data {:graph-uuid graph-uuid
                                       :full-sync? true
                                       :epoch (tc/to-epoch (t/now))}})
              (.schedule this ::idle nil nil))
            need-sync-remote
            (do (util/drain-chan ops-chan)
                (>! ops-chan {:remote->local true})
                (>! ops-chan {:local->remote-full-sync true})
                (.schedule this ::idle nil nil))
            graph-has-been-deleted
            (.schedule this ::stop nil :graph-has-been-deleted)
            stop
            (.schedule this ::stop nil nil)
            paused?
            ;; remember that a full local->remote sync was interrupted; `pause`
            ;; reads this key back from *resume-state on resume
            (do (swap! *resume-state assoc graph-uuid {:local->remote-full-sync true})
                (.schedule this ::pause nil nil))
            unknown
            (do
              (state/pub-event! [:capture-error {:error unknown
                                                 :payload {:type :sync/unknown
                                                           :event :local->remote-full-sync-failed
                                                           :user-id user-uuid
                                                           :graph-uuid graph-uuid}}])
              (put-sync-event! {:event :local->remote-full-sync-failed
                                :data {:graph-uuid graph-uuid
                                       :epoch (tc/to-epoch (t/now))}})
              (.schedule this ::idle nil nil))
            :else
            ;; unrecognized result: go back to idle instead of leaving the
            ;; state machine without a scheduled state
            (.schedule this ::idle nil nil)))))

    ;; Runs a full remote->local sync and schedules the next state from its result.
    (remote->local-full-sync [this _]
      (go
        (let [{:keys [succ unknown stop pause]}
              (<! (<sync-remote->local-all-files! remote->local-syncer))]
          (cond
            succ
            (do (put-sync-event! {:event :finished-remote->local
                                  :data {:graph-uuid graph-uuid
                                         :full-sync? true
                                         :epoch (tc/to-epoch (t/now))}})
                (.schedule this ::idle nil nil))
            stop
            (.schedule this ::stop nil nil)
            pause
            (do (resume-state--add-remote->local-full-sync-state graph-uuid)
                (.schedule this ::pause nil nil))
            unknown
            (do
              (state/pub-event! [:capture-error {:error unknown
                                                 :payload {:event :remote->local-full-sync-failed
                                                           :type :sync/unknown
                                                           :graph-uuid graph-uuid
                                                           :user-id user-uuid}}])
              (put-sync-event! {:event :remote->local-full-sync-failed
                                :data {:graph-uuid graph-uuid
                                       :exp unknown
                                       :epoch (tc/to-epoch (t/now))}})
              (let [next-state (if (string/includes? (str (ex-cause unknown)) "404 Not Found")
                                 ;; TODO: this should never happen
                                 ::stop
                                 ;; if any other exception occurred, re-exec remote->local-full-sync
                                 ::remote->local-full-sync)]
                (.schedule this next-state nil nil)))))))

    ;; Incremental remote->local sync. Skipped (straight back to ::idle) when
    ;; the announced remote txid is not newer than the local one.
    (remote->local [this _next-state {remote-val :remote}]
      (go
        (if (some-> remote-val :txid (<= @*txid))
          (.schedule this ::idle nil nil)
          (let [origin-txid @*txid
                {:keys [succ unknown stop pause need-remote->local-full-sync] :as r}
                (<! (<sync-remote->local! remote->local-syncer))]
            (s/assert ::sync-remote->local!-result r)
            (cond
              need-remote->local-full-sync
              (do (util/drain-chan ops-chan)
                  (>! ops-chan {:remote->local-full-sync true})
                  (>! ops-chan {:local->remote-full-sync true})
                  (.schedule this ::idle nil nil))
              succ
              (do (put-sync-event! {:event :finished-remote->local
                                    :data {:graph-uuid graph-uuid
                                           :full-sync? false
                                           :from-txid origin-txid
                                           :to-txid @*txid
                                           :epoch (tc/to-epoch (t/now))}})
                  (.schedule this ::idle nil nil))
              stop
              (.schedule this ::stop nil nil)
              pause
              (do (resume-state--add-remote->local-state graph-uuid)
                  (.schedule this ::pause nil nil))
              unknown
              (do (prn "remote->local err" unknown)
                  (state/pub-event! [:capture-error {:error unknown
                                                     :payload {:type :sync/unknown
                                                               :event :remote->local
                                                               :user-id user-uuid
                                                               :graph-uuid graph-uuid}}])
                  (.schedule this ::idle nil nil)))))))

    ;; Uploads the given local changes batch by batch, stopping at the first
    ;; non-success result, then schedules the next state.
    (local->remote [this {local-changes :local}]
      ;; local-changes:: list of FileChangeEvent
      (assert (some? local-changes) local-changes)
      (go
        (let [distincted-local-changes (distinct-file-change-events local-changes)
              _ (swap! *sync-state #(sync-state-reset-full-local->remote-files % distincted-local-changes))
              change-events-partitions
              (sequence (partition-file-change-events upload-batch-size) distincted-local-changes)
              _ (put-sync-event! {:event :start
                                  :data {:type :local->remote
                                         :graph-uuid graph-uuid
                                         :full-sync? false
                                         :epoch (tc/to-epoch (t/now))}})
              {:keys [succ need-sync-remote graph-has-been-deleted unknown stop pause]}
              (loop [es-partitions change-events-partitions]
                (cond
                  @*stopped? {:stop true}
                  @*paused? {:pause true}
                  (empty? es-partitions) {:succ true}
                  :else
                  (let [{:keys [succ need-sync-remote graph-has-been-deleted pause unknown stop] :as r}
                        (<! (<sync-local->remote! local->remote-syncer (first es-partitions)))]
                    (s/assert ::sync-local->remote!-result r)
                    (cond
                      succ
                      (recur (next es-partitions))
                      (or need-sync-remote graph-has-been-deleted unknown pause stop) r))))]
          (cond
            succ
            (do
              (swap! *sync-state #(sync-state-reset-full-local->remote-files % []))
              (put-sync-event! {:event :finished-local->remote
                                :data {:graph-uuid graph-uuid
                                       :full-sync? false
                                       :file-change-events distincted-local-changes
                                       :epoch (tc/to-epoch (t/now))}})
              (.schedule this ::idle nil nil))

            need-sync-remote
            ;; pull remote changes first, then retry the same local changes
            (do (util/drain-chan ops-chan)
                (>! ops-chan {:remote->local true})
                (>! ops-chan {:local->remote local-changes})
                (.schedule this ::idle nil nil))

            graph-has-been-deleted
            (.schedule this ::stop nil :graph-has-been-deleted)

            stop
            (.schedule this ::stop nil nil)

            pause
            (do (resume-state--add-local->remote-state graph-uuid local-changes)
                (.schedule this ::pause nil nil))

            unknown
            (do
              (debug/pprint "local->remote" unknown)
              (state/pub-event! [:capture-error {:error unknown
                                                 :payload {:event :local->remote
                                                           :type :sync/unknown
                                                           :user-id user-uuid
                                                           :graph-uuid graph-uuid}}])
              (.schedule this ::idle nil nil))))))

    IStoppable
    ;; Tears everything down once: websocket, taps, ops-chan, both syncers and
    ;; pending rsapi requests. Does nothing when already stopped.
    (-stop! [_]
      (go
        (when-not @*stopped?
          (vreset! *stopped? true)
          (ws-stop! *ws)
          (async/untap full-sync-mult private-full-sync-chan)
          (async/untap remote->local-sync-mult private-remote->local-sync-chan)
          (async/untap remote->local-full-sync-mult private-remote->local-full-sync-chan)
          (async/untap pause-resume-mult private-pause-resume-chan)
          (async/untap pubsub/app-wake-up-from-sleep-mult app-awake-from-sleep-chan)
          (when ops-chan (async/close! ops-chan))
          (stop-local->remote! local->remote-syncer)
          (stop-remote->local! remote->local-syncer)
          (<! (<rsapi-cancel-all-requests))
          (swap! *sync-state sync-state--update-state ::stop)
          (reset! current-sm-graph-uuid nil)
          (debug/pprint ["stop sync-manager, graph-uuid" graph-uuid "base-path" base-path]))))

    IStopped?
    (-stopped? [_]
      @*stopped?))
  (defn sync-manager
    "Builds a SyncManager for `graph-uuid` together with its two syncers.
    Both syncers and the manager share the same txid atoms and the same
    stopped/paused volatiles. Also records `graph-uuid` as the currently
    syncing graph in `*sync-state`."
    [user-uuid graph-uuid base-path repo txid *sync-state]
    (let [*txid (atom txid)
          *txid-for-get-deletion-log (atom txid)
          *stopped? (volatile! false)
          *paused? (volatile! false)
          remoteapi-with-stop (->RemoteAPI *stopped?)
          ;; rate handed to the local->remote syncer; smaller on native platforms
          local-change-rate (if (mobile-util/native-platform?)
                              2000
                              10000)
          local->remote-syncer (->Local->RemoteSyncer
                                user-uuid graph-uuid base-path repo *sync-state
                                remoteapi-with-stop
                                local-change-rate
                                *txid *txid-for-get-deletion-log
                                nil     ; remote->local-syncer, set below
                                (chan)  ; stop-chan
                                *stopped? *paused?
                                (chan 1) (chan 1))
          remote->local-syncer (->Remote->LocalSyncer
                                user-uuid graph-uuid base-path repo
                                *txid *txid-for-get-deletion-log *sync-state
                                remoteapi-with-stop
                                nil *stopped? *paused?)]
      ;; wire the two syncers to each other
      (.set-remote->local-syncer! local->remote-syncer remote->local-syncer)
      (.set-local->remote-syncer! remote->local-syncer local->remote-syncer)
      (swap! *sync-state sync-state--update-current-syncing-graph-uuid graph-uuid)
      (->SyncManager
       user-uuid graph-uuid base-path *sync-state
       local->remote-syncer remote->local-syncer remoteapi-with-stop
       nil                              ; ratelimit-local-changes-chan
       *txid *txid-for-get-deletion-log
       nil nil nil                      ; state, remote-change-chan, *ws
       *stopped? *paused?
       nil nil                          ; ops-chan, app-awake-from-sleep-chan
       (chan 1) (chan 1) (chan 1) (chan 1))))
  (defn sync-manager-singleton
    "Creates a SyncManager only when no graph is currently registered in
    `current-sm-graph-uuid`; registers `graph-uuid` first. Returns nil when a
    manager is already registered."
    [user-uuid graph-uuid base-path repo txid *sync-state]
    (if @current-sm-graph-uuid
      nil
      (do
        (reset! current-sm-graph-uuid graph-uuid)
        (sync-manager user-uuid graph-uuid base-path repo txid *sync-state))))
  ;; Reentrancy guard for sync start: true while a sync start is in progress
  ;; or a sync is running; reset to false by `<sync-stop`.
  (defonce *sync-entered?
    (atom false))
  (defn <sync-stop
    "Stops the SyncManager of the current file-sync graph, if there is one:
    clears its file-sync state, waits for `-stop!` and releases the
    reentrancy guard. Always clears `current-sm-graph-uuid`."
    []
    (go
      (let [sm ^SyncManager (state/get-file-sync-manager (state/get-current-file-sync-graph-uuid))]
        (when sm
          (let [sm-graph-uuid (:graph-uuid sm)
                log-tag (str "[SyncManager " sm-graph-uuid "]")]
            (println log-tag "stopping")
            (state/clear-file-sync-state! sm-graph-uuid)
            (<! (-stop! sm))
            (reset! *sync-entered? false)
            (println log-tag "stopped"))))
      (reset! current-sm-graph-uuid nil)))
  (defn <sync-local->remote-now
    "When a SyncManager exists for the current file-sync graph, offers `true`
    to `immediately-local->remote-chan`."
    []
    (go
      (let [graph-uuid (state/get-current-file-sync-graph-uuid)]
        (when (state/get-file-sync-manager graph-uuid)
          (offer! immediately-local->remote-chan true)))))
  (defn sync-need-password!
    "Calls `need-password` on the SyncManager of the current file-sync graph,
    if there is one."
    []
    (let [sm ^SyncManager (state/get-file-sync-manager (state/get-current-file-sync-graph-uuid))]
      (when sm
        (.need-password sm))))
  (defn check-graph-belong-to-current-user
    "True when `graph-user-uuid` equals `current-user-uuid`.
    Returns false when there is no current user; returns false and shows a
    warning notification when the graph belongs to another user."
    [current-user-uuid graph-user-uuid]
    (if (nil? current-user-uuid)
      false
      (or (= current-user-uuid graph-user-uuid)
          (do (notification/show! (t :file-sync/other-user-graph) :warning false)
              false))))
  (defn <check-remote-graph-exists
    "Returns a channel yielding whether `local-graph-uuid` appears among the
    remote graphs. Shows a warning notification when it does not."
    [local-graph-uuid]
    {:pre [(util/uuid-string? local-graph-uuid)]}
    (go
      (let [r (<! (<list-remote-graphs remoteapi))
            exists? (cond
                      ;; if api call failed, assume this remote graph still exists
                      (instance? ExceptionInfo r)
                      true

                      (contains? r :Graphs)
                      (contains? (into #{} (map :GraphUUID) (:Graphs r))
                                 local-graph-uuid)

                      :else
                      false)]
        (when-not exists?
          (notification/show! (t :file-sync/graph-deleted) :warning false))
        exists?)))
  (defn sync-off?
    "True when `sync-state` is nil; otherwise the result of
    `sync-state--stopped?` on it."
    [sync-state]
    (if (nil? sync-state)
      true
      (sync-state--stopped? sync-state)))
  (defn graph-sync-off?
    "Is sync off (per `sync-off?`) for the file-sync state stored under `graph-uuid`?"
    [graph-uuid]
    (-> graph-uuid
        state/get-file-sync-state
        sync-off?))
  (defn graph-encrypted?
    "Return `(get-pwd graph-uuid)` for the graph uuid stored as the second
    element of `graphs-txid`, or nil when no graph uuid is stored."
    []
    (let [[_user-uuid graph-uuid] @graphs-txid]
      (when graph-uuid
        (get-pwd graph-uuid))))
  (declare network-online-cursor)

  (defn <sync-start
    "Try to start file sync for the current repo.
    Returns nil when `(state/enable-sync?)` is false, otherwise the channel of
    the go block that performs the start attempt. `*sync-entered?` is held
    true for the duration of an attempt, so a call made while another attempt
    is in progress does nothing."
    []
    (when-not (false? (state/enable-sync?))
      (go
        (when (compare-and-set! *sync-entered? false true)
          (let [*state            (atom (sync-state))
                current-user-uuid (<! (user/<user-uuid))
                ;; Reload graphs-txid right before reading the current repo, so
                ;; the graph-uuid and the repo dir are read together and we do
                ;; not pair a stale repo dir with the current graph-uuid.
                _                 (<! (p->c (persist-var/-load graphs-txid)))
                [user-uuid graph-uuid saved-txid] @graphs-txid
                txid              (or saved-txid 0)
                repo              (state/get-current-repo)]
            (when (and (not (instance? ExceptionInfo current-user-uuid))
                       repo
                       @network-online-cursor
                       user-uuid graph-uuid txid
                       (graph-sync-off? graph-uuid)
                       (user/logged-in?)
                       (not (config/demo-graph? repo)))
              (try
                ;; NOTE(review): `sync-manager-singleton` has already recorded
                ;; `graph-uuid` at this point; the branches below that skip
                ;; `.start` do not undo that -- confirm this is intended.
                (when-let [manager (sync-manager-singleton current-user-uuid graph-uuid
                                                           (config/get-repo-dir repo) repo
                                                           txid *state)]
                  (when (check-graph-belong-to-current-user current-user-uuid user-uuid)
                    (if (<! (<check-remote-graph-exists graph-uuid))
                      (do
                        (state/set-file-sync-state graph-uuid @*state)
                        (state/set-file-sync-manager graph-uuid manager)
                        ;; mirror every later change of the local state atom into global state
                        (add-watch *state ::update-global-state
                                   (fn [_key _ref _old new-state]
                                     (state/set-file-sync-state graph-uuid new-state)))
                        (state/set-state! [:file-sync/graph-state :current-graph-uuid] graph-uuid)
                        (.start manager)
                        (offer! remote->local-full-sync-chan true)
                        (offer! full-sync-chan true))
                      ;; remote graph has been deleted
                      (clear-graphs-txid! repo))))
                (catch :default e
                  (prn "Sync start error: ")
                  (log/error :exception e)))))
          (reset! *sync-entered? false)))))
  (defn- restart-if-stopped!
    "When `is-active?` is truthy and sync is off for the graph uuid stored in
    `graphs-txid`, call `<sync-start`; in every other case offer `is-active?`
    to `pause-resume-chan`."
    [is-active?]
    (if (and is-active?
             (graph-sync-off? (second @graphs-txid)))
      (<sync-start)
      (offer! pause-resume-chan is-active?)))
  (def app-state-changed-cursor (rum/cursor state/state :mobile/app-state-change))

  ;; Channel the iOS background task below waits on (for at most 20s) after
  ;; requesting an immediate local->remote sync.
  (def finished-local->remote-chan (chan 1))

  ;; React to changes of the :mobile/app-state-change entry in app state.
  (add-watch
   app-state-changed-cursor "sync"
   (fn [_key _ref _old {:keys [is-active?]}]
     (cond
       ;; Android: request a local->remote sync when the app is no longer active.
       (mobile-util/native-android?)
       (when-not is-active?
         (<sync-local->remote-now))

       (mobile-util/native-ios?)
       (cond
         is-active?
         (restart-if-stopped! is-active?)

         ;; Not active and a graph is being synced: register a BackgroundTask
         ;; `beforeExit` callback that flushes local changes.
         (state/get-current-file-sync-graph-uuid)
         (let [*task-id        (atom nil)
               run-before-exit (fn []
                                 (go
                                   ;; Wait for file watcher events
                                   (<! (timeout 2000))
                                   (util/drain-chan finished-local->remote-chan)
                                   (<! (<sync-local->remote-now))
                                   ;; wait at most 20s
                                   (async/alts! [finished-local->remote-chan (timeout 20000)])
                                   ;; not awaited: runs alongside the 5s wait below
                                   (p/let [still-active? (mobile-util/app-active?)]
                                     (when-not still-active?
                                       (offer! pause-resume-chan is-active?)))
                                   (<! (timeout 5000))
                                   (prn "finish task: " @*task-id)
                                   (.finish ^js BackgroundTask #js {:taskId @*task-id})))]
           (p/let [task-id (.beforeExit ^js BackgroundTask run-before-exit)]
             (reset! *task-id task-id))))

       :else
       nil)))
  ;;; ### some add-watches

  ;; TODO: replace this logic with the pause/resume state
  (defonce network-online-cursor (rum/cursor state/state :network/online?))

  ;; Stop sync on an online -> offline transition and start it again on an
  ;; offline -> online transition; any other change is ignored.
  (add-watch
   network-online-cursor "sync-manage"
   (fn [_key _ref was-online? online?]
     (let [went-offline? (and (true? was-online?) (false? online?))
           came-online?  (and (false? was-online?) (true? online?))]
       (cond
         went-offline? (<sync-stop)
         came-online?  (<sync-start)
         :else         nil))))
  (defonce auth-id-token-cursor (rum/cursor state/state :auth/id-token))

  ;; Stop sync whenever the :auth/id-token entry in app state becomes nil.
  (add-watch
   auth-id-token-cursor "sync-manage"
   (fn [_key _ref _old id-token]
     (when-not (some? id-token)
       (<sync-stop))))
  ;;; ### some sync events handler

  ;; Re-run the remote->local full sync after it has failed: each
  ;; :remote->local-full-sync-failed event whose graph is the one currently
  ;; being synced triggers another offer to `remote->local-full-sync-chan`.
  (def re-remote->local-full-sync-chan (chan 1))
  (async/sub pubsub/sync-events-pub :remote->local-full-sync-failed re-remote->local-full-sync-chan)
  (go-loop []
    (let [event              (<! re-remote->local-full-sync-chan)
          failed-graph-uuid  (-> event :data :graph-uuid)
          syncing-graph-uuid (:current-syncing-graph-uuid
                              (state/get-file-sync-state failed-graph-uuid))]
      (when (= failed-graph-uuid syncing-graph-uuid)
        (offer! remote->local-full-sync-chan true))
      (recur)))
  ;; Re-run the local->remote full sync after it has failed: each
  ;; :local->remote-full-sync-failed event whose graph is the one currently
  ;; being synced triggers another offer to `full-sync-chan`.
  (def re-local->remote-full-sync-chan (chan 1))
  (async/sub pubsub/sync-events-pub :local->remote-full-sync-failed re-local->remote-full-sync-chan)
  (go-loop []
    (let [event              (<! re-local->remote-full-sync-chan)
          failed-graph-uuid  (-> event :data :graph-uuid)
          syncing-graph-uuid (:current-syncing-graph-uuid
                              (state/get-file-sync-state failed-graph-uuid))]
      (when (= failed-graph-uuid syncing-graph-uuid)
        (offer! full-sync-chan true))
      (recur)))
  3096. ;;; add-tap: REPL scratch code (never evaluated on load); registers a tap that stores each tapped value in *x
  3097. (comment
  3098. (def *x (atom nil))
  3099. (add-tap (fn [v] (reset! *x v)))
  3100. )