(ns frontend.fs.sync "Main ns for providing file sync functionality" (:require ["@capawesome/capacitor-background-task" :refer [BackgroundTask]] ["path" :as node-path] [cljs-http.client :as http] [cljs-time.coerce :as tc] [cljs-time.core :as t] [cljs-time.format :as tf] [cljs.core.async :as async :refer [! chan go go-loop offer! poll! timeout]] [cljs.core.async.impl.channels] [cljs.core.async.interop :refer [p->c]] [cljs.spec.alpha :as s] [clojure.pprint :as pp] [clojure.set :as set] [clojure.string :as string] [electron.ipc :as ipc] [frontend.config :as config] [frontend.context.i18n :refer [t]] [frontend.db :as db] [frontend.debug :as debug] [frontend.diff :as diff] [frontend.encrypt :as encrypt] [frontend.fs :as fs] [frontend.fs.capacitor-fs :as capacitor-fs] [frontend.fs.diff-merge :as diff-merge] [frontend.handler.file :as file-handler] [frontend.handler.notification :as notification] [frontend.handler.user :as user] [frontend.mobile.util :as mobile-util] [frontend.pubsub :as pubsub] [frontend.state :as state] [frontend.util :as util] [frontend.util.fs :as fs-util] [frontend.util.persist-var :as persist-var] [goog.string :as gstring] [lambdaisland.glogi :as log] [logseq.common.path :as path] [logseq.graph-parser.util :as gp-util] [medley.core :refer [dedupe-by]] [promesa.core :as p] [rum.core :as rum])) ;;; ### Commentary ;; file-sync related local files/dirs: ;; - logseq/graphs-txid.edn ;; this file contains [user-uuid graph-uuid transaction-id] ;; graph-uuid: the unique identifier of the graph on the server ;; transaction-id: sync progress of local files ;; - logseq/version-files ;; downloaded version-files ;; files included by `get-ignored-files` will not be synchronized. ;; ;; sync strategy: ;; - when toggle file-sync on, ;; trigger remote->local-full-sync first, then local->remote-full-sync ;; local->remote-full-sync will compare local-files with remote-files (by md5), ;; and upload new-added-files to remote server. ;; - if local->remote sync(normal-sync or full-sync) return :need-sync-remote, ;; then trigger a remote->local sync ;; - if remote->local sync return :need-remote->local-full-sync, ;; then we need a remote->local-full-sync, ;; which compare local-files with remote-files, sync diff-remote-files to local ;; - local->remote-full-sync will be triggered after 20mins of idle ;; - every 10s, flush local changes, and sync to remote ;; TODO: use access-token instead of id-token ;; TODO: a remote delete-diff cause local related-file deleted, then trigger a `FileChangeEvent`, ;; and re-produce a new same-file-delete diff. ;;; ### specs (s/def ::state #{;; do following jobs when ::starting: ;; - wait seconds for file-change-events from file-watcher ;; - drop redundant file-change-events ;; - setup states in `frontend.state` ::starting ::need-password ::idle ;; sync local-changed files ::local->remote ;; sync remote latest-transactions ::remote->local ;; local->remote full sync ::local->remote-full-sync ;; remote->local full sync ::remote->local-full-sync ;; snapshot state when switching between apps on iOS ::pause ::stop}) (s/def ::path string?) (s/def ::time t/date?) (s/def ::remote->local-type #{:delete :update ;; :rename=:delete+:update }) (s/def ::current-syncing-graph-uuid (s/or :nil nil? :graph-uuid string?)) (s/def ::recent-remote->local-file-item (s/keys :req-un [::remote->local-type ::checksum ::path])) (s/def ::current-local->remote-files (s/coll-of ::path :kind set?)) (s/def ::current-remote->local-files (s/coll-of ::path :kind set?)) (s/def ::recent-remote->local-files (s/coll-of ::recent-remote->local-file-item :kind set?)) (s/def ::history-item (s/keys :req-un [::path ::time])) (s/def ::history (s/coll-of ::history-item :kind seq?)) (s/def ::sync-state (s/keys :req-un [::current-syncing-graph-uuid ::state ::current-local->remote-files ::current-remote->local-files ::queued-local->remote-files ;; Downloading files from remote will trigger filewatcher events, ;; causes unreasonable information in the content of ::queued-local->remote-files, ;; use ::recent-remote->local-files to filter such events ::recent-remote->local-files ::history])) ;; diff (s/def ::TXId pos-int?) (s/def ::TXType #{"update_files" "delete_files" "rename_file"}) (s/def ::TXContent-to-path string?) (s/def ::TXContent-from-path (s/or :some string? :none nil?)) (s/def ::TXContent-checksum (s/or :some string? :none nil?)) (s/def ::TXContent-item (s/tuple ::TXContent-to-path ::TXContent-from-path ::TXContent-checksum)) (s/def ::TXContent (s/coll-of ::TXContent-item)) (s/def ::diff (s/keys :req-un [::TXId ::TXType ::TXContent])) (s/def ::succ-map #(= {:succ true} %)) (s/def ::unknown-map (comp some? :unknown)) (s/def ::stop-map #(= {:stop true} %)) (s/def ::pause-map #(= {:pause true} %)) (s/def ::need-sync-remote #(= {:need-sync-remote true} %)) (s/def ::graph-has-been-deleted #(= {:graph-has-been-deleted true} %)) (s/def ::sync-local->remote!-result (s/or :stop ::stop-map :succ ::succ-map :pause ::pause-map :need-sync-remote ::need-sync-remote :graph-has-been-deleted ::graph-has-been-deleted :unknown ::unknown-map)) (s/def ::sync-remote->local!-result (s/or :succ ::succ-map :need-remote->local-full-sync #(= {:need-remote->local-full-sync true} %) :stop ::stop-map :pause ::pause-map :unknown ::unknown-map)) (s/def ::sync-local->remote-all-files!-result (s/or :succ ::succ-map :stop ::stop-map :need-sync-remote ::need-sync-remote :graph-has-been-deleted ::graph-has-been-deleted :unknown ::unknown-map)) ;; sync-event type (s/def ::event #{:created-local-version-file :finished-local->remote :finished-remote->local :start :pause :resume :exception-decrypt-failed :remote->local-full-sync-failed :local->remote-full-sync-failed :get-remote-graph-failed :get-deletion-logs-failed }) (s/def ::sync-event (s/keys :req-un [::event ::data])) (defonce download-batch-size 100) (defonce upload-batch-size 20) (def ^:private current-sm-graph-uuid (atom nil)) ;;; ### configs in config.edn ;; - :file-sync/ignore-files (defn- get-ignored-files [] (into #{#"logseq/graphs-txid.edn$" #"logseq/pages-metadata.edn$" #"logseq/version-files/" #"logseq/bak/" #"node_modules/" ;; path starts with `.` in the root directory, e.g. .gitignore #"^\.[^.]+" ;; path includes `/.`, e.g. .git, .DS_store #"/\." ;; Emacs/Vim backup files end with `~` by default #"~$"} (map re-pattern) (:file-sync/ignore-files (state/get-config)))) ;;; ### configs ends (def ws-addr config/WS-URL) ;; Warning: make sure to `persist-var/-load` graphs-txid before using it. (defonce graphs-txid (persist-var/persist-var nil "graphs-txid")) (declare assert-local-txid<=remote-txid) (defn = latest-txid 0)]} (-> (p/let [_ (persist-var/-reset-value! graphs-txid [user-uuid graph-uuid latest-txid] repo) _ (persist-var/persist-save graphs-txid)] (when (state/developer-mode?) (assert-local-txid<=remote-txid))) p->c)) (defn clear-graphs-txid! [repo] (persist-var/-reset-value! graphs-txid nil repo) (persist-var/persist-save graphs-txid)) (defn- ws-ping-loop [ws] (go-loop [] (let [state (.-readyState ws)] ;; not closing or closed state (when (not (contains? #{2 3} state)) (if (not= 1 state) ;; when connecting, wait 1s (do (clj (js/JSON.parse (.-data e)) :keywordize-keys true)] (when (some? (:txid data)) (if-let [v (poll! remote-changes-chan)] (let [last-txid (:txid v) current-txid (:txid data)] (if (> last-txid current-txid) (offer! remote-changes-chan v) (offer! remote-changes-chan data))) (offer! remote-changes-chan data))))))) (defn ws-listen! "return channel which output messages from server" [graph-uuid *ws] (let [remote-changes-chan (chan (async/sliding-buffer 1))] (ws-listen!* graph-uuid *ws remote-changes-chan) remote-changes-chan)) (defn- get-json-body [body] (or (and (not (string? body)) body) (or (string/blank? body) nil) (js->clj (js/JSON.parse body) :keywordize-keys true))) (defn- get-resp-json-body [resp] (-> resp (:body) (get-json-body))) (defn- js body)) :with-credentials? false})] {:resp ( retry-count 5) (throw (js/Error. :file-sync-request)) (do (println "will retry after" (min 60000 (* 1000 retry-count)) "ms") (//path -> path" [path] (let [parts (string/split path "/")] (if (and (< 2 (count parts)) (= 36 (count (parts 0))) (= 36 (count (parts 1)))) (util/string-join-path (drop 2 parts)) path))) (defprotocol IRelativePath (-relative-path [this])) (defn relative-path [o] (let [repo-dir (config/get-repo-dir (state/get-current-repo))] (cond (implements? IRelativePath o) (-relative-path o) ;; full path (and (string? o) (string/starts-with? o repo-dir)) (string/replace o (str repo-dir "/") "") (string? o) (remove-user-graph-uuid-prefix o) :else (throw (js/Error. (str "unsupported type " (str o))))))) (defprotocol IChecksum (-checksum [this])) (defprotocol IStoppable (-stop! [this])) (defprotocol IStopped? (-stopped? [this])) ;from-path, to-path is relative path (deftype FileTxn [from-path to-path updated? deleted? txid checksum] Object (renamed? [_] (not= from-path to-path)) IRelativePath (-relative-path [_] (remove-user-graph-uuid-prefix to-path)) IEquiv (-equiv [_ ^FileTxn other] (and (= from-path (.-from-path other)) (= to-path (.-to-path other)) (= updated? (.-updated? other)) (= deleted? (.-deleted? other)))) IHash (-hash [_] (hash [from-path to-path updated? deleted?])) IComparable (-compare [_ ^FileTxn other] (compare txid (.-txid other))) IPrintWithWriter (-pr-writer [coll w _opts] (write-all w "#FileTxn[\"" from-path "\" -> \"" to-path "\" (updated? " updated? ", renamed? " (.renamed? coll) ", deleted? " deleted? ", txid " txid ", checksum " checksum ")]"))) (defn- assert-filetxns [filetxns] (every? true? (mapv (fn [^FileTxn filetxn] (if (.-updated? filetxn) (some? (-checksum filetxn)) true)) filetxns))) (defn- diff->filetxns "convert diff(`FileTxn (first %) (first %) update? delete? TXId (last %)))) delete-xf (comp (remove #(empty? (first %))) (map #(->FileTxn (first %) (first %) update? delete? TXId nil))) rename-xf (comp (remove #(or (empty? (first %)) (empty? (second %)))) (map #(->FileTxn (second %) (first %) false false TXId nil))) xf (case TXType "delete_files" delete-xf "update_files" update-xf "rename_file" rename-xf)] (sequence xf TXContent))) (defn- distinct-update-filetxns-xf "transducer. remove duplicate update&delete `FileTxn`s." [rf] (let [seen-update&delete-filetxns (volatile! #{})] (fn ([] (rf)) ([result] (rf result)) ([result ^FileTxn filetxn] (if (and (or (.-updated? filetxn) (.-deleted? filetxn)) (contains? @seen-update&delete-filetxns filetxn)) result (do (vswap! seen-update&delete-filetxns conj filetxn) (rf result filetxn))))))) (defn- remove-deleted-filetxns-xf "transducer. remove update&rename filetxns if they are deleted later(in greater txid filetxn)." [rf] (let [seen-deleted-paths (volatile! #{})] (fn ([] (rf)) ([result] (rf result)) ([result ^FileTxn filetxn] (let [to-path (.-to-path filetxn) from-path (.-from-path filetxn)] (if (contains? @seen-deleted-paths to-path) (do (when (not= to-path from-path) (vswap! seen-deleted-paths disj to-path) (vswap! seen-deleted-paths conj from-path)) result) (do (vswap! seen-deleted-paths conj to-path) (rf result filetxn)))))))) (defn- partition-filetxns "return transducer. partition filetxns, at most N update-filetxns in each partition, for delete and rename type, only one filetxn in each partition." [n] (comp (partition-by #(.-updated? ^FileTxn %)) (map (fn [ts] (if (some-> (first ts) (.-updated?)) (partition-all n ts) (map list ts)))) cat)) (defn- contains-path? [regexps path] (reduce #(when (re-find %2 path) (reduced true)) false regexps)) (defn ignored? "Whether file is ignored when syncing." [path] (-> (get-ignored-files) (contains-path? (relative-path path)) (boolean))) (defn- filter-download-files-with-reserved-chars "Skip downloading file paths with reserved chars." [files] (let [f #(and (not (.-deleted? ^js %)) (fs-util/include-reserved-chars? (-relative-path %))) reserved-files (filter f files)] (when (seq reserved-files) (state/pub-event! [:ui/notify-skipped-downloading-files (map -relative-path reserved-files)]) (prn "Skipped downloading those file paths with reserved chars: " (map -relative-path reserved-files))) (remove f files))) (defn- filter-upload-files-with-reserved-chars "Remove upoading file paths with reserved chars." [paths] (let [path-string? (string? (first paths)) f (if path-string? fs-util/include-reserved-chars? #(fs-util/include-reserved-chars? (-relative-path %))) reserved-paths (filter f paths)] (when (seq reserved-paths) (let [paths (if path-string? reserved-paths (map -relative-path reserved-paths))] (when (seq paths) (state/pub-event! [:ui/notify-outdated-filename-format paths])) (prn "Skipped uploading those file paths with reserved chars: " paths))) (vec (remove f paths)))) (defn- diffs->filetxns "transducer. 1. diff -> `FileTxn` , see also `filetxns) cat (remove ignored?) distinct-update-filetxns-xf remove-deleted-filetxns-xf)) (defn- diffs->partitioned-filetxns "partition filetxns, each partition contains same type filetxns, for update type, at most N items in each partition for delete & rename type, only 1 item in each partition." [n] (comp (diffs->filetxns) (partition-filetxns n))) (defn- filepath+checksum->diff [index {:keys [relative-path checksum user-uuid graph-uuid]}] {:post [(s/valid? ::diff %)]} {:TXId (inc index) :TXType "update_files" :TXContent [[(util/string-join-path [user-uuid graph-uuid relative-path]) nil checksum]]}) (defn filepath+checksum-coll->partitioned-filetxns "transducer. 1. filepath+checksum-coll -> diff 2. diffs->partitioned-filetxns 3. filter by config" [n graph-uuid user-uuid] (comp (map (fn [p] {:relative-path (first p) :user-uuid user-uuid :graph-uuid graph-uuid :checksum (second p)})) (map-indexed filepath+checksum->diff) (diffs->partitioned-filetxns n))) (deftype FileMetadata [size etag path encrypted-path last-modified remote? txid ^:mutable normalized-path] Object (get-normalized-path [_] (assert (string? path) path) (when-not normalized-path (set! normalized-path (cond-> path (string/starts-with? path "/") (string/replace-first "/" "") remote? (remove-user-graph-uuid-prefix)))) normalized-path) IRelativePath (-relative-path [_] path) IEquiv (-equiv [o ^FileMetadata other] (and (= (.get-normalized-path o) (.get-normalized-path other)) (= etag (.-etag other)))) IHash (-hash [_] (hash {:etag etag :path path})) ILookup (-lookup [o k] (-lookup o k nil)) (-lookup [_ k not-found] (case k :size size :etag etag :path path :encrypted-path encrypted-path :last-modified last-modified :remote? remote? :txid txid not-found)) IPrintWithWriter (-pr-writer [_ w _opts] (write-all w (str {:size size :etag etag :path path :remote? remote? :txid txid :last-modified last-modified})))) (def ^:private higher-priority-remote-files "when diff all remote files and local files, following remote files always need to download(when checksum not matched), even local-file's last-modified > remote-file's last-modified. because these files will be auto created when the graph created, we dont want them to re-write related remote files." #{"pages/contents.md" "pages/contents.org" "logseq/metadata.edn"}) (def ^:private ignore-default-value-files "when create a new local graph, some files will be created (config.edn, custom.css). And related remote files wins if these files have default template value." #{"logseq/config.edn" "logseq/custom.css"}) (def ^:private empty-custom-css-md5 "d41d8cd98f00b204e9800998ecf8427e") ;; TODO: use fn some to filter FileMetadata here, it cause too much loop (defn diff-file-metadata-sets "Find the `FileMetadata`s that exists in s1 and does not exist in s2, compare by path+checksum+last-modified, if s1.path = s2.path & s1.checksum <> s2.checksum & s1.last-modified > s2.last-modified (except some default created files), keep this `FileMetadata` in result" [s1 s2] (reduce (fn [result item] (let [path (:path item) lower-case-path (some-> path string/lower-case) ;; encrypted-path (:encrypted-path item) checksum (:etag item) last-modified (:last-modified item)] (if (some #(cond (not= lower-case-path (some-> (:path %) string/lower-case)) false (= checksum (:etag %)) true (>= last-modified (:last-modified %)) false ;; these special files have higher priority in s1 (contains? higher-priority-remote-files path) false ;; higher priority in s1 when config.edn=default value or empty custom.css (and (contains? ignore-default-value-files path) (#{config/config-default-content-md5 empty-custom-css-md5} (:etag %))) false (< last-modified (:last-modified %)) true) s2) result (conj result item)))) #{} s1)) (comment (defn map->FileMetadata [m] (apply ->FileMetadata ((juxt :size :etag :path :encrypted-path :last-modified :remote? (constantly nil)) m))) (assert (= #{(map->FileMetadata {:size 1 :etag 2 :path 2 :encrypted-path 2 :last-modified 2})} (diff-file-metadata-sets (into #{} (map map->FileMetadata) [{:size 1 :etag 1 :path 1 :encrypted-path 1 :last-modified 1} {:size 1 :etag 2 :path 2 :encrypted-path 2 :last-modified 2}]) (into #{} (map map->FileMetadata) [{:size 1 :etag 1 :path 1 :encrypted-path 1 :last-modified 1} {:size 1 :etag 1 :path 2 :encrypted-path 2 :last-modified 1}]))))) (extend-protocol IChecksum FileMetadata (-checksum [this] (.-etag this)) FileTxn (-checksum [this] (.-checksum this))) (defn- sort-file-metadata-fn ":recent-days-range > :favorite-pages > small-size pages > ... :recent-days-range : [ ] " [& {:keys [recent-days-range favorite-pages]}] {:pre [(or (nil? recent-days-range) (every? number? recent-days-range))]} (let [favorite-pages* (set favorite-pages)] (fn [^FileMetadata item] (let [path (relative-path item) journal-dir (node-path/join (config/get-journals-directory) node-path/sep) journal? (string/starts-with? path journal-dir) journal-day (when journal? (try (tc/to-long (tf/parse (tf/formatter "yyyy_MM_dd") (-> path (string/replace-first journal-dir "") (string/replace-first ".md" "")))) (catch :default _)))] (cond (and recent-days-range journal-day (<= (first recent-days-range) ^number journal-day (second recent-days-range))) journal-day (string/includes? path "logseq/") 9999 (string/includes? path "content.") 10000 (contains? favorite-pages* path) (count path) :else (- (.-size item))))))) ;;; ### path-normalize (def path-normalize gp-util/path-normalize) ;;; ### APIs ;; `RSAPI` call apis through rsapi package, supports operations on files (defprotocol IRSAPI (rsapi-ready? [this graph-uuid] "return true when rsapi ready") ( local") ( local version-db") ( remote, return err or txid") ( r first :path (not= filepath)) (-> r first :path))))) (defn n 0)) (do (print (str "retry(" n ") ...")) (recur (dec n))) r)))) (declare clj r) result #{}] (if-not (and path metadata) ;; finish result (let [normalized-path (path-normalize path) encryptedFname (if (not= path normalized-path) (first (FileMetadata (get metadata "size") (get metadata "md5") normalized-path encryptedFname (get metadata "mtime") false nil nil))))))) (deftype RSAPI [^:mutable graph-uuid' ^:mutable private-key' ^:mutable public-key'] IToken (clj (c (ipc/ipc "key-gen"))) :keywordize-keys true))) (c (ipc/ipc "set-env" graph-uuid (if prod? "prod" "dev") private-key public-key))) (c (ipc/ipc "get-local-all-files-meta" graph-uuid base-path))))] (if (instance? ExceptionInfo r) r (c (ipc/ipc "get-local-files-meta" graph-uuid base-path filepaths))))] (assert (not (instance? ExceptionInfo r)) "get-local-files-meta shouldn't return exception") (c (ipc/ipc "rename-local-file" graph-uuid base-path (path-normalize from) (path-normalize to))))) (c (ipc/ipc "update-local-files" graph-uuid base-path filepaths token)))))) (c (ipc/ipc "fetch-remote-files" graph-uuid base-path filepaths token)))))) (c (ipc/ipc "download-version-files" graph-uuid base-path filepaths token))))] r))) (c (ipc/ipc "delete-local-files" graph-uuid base-path normalized-filepaths))))] r)))) (c (ipc/ipc "update-remote-files" graph-uuid base-path normalized-filepaths local-txid token)))))))) (c (ipc/ipc "delete-remote-files" graph-uuid base-path normalized-filepaths local-txid token)))))))) (clj (c (ipc/ipc "encrypt-fnames" graph-uuid fnames)))))) (c (ipc/ipc "decrypt-fnames" graph-uuid fnames)))] (if (instance? ExceptionInfo r) (ex-info "decrypt-failed" {:fnames fnames} (ex-cause r)) (js->clj r))))) (c (ipc/ipc "cancel-all-requests"))) (c (ipc/ipc "addVersionFile" (config/get-local-dir repo) path content)))) (deftype ^:large-vars/cleanup-todo CapacitorAPI [^:mutable graph-uuid' ^:mutable private-key ^:mutable public-key'] IToken (c (.keygen mobile-util/file-sync #js {})))] (-> r (js->clj :keywordize-keys true))))) (c (.setEnv mobile-util/file-sync (clj->js {:graphUUID graph-uuid :env (if prod? "prod" "dev") :secretKey secret-key :publicKey public-key})))) (c (.getLocalAllFilesMeta mobile-util/file-sync (clj->js {:graphUUID graph-uuid :basePath base-path}))))] (if (instance? ExceptionInfo r) r (c (.getLocalFilesMeta mobile-util/file-sync (clj->js {:graphUUID graph-uuid :basePath base-path :filePaths filepaths}))))] (assert (not (instance? ExceptionInfo r)) "get-local-files-meta shouldn't return exception") (c (.renameLocalFile mobile-util/file-sync (clj->js {:graphUUID graph-uuid :basePath base-path :from (path-normalize from) :to (path-normalize to)})))) (c (.updateLocalFiles mobile-util/file-sync (clj->js {:graphUUID graph-uuid :basePath base-path :filePaths filepaths' :token token}))))))) (c (.fetchRemoteFiles mobile-util/file-sync (clj->js {:graphUUID graph-uuid :basePath base-path :filePaths filepaths :token token})))))] (js->clj (.-value r))))) (c (.updateLocalVersionFiles mobile-util/file-sync (clj->js {:graphUUID graph-uuid :basePath base-path :filePaths filepaths :token token})))))] r))) (c (.deleteLocalFiles mobile-util/file-sync (clj->js {:graphUUID graph-uuid :basePath base-path :filePaths normalized-filepaths})))))] r)))) (c (.updateRemoteFiles mobile-util/file-sync (clj->js {:graphUUID graph-uuid :basePath base-path :filePaths normalized-filepaths :txid local-txid :token token :fnameEncryption true}))))] (if (instance? ExceptionInfo r) r (get (js->clj r) "txid")))))) (c (.deleteRemoteFiles mobile-util/file-sync (clj->js {:graphUUID graph-uuid :basePath base-path :filePaths normalized-filepaths :txid local-txid :token token}))))] (if (instance? ExceptionInfo r) r (get (js->clj r) "txid")))))) (c (.encryptFnames mobile-util/file-sync (clj->js {:graphUUID graph-uuid :filePaths fnames}))))] (if (instance? ExceptionInfo r) (.-cause r) (get (js->clj r) "value"))))) (c (.decryptFnames mobile-util/file-sync (clj->js {:graphUUID graph-uuid :filePaths fnames}))))] (if (instance? ExceptionInfo r) (ex-info "decrypt-failed" {:fnames fnames} (ex-cause r)) (get (js->clj r) "value"))))) (c (.cancelAllRequests mobile-util/file-sync))) (c (capacitor-fs/backup-file repo :version-file-dir path content)))) (def rsapi (cond (util/electron?) (->RSAPI nil nil nil) (mobile-util/native-ios?) (->CapacitorAPI nil nil nil) (mobile-util/native-android?) (->CapacitorAPI nil nil nil) :else nil)) (defn add-new-version-file [repo path content] ( (ex-data exp) :err (= :stop))) (defn storage-exceed-limit? [exp] (some->> (ex-data exp) :err ((juxt :status (comp :message :body))) ((fn [[status msg]] (and (= 403 status) (= msg "storage-limit")))))) (defn graph-count-exceed-limit? [exp] (some->> (ex-data exp) :err ((juxt :status (comp :message :body))) ((fn [[status msg]] (and (= 403 status) (= msg "graph-count-exceed-limit")))))) (defn decrypt-exp? [exp] (some-> exp ex-message #(= % "decrypt-failed"))) ;;; remote api exceptions ends ;;; ### sync events (defn- put-sync-event! [val] (async/put! pubsub/sync-events-ch val)) (def ^:private debug-print-sync-events-loop-stop-chan (chan 1)) (defn debug-print-sync-events-loop ([] (debug-print-sync-events-loop [:created-local-version-file :finished-local->remote :finished-remote->local :pause :resume :exception-decrypt-failed :remote->local-full-sync-failed :local->remote-full-sync-failed])) ([topics] (util/drain-chan debug-print-sync-events-loop-stop-chan) (let [topic&chs (map (juxt identity #(chan 10)) topics) out-ch (chan 10) out-mix (async/mix out-ch)] (doseq [[topic ch] topic&chs] (async/sub pubsub/sync-events-pub topic ch) (async/admix out-mix ch)) (go-loop [] (let [{:keys [val stop]} (async/alt! debug-print-sync-events-loop-stop-chan {:stop true} out-ch ([v] {:val v}))] (cond stop (do (async/unmix-all out-mix) (doseq [[topic ch] topic&chs] (async/unsub pubsub/sync-events-pub topic ch))) val (do (pp/pprint [:debug :sync-event val]) (recur)))))))) (defn stop-debug-print-sync-events-loop [] (offer! debug-print-sync-events-loop-stop-chan true)) ;;; sync events ends (defn- fire-file-sync-storage-exceed-limit-event! [exp] (when (storage-exceed-limit? exp) (state/pub-event! [:file-sync/storage-exceed-limit]) true)) (defn- fire-file-sync-graph-count-exceed-limit-event! [exp] (when (graph-count-exceed-limit? exp) (state/pub-event! [:file-sync/graph-count-exceed-limit]) true)) (deftype RemoteAPI [*stopped?] Object (path-map] (let [path->encrypted-path-map (set/map-invert encrypted-path->path-map) raw-paths (vals encrypted-path->path-map) *encrypted-paths-to-drop (transient [])] (loop [[raw-path & other-paths] raw-paths] (when raw-path (let [normalized-path (path-normalize raw-path)] (when (not= normalized-path raw-path) (println :filter-files-with-unnormalized-path raw-path) (conj! *encrypted-paths-to-drop (get path->encrypted-path-map raw-path)))) (recur other-paths))) (let [encrypted-paths-to-drop (set (persistent! *encrypted-paths-to-drop))] (filterv #(not (contains? encrypted-paths-to-drop (:encrypted-path %))) file-meta-list)))) (defn- filter-case-different-same-files "filter case-different-but-same-name files, last-modified one wins" [file-meta-list encrypted-path->path-map] (let [seen (volatile! {})] (loop [result-file-meta-list (transient {}) [f & others] file-meta-list] (if f (let [origin-path (get encrypted-path->path-map (:encrypted-path f)) _ (assert (some? origin-path) f) path (string/lower-case origin-path) last-modified (:last-modified f) last-modified-seen (get @seen path)] (cond (or (and path (nil? last-modified-seen)) (and path (some? last-modified-seen) (> last-modified last-modified-seen))) ;; 1. not found in seen ;; 2. found in seen, but current f wins (do (vswap! seen conj [path last-modified]) (recur (conj! result-file-meta-list [path f]) others)) (and path (some? last-modified-seen) (<= last-modified last-modified-seen)) ;; found in seen, and seen-f has more recent last-modified epoch (recur result-file-meta-list others) :else (do (println :debug-filter-case-different-same-files:unreachable f path) (recur result-file-meta-list others)))) (vals (persistent! result-file-meta-list)))))) (extend-type RemoteAPI IRemoteAPI (path-map (zipmap encrypted-path-list* path-list-or-exp)] (set (mapv #(->FileMetadata (:size %) (:checksum %) (get encrypted-path->path-map (:encrypted-path %)) (:encrypted-path %) (:last-modified %) true (:txid %) nil) (-> file-meta-list* (filter-files-with-unnormalized-path encrypted-path->path-map) (filter-case-different-same-files encrypted-path->path-map))))))))))) (path-map (zipmap encrypted-paths paths-or-exp)] (into #{} (comp (filter #(not= "filepath too long" (:Error %))) (map #(->FileMetadata (:Size %) (:Checksum %) (some->> (get encrypted-path->path-map (:FilePath %)) path-normalize) (:FilePath %) (:LastModified %) true (:Txid %) nil))) r)))))))) ( {} (seq graph-name-opt) (assoc :GraphName graph-name-opt) (seq graph-uuid-opt) (assoc :GraphUUID graph-uuid-opt)))))) (path-map (zipmap encrypted-paths (path-map %) (:paths txn)))) txns-with-encrypted-paths)] txns))))) ( from-path remove-user-graph-uuid-prefix) checksum]) (:TXContent txn)))) txns-with-encrypted-paths) encrypted-paths (mapcat (fn [txn] (remove #(or (nil? %) (not (string/starts-with? % "e."))) (mapcat (fn [[to-path from-path _checksum]] [to-path from-path]) (:TXContent txn)))) txns-with-encrypted-paths*) encrypted-path->path-map (zipmap encrypted-paths (path-map to-path to-path) (some->> from-path (get encrypted-path->path-map)) checksum]) (:TXContent txn)))) txns-with-encrypted-paths*)] [txns (:TXId (last txns)) (:TXId (first txns))]))))) (RemoteAPI nil)) (def ^:private *get-graph-salt-memoize-cache (atom {})) (defn update-graph-salt-cache [graph-uuid v] {:pre [(map? v) (= #{:value :expired-at} (set (keys v)))]} (swap! *get-graph-salt-memoize-cache conj [graph-uuid v])) (defn c (-> (fs/file-exists? repo-dir rpath) (p/then (fn [exists?] (when exists? (fs/read-file repo-dir rpath)))))))] (and (seq origin-db-content) (or (nil? content) (some :removed (diff/diff origin-db-content content)))))))) (defn- remote-files sync-state--add-current-remote->local-files sync-state--remove-current-local->remote-files sync-state--remove-current-remote->local-files sync-state--add-recent-remote->local-files sync-state--remove-recent-remote->local-files sync-state--stopped?) (defn- filetxns=>recent-remote->local-files [filetxns] (let [{:keys [update-filetxns delete-filetxns rename-filetxns]} (group-by (fn [^FileTxn e] (cond (.-updated? e) :update-filetxns (.-deleted? e) :delete-filetxns (.renamed? e) :rename-filetxns)) filetxns) update-file-items (map (fn [filetxn] (let [path (relative-path filetxn)] {:remote->local-type :update :checksum (-checksum filetxn) :path path})) update-filetxns) rename-file-items (mapcat (fn [^FileTxn filetxn] (let [to-path (relative-path filetxn) from-path (.-from-path filetxn)] [{:remote->local-type :update :checksum (-checksum filetxn) :path to-path} {:remote->local-type :delete :checksum nil :path from-path}])) rename-filetxns) delete-file-items (map (fn [filetxn] (let [path (relative-path filetxn)] {:remote->local-type :delete :checksum (-checksum filetxn) :path path})) delete-filetxns)] (set (concat update-file-items rename-file-items delete-file-items)))) (defn- c (p/all (->> relative-paths (map (fn [rpath] (prn ::handle-remote-deletion rpath) (p/let [base-file (path/path-join "logseq/version-files/base" rpath) current-change-file rpath format (gp-util/get-format current-change-file) repo (state/get-current-repo) repo-dir (config/get-repo-dir repo) base-exists? (fs/file-exists? repo-dir base-file)] (if base-exists? (p/let [base-content (fs/read-file repo-dir base-file) current-content (-> (fs/read-file repo-dir current-change-file) (p/catch (fn [_] nil)))] (if (= base-content current-content) ;; base-content == current-content, delete current-change-file (p/do! (c (p/all (->> fetched-file-rpaths (map (fn [rpath] (p/let [incoming-file (path/path-join "logseq/version-files/incoming" rpath) base-file (path/path-join "logseq/version-files/base" rpath) current-change-file rpath format (gp-util/get-format current-change-file) repo (state/get-current-repo) repo-dir (config/get-repo-dir repo) base-exists? (fs/file-exists? repo-dir base-file) _ (prn ::base-ex base-exists?)] (cond base-exists? (p/let [base-content (fs/read-file repo-dir base-file) current-content (-> (fs/read-file repo-dir current-change-file) (p/catch (fn [_] nil)))] (if (= base-content current-content) (do (prn "base=current, write directly") (p/do! (fs/copy! repo (path/path-join repo-dir incoming-file) (path/path-join repo-dir current-change-file)) (fs/copy! repo (path/path-join repo-dir incoming-file) (path/path-join repo-dir base-file)))) (do (prn "base!=current, should do a 3-way merge") (prn ::cur current-content) (p/let [current-content (or current-content "") incoming-content (fs/read-file repo-dir incoming-file) merged-content (diff-merge/three-way-merge base-content incoming-content current-content format)] (prn ::merged-content merged-content) (when (seq merged-content) (p/do! (fs/write-file! repo repo-dir current-change-file merged-content {:skip-compare? true}) (file-handler/alter-file repo current-change-file merged-content {:re-render-root? true :from-disk? true :fs/event :fs/remote-file-change}))) ;; now, let fs watcher handle the rest uploading (comment fs/copy! repo-url (path/path-join repo-dir incoming-file) (path/path-join repo-dir current-change-file)))))) :else (do (prn "no base, use empty content as base, avoid loosing data") (p/let [current-content (-> (fs/read-file repo-dir current-change-file) (p/catch (fn [_] nil))) current-content (or current-content "") incoming-content (fs/read-file repo-dir incoming-file) merged-content (diff-merge/three-way-merge "" current-content incoming-content format)] (if (= incoming-content merged-content) (p/do! (fs/copy! repo (path/path-join repo-dir incoming-file) (path/path-join repo-dir current-change-file)) (fs/copy! repo (path/path-join repo-dir incoming-file) (path/path-join repo-dir base-file))) ;; else (p/do! (fs/write-file! repo repo-dir current-change-file merged-content {:skip-compare? true}) (file-handler/alter-file repo current-change-file merged-content {:re-render-root? true :from-disk? true :fs/event :fs/remote-file-change}))))))))))))))) (defn- apply-filetxns [*sync-state graph-uuid base-path filetxns *paused] (go (cond (.renamed? (first filetxns)) (let [^FileTxn filetxn (first filetxns) from-path (.-from-path filetxn) to-path (.-to-path filetxn)] (assert (= 1 (count filetxns))) (db-content-vec (->> filetxns (mapv #(when (is-journals-or-pages? %) [% (db/get-file repo (relative-path %))])) (remove nil?))] (doseq [relative-p (map relative-path filetxns)] (when-some [relative-p* (local-file-item {:remote->local-type :delete :checksum nil :path relative-p*}] (println :debug "found case-different-same-local-file" relative-p relative-p*) (swap! *sync-state sync-state--add-recent-remote->local-files [recent-remote->local-file-item]) (local-files [recent-remote->local-file-item]))))) (let [update-local-files-ch (db-content-vec] (when (local-files) (defn apply-filetxns-partitions "won't call local-file-items (filetxns=>recent-remote->local-files filetxns) _ (when-not full-sync? (swap! *sync-state #(sync-state-reset-full-remote->local-files % recent-remote->local-file-items))) ;; update recent-remote->local-files _ (swap! *sync-state sync-state--add-recent-remote->local-files recent-remote->local-file-items) _ (swap! *sync-state sync-state--add-current-remote->local-files paths) r (local-files paths (not (instance? ExceptionInfo r)))] ;; remove these recent-remote->local-file-items 5s later (go (local-files recent-remote->local-file-items)) (cond (instance? ExceptionInfo r) r @*paused {:pause true} :else (let [latest-txid (apply max (and *txid @*txid) (map #(.-txid ^FileTxn %) filetxns))] ;; update local-txid (when (and *txid (number? latest-txid)) (reset! *txid latest-txid) (local-syncer]] (let [remote-txid txid local-txid (.-txid remote->local-syncer)] (or (nil? local-txid) (> remote-txid local-txid)))) (defmethod need-sync-remote? :exceptional-response [resp] (let [data (ex-data resp) cause (ex-cause resp)] (or (and (= (:error data) :promise-error) (when-let [r (re-find #"(\d+), txid_to_validate = (\d+)" (str cause))] (> (nth r 1) (nth r 2)))) (= 409 (get-in data [:err :status]))))) (defmethod need-sync-remote? :chan [c] (go (need-sync-remote? (> (ex-cause r) str (re-find #"graph-not-exist"))) (defn- stop-sync-by-rsapi-response? [r] (some->> (ex-cause r) str (re-find #"Request is not yet valid"))) ;; type = "change" | "add" | "unlink" (deftype FileChangeEvent [type dir path stat checksum] IRelativePath (-relative-path [_] (remove-dir-prefix dir path)) IEquiv (-equiv [_ ^FileChangeEvent other] (and (= dir (.-dir other)) (= type (.-type other)) (= path (.-path other)) (= checksum (.-checksum other)))) IHash (-hash [_] (hash {:dir dir :type type :path path :checksum checksum})) ILookup (-lookup [o k] (-lookup o k nil)) (-lookup [_ k not-found] (case k :type type :dir dir :path path :stat stat :checksum checksum not-found)) IPrintWithWriter (-pr-writer [_ w _opts] (write-all w (str {:type type :base-path dir :path path :size (:size stat) :checksum checksum})))) (defn- recent-remote->local-file-item "return nil when related local files not found" [graph-uuid ^FileChangeEvent e] (go (let [tp (case (.-type e) ("add" "change") :update "unlink" :delete) path (relative-path e)] (when-let [path-etag-entry (first (local-type tp :checksum (if (= tp :delete) nil (val path-etag-entry)) :path path})))) (defn- distinct-file-change-events-xf "transducer. distinct `FileChangeEvent`s by their path, keep the first one." [rf] (let [seen (volatile! #{})] (fn ([] (rf)) ([result] (rf result)) ([result ^FileChangeEvent e] (if (contains? @seen (.-path e)) result (do (vswap! seen conj (.-path e)) (rf result e))))))) (defn- distinct-file-change-events "distinct `FileChangeEvent`s by their path, keep the last one." [es] (transduce distinct-file-change-events-xf conj '() (reverse es))) (defn- partition-file-change-events "return transducer. partition `FileChangeEvent`s, at most N file-change-events in each partition. only one type in a partition." [n] (comp (partition-by (fn [^FileChangeEvent e] (case (.-type e) ("add" "change") :add-or-change "unlink" :unlink))) (map #(partition-all n %)) cat)) (declare sync-state--valid-to-accept-filewatcher-event?) (defonce local-changes-chan (chan (async/dropping-buffer 1000))) (defn file-watch-handler "file-watcher callback" [type {:keys [dir path _content stat] :as _payload}] (when-let [current-graph (state/get-current-repo)] (when (string/ends-with? current-graph dir) (when-let [sync-state (state/get-file-sync-state (state/get-current-file-sync-graph-uuid))] (when (sync-state--valid-to-accept-filewatcher-event? sync-state) (when (or (:mtime stat) (= type "unlink")) (go (let [path (path-normalize path) files-meta (and (not= "unlink" type) ( files-meta first :etag))] (>! local-changes-chan (->FileChangeEvent type dir path stat checksum)))))))))) (defn local-changes-revised-chan-builder "return chan" [local-changes-chan rename-page-event-chan] (let [*rename-events (atom #{}) ch (chan 1000)] (go-loop [] (let [{:keys [rename-event local-change]} (async/alt! rename-page-event-chan ([v] {:rename-event v}) ;; {:repo X :old-path X :new-path} local-changes-chan ([v] {:local-change v}))] (cond rename-event (let [repo-dir (config/get-repo-dir (:repo rename-event)) remove-dir-prefix-fn #(remove-dir-prefix repo-dir %) rename-event* (-> rename-event (update :old-path remove-dir-prefix-fn) (update :new-path remove-dir-prefix-fn)) k1 [:old-path (:old-path rename-event*) repo-dir] k2 [:new-path (:new-path rename-event*) repo-dir]] (swap! *rename-events conj k1 k2) ;; remove rename-events after 2s (go (! ch (->FileChangeEvent "unlink" repo-dir (:old-path rename-event*) nil nil)) (>! ch (->FileChangeEvent "add" repo-dir (:new-path rename-event*) {:mtime (tc/to-long (t/now)) :size 1 ; add a fake size } "fake-checksum")) (recur)) local-change (cond (and (= "change" (.-type local-change)) (or (contains? @*rename-events [:old-path (.-path local-change) (.-dir local-change)]) (contains? @*rename-events [:new-path (.-path local-change) (.-dir local-change)]))) (do (println :debug "ignore" local-change) ;; ignore (recur)) (and (= "add" (.-type local-change)) (contains? @*rename-events [:new-path (.-path local-change) (.-dir local-change)])) ;; ignore (do (println :debug "ignore" local-change) (recur)) (and (= "unlink" (.-type local-change)) (contains? @*rename-events [:old-path (.-path local-change) (.-dir local-change)])) (do (println :debug "ignore" local-change) (recur)) :else (do (>! ch local-change) (recur)))))) ch)) (defonce local-changes-revised-chan (local-changes-revised-chan-builder local-changes-chan (state/get-file-rename-event-chan))) ;;; ### encryption (def pwd-map "graph-uuid->{:pwd xxx :public-key xxx :private-key xxx}" (atom {})) (defonce *pwd-map-changed-chan (atom {})) (defn- get-graph-pwd-changed-chan [graph-uuid] (if-let [result (get @*pwd-map-changed-chan graph-uuid)] result (let [c (chan (async/sliding-buffer 1))] (swap! *pwd-map-changed-chan assoc graph-uuid c) c))) (defn- c (encrypt/encrypt-with-passphrase key* content))) (defn- decrypt-content [encrypted-content key*] (go (let [r (c (encrypt/decrypt-with-passphrase key* encrypted-content)))] (when-not (instance? ExceptionInfo r) r)))) (defn- local-storage-pwd-path [graph-uuid] (str "encrypted-pwd/" graph-uuid)) (defn- persist-pwd! [pwd graph-uuid] {:pre [(string? pwd)]} (js/localStorage.setItem (local-storage-pwd-path graph-uuid) pwd)) (defn- remove-pwd! [graph-uuid] (js/localStorage.removeItem (local-storage-pwd-path graph-uuid))) (defn get-pwd [graph-uuid] (js/localStorage.getItem (local-storage-pwd-path graph-uuid))) (defn remove-all-pwd! [] (doseq [k (filter #(string/starts-with? % "encrypted-pwd/") (js->clj (js-keys js/localStorage)))] (js/localStorage.removeItem k)) (reset! pwd-map {})) (defn encrypt+persist-pwd! "- persist encrypted pwd at local-storage" [pwd graph-uuid] (go (let [[value expired-at gone?] ((juxt :value :expired-at #(-> % ex-data :err :status (= 410))) ( % ex-data :err :status (= 410))) ( (ex-data r) :err :status (= 404)) pwd (remote full sync" (chan 1)) (def full-sync-mult (async/mult full-sync-chan)) (def remote->local-sync-chan "offer `true` to this chan will trigger a remote->local sync" (chan 1)) (def remote->local-sync-mult (async/mult remote->local-sync-chan)) (def remote->local-full-sync-chan "offer `true` to this chan will trigger a remote->local full sync" (chan 1)) (def remote->local-full-sync-mult (async/mult remote->local-full-sync-chan)) (def immediately-local->remote-chan "Immediately trigger upload of files in waiting queue" (chan)) (def immediately-local->remote-mult (async/mult immediately-local->remote-chan)) (def pause-resume-chan "false -> pause, true -> resume. see also `*resume-state`" (chan 1)) (def pause-resume-mult (async/mult pause-resume-chan)) (def recent-edited-chan "Triggered when there is content editing" (chan 1)) (def recent-edited-mult (async/mult recent-edited-chan)) (def last-input-time-cursor (rum/cursor state/state :editor/last-input-time)) (add-watch last-input-time-cursor "sync" (fn [_ _ _ _] (offer! recent-edited-chan true))) ;;; ### sync state (def *resume-state "key: graph-uuid" (atom {})) (defn resume-state--add-remote->local-state [graph-uuid] (swap! *resume-state assoc graph-uuid {:remote->local true})) (defn resume-state--add-remote->local-full-sync-state [graph-uuid] (swap! *resume-state assoc graph-uuid {:remote->local-full-sync true})) (defn resume-state--add-local->remote-state [graph-uuid local-changes] (swap! *resume-state assoc graph-uuid {:local->remote local-changes})) ;; (defn resume-state--add-local->remote-full-sync-state ;; [graph-uuid] ;; (swap! *resume-state assoc graph-uuid {:local->remote-full-sync true})) (defn resume-state--reset [graph-uuid] (swap! *resume-state dissoc graph-uuid)) (defn sync-state "create a new sync-state" [] {:post [(s/valid? ::sync-state %)]} {:current-syncing-graph-uuid nil :state ::starting :full-local->remote-files #{} :current-local->remote-files #{} :full-remote->local-files #{} :current-remote->local-files #{} :queued-local->remote-files #{} :recent-remote->local-files #{} :history '()}) (defn- sync-state--update-current-syncing-graph-uuid [sync-state graph-uuid] {:pre [(s/valid? ::sync-state sync-state)] :post [(s/valid? ::sync-state %)]} (assoc sync-state :current-syncing-graph-uuid graph-uuid)) (defn- sync-state--update-state [sync-state next-state] {:pre [(s/valid? ::state next-state)] :post [(s/valid? ::sync-state %)]} (assoc sync-state :state next-state)) (defn sync-state--add-current-remote->local-files [sync-state paths] {:post [(s/valid? ::sync-state %)]} (update sync-state :current-remote->local-files into paths)) (defn sync-state--add-current-local->remote-files [sync-state paths] {:post [(s/valid? ::sync-state %)]} (update sync-state :current-local->remote-files into paths)) (defn sync-state--add-queued-local->remote-files [sync-state event] {:post [(s/valid? ::sync-state %)]} (update sync-state :queued-local->remote-files (fn [o event] (->> (concat o [event]) (util/distinct-by-last-wins (fn [e] (.-path e))))) event)) (defn sync-state--remove-queued-local->remote-files [sync-state event] {:post [(s/valid? ::sync-state %)]} (update sync-state :queued-local->remote-files (fn [o event] (remove #{event} o)) event)) (defn sync-state-reset-queued-local->remote-files [sync-state] {:post [(s/valid? ::sync-state %)]} (assoc sync-state :queued-local->remote-files nil)) (defn sync-state--add-recent-remote->local-files [sync-state items] {:pre [(s/valid? (s/coll-of ::recent-remote->local-file-item) items)] :post [(s/valid? ::sync-state %)]} (update sync-state :recent-remote->local-files (partial apply conj) items)) (defn sync-state--remove-recent-remote->local-files [sync-state items] {:post [(s/valid? ::sync-state %)]} (update sync-state :recent-remote->local-files set/difference items)) (defn sync-state-reset-full-local->remote-files [sync-state events] {:post [(s/valid? ::sync-state %)]} (assoc sync-state :full-local->remote-files events)) (defn sync-state-reset-full-remote->local-files [sync-state events] {:post [(s/valid? ::sync-state %)]} (assoc sync-state :full-remote->local-files events)) (defn- add-history-items [history paths now] (sequence (comp ;; only reserve the latest one of same-path-items (dedupe-by :path) ;; reserve the latest 20 history items (take 20)) (into (filter (fn [o] (not (contains? (set paths) (:path o)))) history) (map (fn [path] {:path path :time now}) paths)))) (defn sync-state--remove-current-remote->local-files [sync-state paths add-history?] {:post [(s/valid? ::sync-state %)]} (let [now (t/now)] (cond-> sync-state true (update :current-remote->local-files set/difference paths) add-history? (update :history add-history-items paths now)))) (defn sync-state--remove-current-local->remote-files [sync-state paths add-history?] {:post [(s/valid? ::sync-state %)]} (let [now (t/now)] (cond-> sync-state true (update :current-local->remote-files set/difference paths) add-history? (update :history add-history-items paths now)))) (defn sync-state--stopped? "Graph syncing is stopped" [sync-state] {:pre [(s/valid? ::sync-state sync-state)]} (= ::stop (:state sync-state))) (defn sync-state--valid-to-accept-filewatcher-event? [sync-state] {:pre [(s/valid? ::sync-state sync-state)]} (contains? #{::idle ::local->remote ::remote->local ::local->remote-full-sync ::remote->local-full-sync} (:state sync-state))) ;;; ### remote->local syncer & local->remote syncer (defprotocol IRemote->LocalSync (stop-remote->local! [this]) (local! [this] "return ExceptionInfo when error occurs") (local-all-files! [this] "sync all files, return ExceptionInfo when error occurs")) (defprotocol ILocal->RemoteSync (setup-local->remote! [this]) (stop-local->remote! [this]) (remote! [this es] "es is a sequence of `FileChangeEvent`, all items have same type.") (remote-all-files! [this] "compare all local files to remote ones, sync when not equal. if local-txid != remote-txid, return {:need-sync-remote true}")) (defrecord ^:large-vars/cleanup-todo Remote->LocalSyncer [user-uuid graph-uuid base-path repo *txid *txid-for-get-deletion-log *sync-state remoteapi ^:mutable local->remote-syncer *stopped *paused] Object (set-local->remote-syncer! [_ s] (set! local->remote-syncer s)) (sync-files-remote->local! [_ relative-filepath+checksum-coll latest-txid] (go (let [partitioned-filetxns (sequence (filepath+checksum-coll->partitioned-filetxns download-batch-size graph-uuid user-uuid) relative-filepath+checksum-coll) r (if (empty? (flatten partitioned-filetxns)) {:succ true} (do (put-sync-event! {:event :start :data {:type :full-remote->local :graph-uuid graph-uuid :full-sync? true :epoch (tc/to-epoch (t/now))}}) (local-files % [])) (LocalSync (stop-remote->local! [_] (vreset! *stopped true)) (local! [_] (go (let [r (let [diff-r ( (dec min-txid) @*txid) ;; min-txid-1 > @*txid, need to remote->local-full-sync (do (println "min-txid" min-txid "request-txid" @*txid) {:need-remote->local-full-sync true}) (when (pos-int? latest-txid) (let [filtered-diff-txns (-> (transduce (diffs->filetxns) conj '() (reverse diff-txns)) filter-download-files-with-reserved-chars) partitioned-filetxns (transduce (partition-filetxns download-batch-size) (completing (fn [r i] (conj r (reverse i)))) ;reverse '() filtered-diff-txns)] (put-sync-event! {:event :start :data {:type :remote->local :graph-uuid graph-uuid :full-sync? false :epoch (tc/to-epoch (t/now))}}) (if (empty? (flatten partitioned-filetxns)) (do (swap! *sync-state #(sync-state-reset-full-remote->local-files % [])) (local-full-sync r) r :else {:succ true})))) (local-all-files! [this] (go (let [remote-all-files-meta-c ( diff-remote-files) remote-txid-or-ex (local)]" (count sorted-diff-remote-files) "files need to sync") (let [filtered-files (filter-download-files-with-reserved-chars sorted-diff-remote-files)] (swap! *sync-state #(sync-state-reset-full-remote->local-files % sorted-diff-remote-files)) (local! this (map (juxt relative-path -checksum) filtered-files) latest-txid))))))))))) (defn- > (merge-with #(boolean (or (nil? %1) (= "fake-checksum" %1) (= %1 %2))) origin-checksum-map current-checksum-map) (filterv (comp true? second)) (mapv first) (select-keys origin-map) vals)))))) (def ^:private file-size-limit (* 100 1000 1024)) ;100MB (defn- filter-too-huge-files-aux [e] {:post [(boolean? %)]} (if (= "unlink" (.-type ^FileChangeEvent e)) true (boolean (when-some [size (:size (.-stat e))] (< size file-size-limit))))) (defn- filter-too-huge-files "filter out files > `file-size-limit`" [es] {:pre [(or (nil? es) (coll? es)) (every? #(instance? FileChangeEvent %) es)]} (filterv filter-too-huge-files-aux es)) (defn- filter-local-files-in-deletion-logs [local-all-files-meta deletion-logs remote-all-files-meta] (let [deletion-logs-map (into {} (mapcat (fn [log] (mapv (fn [path] [path (select-keys log [:epoch :TXId])]) (:paths log)))) deletion-logs) remote-all-files-meta-map (into {} (map (juxt :path identity)) remote-all-files-meta) *keep (transient #{}) *delete (transient #{}) filtered-deletion-logs-map (loop [[deletion-log & others] deletion-logs-map result {}] (if-not deletion-log result (let [[deletion-log-path deletion-log-meta] deletion-log meta (get remote-all-files-meta-map deletion-log-path) meta-txid (:txid meta) deletion-txid (:TXId deletion-log-meta)] (if (and meta-txid deletion-txid (> meta-txid deletion-txid)) (recur others result) (recur others (into result [[deletion-log-path deletion-log-meta]]))))))] (doseq [f local-all-files-meta] (let [epoch-long (some-> (get filtered-deletion-logs-map (:path f)) :epoch (* 1000))] (if (and epoch-long (> epoch-long (:last-modified f))) (conj! *delete f) (conj! *keep f)))) {:keep (persistent! *keep) :delete (persistent! *delete)})) (defn- RemoteSyncer [user-uuid graph-uuid base-path repo *sync-state remoteapi ^:mutable rate *txid *txid-for-get-deletion-log ^:mutable remote->local-syncer stop-chan *stopped *paused ;; control chans private-immediately-local->remote-chan private-recent-edited-chan] Object (filter-file-change-events-fn [_] (fn [^FileChangeEvent e] (go (and (instance? FileChangeEvent e) (if-let [mtime (:mtime (.-stat e))] ;; if mtime is not nil, it should be after (- now 1min) ;; ignore events too early (> (* 1000 mtime) (tc/to-long (t/minus (t/now) (t/minutes 1)))) true) (or (string/starts-with? (.-dir e) base-path) (string/starts-with? (str "file://" (.-dir e)) base-path)) ; valid path prefix (not (ignored? e)) ;not ignored ;; download files will also trigger file-change-events, ignore them (if (= "unlink" (:type e)) true (when-some [recent-remote->local-file-item (recent-remote->local-file-item graph-uuid e))] (not (contains? (:recent-remote->local-files @*sync-state) recent-remote->local-file-item)))))))) (set-remote->local-syncer! [_ s] (set! remote->local-syncer s)) ILocal->RemoteSync (setup-local->remote! [_] (async/tap immediately-local->remote-mult private-immediately-local->remote-chan) (async/tap recent-edited-mult private-recent-edited-chan)) (stop-local->remote! [_] (async/untap immediately-local->remote-mult private-immediately-local->remote-chan) (async/untap recent-edited-mult private-recent-edited-chan) (async/close! stop-chan) (vreset! *stopped true)) (remote-files e) (let [v (remote-files e)) v))))) :flush-fn #(swap! *sync-state sync-state-reset-queued-local->remote-files) :stop-ch stop-chan :distinct-coll? true :flush-now-ch private-immediately-local->remote-chan :refresh-timeout-ch private-recent-edited-chan))) (remote! [_ es] (if (empty? es) (go {:succ true}) (let [type (.-type ^FileChangeEvent (first es)) es->paths-xf (comp (map #(relative-path %)) (remove ignored?))] (go (let [es* ( (sequence es->paths-xf es**) (not= type "unlink") filter-upload-files-with-reserved-chars) _ (println :sync-local->remote type paths) r (if (empty? paths) (go @*txid) (case type ("add" "change") (remote-files paths) r* (remote-files paths succ?)] (cond (need-sync-remote? r*) (do (println :need-sync-remote r*) {:need-sync-remote true}) (need-reset-local-txid? r*) ;; TODO: this cond shouldn't be true, ;; but some potential bugs cause local-txid > remote-txid (let [remote-txid-or-ex (remote! update txid" r*) ;; persist txid (remote unknown:" r*) {:unknown r*}))))))) (remote-all-files! [this] (go (let [remote-all-files-meta-c (> (diff-file-metadata-sets local-all-files-meta remote-all-files-meta) ()) change-events (sequence (comp ;; convert to FileChangeEvent (map #(->FileChangeEvent "change" base-path (.get-normalized-path ^FileMetadata %) {:size (:size %)} (:etag %))) (remove ignored?)) diff-local-files) distinct-change-events (-> (distinct-file-change-events change-events) filter-upload-files-with-reserved-chars) _ (swap! *sync-state #(sync-state-reset-full-local->remote-files % distinct-change-events)) change-events-partitions (sequence ;; partition FileChangeEvents (partition-file-change-events upload-batch-size) distinct-change-events)] (println "[full-sync(local->remote)]" (count (flatten change-events-partitions)) "files need to sync and" (count delete-local-files) "local files need to delete") (put-sync-event! {:event :start :data {:type :full-local->remote :graph-uuid graph-uuid :full-sync? true :epoch (tc/to-epoch (t/now))}}) ;; 1. delete local files (loop [[f & fs] delete-local-files] (when f (let [relative-p (relative-path f)] (when-not (local-file-item {:remote->local-type :delete :checksum nil :path relative-p}] (swap! *sync-state sync-state--add-recent-remote->local-files [fake-recent-remote->local-file-item]) (local-files [fake-recent-remote->local-file-item]))))) (recur fs))) ;; 2. upload local files (let [r (loop [es-partitions change-events-partitions] (if @*stopped {:stop true} (if (empty? es-partitions) {:succ true} (let [{:keys [succ need-sync-remote graph-has-been-deleted unknown stop] :as r} (remote! this (first es-partitions)))] (s/assert ::sync-local->remote!-result r) (cond succ (recur (next es-partitions)) (or need-sync-remote graph-has-been-deleted unknown stop) r)))))] ;; update *txid-for-get-deletion-log (reset! *txid-for-get-deletion-log @*txid) r ))))))) ;;; ### put all stuff together (defrecord ^:large-vars/cleanup-todo SyncManager [user-uuid graph-uuid base-path *sync-state ^Local->RemoteSyncer local->remote-syncer ^Remote->LocalSyncer remote->local-syncer remoteapi ^:mutable ratelimit-local-changes-chan *txid *txid-for-get-deletion-log ^:mutable state ^:mutable remote-change-chan ^:mutable *ws *stopped? *paused? ^:mutable ops-chan ^:mutable app-awake-from-sleep-chan ;; control chans private-full-sync-chan private-remote->local-sync-chan private-remote->local-full-sync-chan private-pause-resume-chan] Object (schedule [this next-state args reason] {:pre [(s/valid? ::state next-state)]} (println (str "[SyncManager " graph-uuid "]") (and state (name state)) "->" (and next-state (name next-state)) :reason reason :local-txid @*txid :now (tc/to-string (t/now))) (set! state next-state) (swap! *sync-state sync-state--update-state next-state) (go (case state ::need-password (remote (remote this args)) ::remote->local (local this nil args)) ::local->remote-full-sync (local-full-sync (local-full-sync this args)) ::pause (remote-syncer local-changes-revised-chan)) (setup-local->remote! local->remote-syncer) (async/tap full-sync-mult private-full-sync-chan) (async/tap remote->local-sync-mult private-remote->local-sync-chan) (async/tap remote->local-full-sync-mult private-remote->local-full-sync-chan) (async/tap pause-resume-mult private-pause-resume-chan) (async/tap pubsub/app-wake-up-from-sleep-mult app-awake-from-sleep-chan) (go-loop [] (let [{:keys [remote->local remote->local-full-sync local->remote-full-sync local->remote resume pause stop]} (async/alt! private-remote->local-full-sync-chan {:remote->local-full-sync true} private-remote->local-sync-chan {:remote->local true} private-full-sync-chan {:local->remote-full-sync true} private-pause-resume-chan ([v] (if v {:resume true} {:pause true})) remote-change-chan ([v] (println "remote change:" v) {:remote->local v}) ratelimit-local-changes-chan ([v] (if (nil? v) {:stop true} (let [rest-v (util/drain-chan ratelimit-local-changes-chan) vs (cons v rest-v)] (println "local changes:" vs) {:local->remote vs}))) app-awake-from-sleep-chan {:remote->local true} (timeout (* 20 60 1000)) {:local->remote-full-sync true} (timeout (* 10 60 1000)) {:remote->local true} :priority true)] (cond stop nil remote->local-full-sync (do (util/drain-chan ops-chan) (>! ops-chan {:remote->local-full-sync true}) (recur)) remote->local (let [txid (if (true? remote->local) {:txid (:TXId (local)] (when (some? txid) (>! ops-chan {:remote->local txid})) (recur)) local->remote (do (>! ops-chan {:local->remote local->remote}) (recur)) local->remote-full-sync (do (util/drain-chan ops-chan) (>! ops-chan {:local->remote-full-sync true}) (recur)) resume (do (>! ops-chan {:resume true}) (recur)) pause (do (vreset! *paused? true) (>! ops-chan {:pause true}) (recur))))) (.schedule this ::need-password nil nil)) (need-password [this] (go (let [next-state (local remote->local-full-sync local->remote local->remote-full-sync] :as resume-state} (get @*resume-state graph-uuid)] (resume-state--reset graph-uuid) (vreset! *paused? false) (cond remote->local (offer! private-remote->local-sync-chan true) remote->local-full-sync (offer! private-remote->local-full-sync-chan true) local->remote (>! ops-chan {:local->remote local->remote}) local->remote-full-sync (offer! private-full-sync-chan true) :else ;; if resume-state = nil, try a remote->local to sync recent diffs (offer! private-remote->local-sync-chan true)) (put-sync-event! {:event :resume :data {:graph-uuid graph-uuid :resume-state resume-state :epoch (tc/to-epoch (t/now))}}) (local local->remote local->remote-full-sync remote->local-full-sync pause resume] :as result} (local (local {:remote remote->local} {:remote-changed remote->local})) local->remote (remote {:local local->remote} {:local-changed local->remote})) local->remote-full-sync (remote-full-sync nil nil)) remote->local-full-sync (local-full-sync nil nil)) pause (remote-all-files! local->remote-syncer))] (s/assert ::sync-local->remote-all-files!-result r) (cond succ (do (swap! *sync-state #(sync-state-reset-full-local->remote-files % [])) (put-sync-event! {:event :finished-local->remote :data {:graph-uuid graph-uuid :full-sync? true :epoch (tc/to-epoch (t/now))}}) (.schedule this ::idle nil nil)) need-sync-remote (do (util/drain-chan ops-chan) (>! ops-chan {:remote->local true}) (>! ops-chan {:local->remote-full-sync true}) (.schedule this ::idle nil nil)) graph-has-been-deleted (.schedule this ::stop nil :graph-has-been-deleted) stop (.schedule this ::stop nil nil) unknown (do (state/pub-event! [:capture-error {:error unknown :payload {:type :sync/unknown :event :local->remote-full-sync-failed :user-id user-uuid :graph-uuid graph-uuid}}]) (put-sync-event! {:event :local->remote-full-sync-failed :data {:graph-uuid graph-uuid :epoch (tc/to-epoch (t/now))}}) (.schedule this ::idle nil nil)))))) (remote->local-full-sync [this _] (go (let [{:keys [succ unknown stop pause]} (local-all-files! remote->local-syncer))] (cond succ (do (put-sync-event! {:event :finished-remote->local :data {:graph-uuid graph-uuid :full-sync? true :epoch (tc/to-epoch (t/now))}}) (.schedule this ::idle nil nil)) stop (.schedule this ::stop nil nil) pause (do (resume-state--add-remote->local-full-sync-state graph-uuid) (.schedule this ::pause nil nil)) unknown (do (state/pub-event! [:capture-error {:error unknown :payload {:event :remote->local-full-sync-failed :type :sync/unknown :graph-uuid graph-uuid :user-id user-uuid}}]) (put-sync-event! {:event :remote->local-full-sync-failed :data {:graph-uuid graph-uuid :exp unknown :epoch (tc/to-epoch (t/now))}}) (let [next-state (if (string/includes? (str (ex-cause unknown)) "404 Not Found") ;; TODO: this should never happen ::stop ;; if any other exception occurred, re-exec remote->local-full-sync ::remote->local-full-sync)] (.schedule this next-state nil nil))))))) (remote->local [this _next-state {remote-val :remote}] (go (if (some-> remote-val :txid (<= @*txid)) (.schedule this ::idle nil nil) (let [origin-txid @*txid {:keys [succ unknown stop pause need-remote->local-full-sync] :as r} (local! remote->local-syncer))] (s/assert ::sync-remote->local!-result r) (cond need-remote->local-full-sync (do (util/drain-chan ops-chan) (>! ops-chan {:remote->local-full-sync true}) (>! ops-chan {:local->remote-full-sync true}) (.schedule this ::idle nil nil)) succ (do (put-sync-event! {:event :finished-remote->local :data {:graph-uuid graph-uuid :full-sync? false :from-txid origin-txid :to-txid @*txid :epoch (tc/to-epoch (t/now))}}) (.schedule this ::idle nil nil)) stop (.schedule this ::stop nil nil) pause (do (resume-state--add-remote->local-state graph-uuid) (.schedule this ::pause nil nil)) unknown (do (prn "remote->local err" unknown) (state/pub-event! [:capture-error {:error unknown :payload {:type :sync/unknown :event :remote->local :user-id user-uuid :graph-uuid graph-uuid}}]) (.schedule this ::idle nil nil))))))) (local->remote [this {local-changes :local}] ;; local-changes:: list of FileChangeEvent (assert (some? local-changes) local-changes) (go (let [distincted-local-changes (distinct-file-change-events local-changes) _ (swap! *sync-state #(sync-state-reset-full-local->remote-files % distincted-local-changes)) change-events-partitions (sequence (partition-file-change-events upload-batch-size) distincted-local-changes) _ (put-sync-event! {:event :start :data {:type :local->remote :graph-uuid graph-uuid :full-sync? false :epoch (tc/to-epoch (t/now))}}) {:keys [succ need-sync-remote graph-has-been-deleted unknown stop pause]} (loop [es-partitions change-events-partitions] (cond @*stopped? {:stop true} @*paused? {:pause true} (empty? es-partitions) {:succ true} :else (let [{:keys [succ need-sync-remote graph-has-been-deleted pause unknown stop] :as r} (remote! local->remote-syncer (first es-partitions)))] (s/assert ::sync-local->remote!-result r) (cond succ (recur (next es-partitions)) (or need-sync-remote graph-has-been-deleted unknown pause stop) r))))] (cond succ (do (swap! *sync-state #(sync-state-reset-full-local->remote-files % [])) (put-sync-event! {:event :finished-local->remote :data {:graph-uuid graph-uuid :full-sync? false :file-change-events distincted-local-changes :epoch (tc/to-epoch (t/now))}}) (.schedule this ::idle nil nil)) need-sync-remote (do (util/drain-chan ops-chan) (>! ops-chan {:remote->local true}) (>! ops-chan {:local->remote local-changes}) (.schedule this ::idle nil nil)) graph-has-been-deleted (.schedule this ::stop nil :graph-has-been-deleted) stop (.schedule this ::stop nil nil) pause (do (resume-state--add-local->remote-state graph-uuid local-changes) (.schedule this ::pause nil nil)) unknown (do (debug/pprint "local->remote" unknown) (state/pub-event! [:capture-error {:error unknown :payload {:event :local->remote :type :sync/unknown :user-id user-uuid :graph-uuid graph-uuid}}]) (.schedule this ::idle nil nil)))))) IStoppable (-stop! [_] (go (when-not @*stopped? (vreset! *stopped? true) (ws-stop! *ws) (async/untap full-sync-mult private-full-sync-chan) (async/untap remote->local-sync-mult private-remote->local-sync-chan) (async/untap remote->local-full-sync-mult private-remote->local-full-sync-chan) (async/untap pause-resume-mult private-pause-resume-chan) (async/untap pubsub/app-wake-up-from-sleep-mult app-awake-from-sleep-chan) (when ops-chan (async/close! ops-chan)) (stop-local->remote! local->remote-syncer) (stop-remote->local! remote->local-syncer) (RemoteAPI *stopped?) local->remote-syncer (->Local->RemoteSyncer user-uuid graph-uuid base-path repo *sync-state remoteapi-with-stop (if (mobile-util/native-platform?) 2000 10000) *txid *txid-for-get-deletion-log nil (chan) *stopped? *paused? (chan 1) (chan 1)) remote->local-syncer (->Remote->LocalSyncer user-uuid graph-uuid base-path repo *txid *txid-for-get-deletion-log *sync-state remoteapi-with-stop nil *stopped? *paused?)] (.set-remote->local-syncer! local->remote-syncer remote->local-syncer) (.set-local->remote-syncer! remote->local-syncer local->remote-syncer) (swap! *sync-state sync-state--update-current-syncing-graph-uuid graph-uuid) (->SyncManager user-uuid graph-uuid base-path *sync-state local->remote-syncer remote->local-syncer remoteapi-with-stop nil *txid *txid-for-get-deletion-log nil nil nil *stopped? *paused? nil nil (chan 1) (chan 1) (chan 1) (chan 1)))) (defn sync-manager-singleton [user-uuid graph-uuid base-path repo txid *sync-state] (when-not @current-sm-graph-uuid (reset! current-sm-graph-uuid graph-uuid) (sync-manager user-uuid graph-uuid base-path repo txid *sync-state))) ;; Avoid sync reentrancy (defonce *sync-entered? (atom false)) (defn remote-now [] (go (when-let [_sm ^SyncManager (state/get-file-sync-manager (state/get-current-file-sync-graph-uuid))] (offer! immediately-local->remote-chan true)))) (defn sync-need-password! [] (when-let [sm ^SyncManager (state/get-file-sync-manager (state/get-current-file-sync-graph-uuid))] (.need-password sm))) (defn check-graph-belong-to-current-user [current-user-uuid graph-user-uuid] (cond (nil? current-user-uuid) false (= current-user-uuid graph-user-uuid) true :else (do (notification/show! (t :file-sync/other-user-graph) :warning false) false))) (defn > (:Graphs r) (mapv :GraphUUID) set (#(contains? % local-graph-uuid)))))] (when-not result (notification/show! (t :file-sync/graph-deleted) :warning false)) result))) (defn sync-off? [sync-state] (or (nil? sync-state) (sync-state--stopped? sync-state))) (defn graph-sync-off? "Is sync not running for this `graph-uuid`?" [graph-uuid] (sync-off? (state/get-file-sync-state graph-uuid))) (defn graph-encrypted? [] (when-let [graph-uuid (second @graphs-txid)] (get-pwd graph-uuid))) (declare network-online-cursor) (defn c (persist-var/-load graphs-txid))) [user-uuid graph-uuid txid] @graphs-txid txid (or txid 0) repo (state/get-current-repo)] (when-not (instance? ExceptionInfo current-user-uuid) (when (and repo @network-online-cursor user-uuid graph-uuid txid (graph-sync-off? graph-uuid) (user/logged-in?) (not (config/demo-graph? repo))) (try (when-let [sm (sync-manager-singleton current-user-uuid graph-uuid (config/get-repo-dir repo) repo txid *sync-state)] (when (check-graph-belong-to-current-user current-user-uuid user-uuid) (if-not (local-full-sync-chan true) (offer! full-sync-chan true))))) (catch :default e (prn "Sync start error: ") (log/error :exception e))))) (reset! *sync-entered? false)))))) (defn- restart-if-stopped! [is-active?] (cond (and is-active? (graph-sync-off? (second @graphs-txid))) (remote-chan (chan 1)) (add-watch app-state-changed-cursor "sync" (fn [_ _ _ {:keys [is-active?]}] (cond (mobile-util/native-android?) (when-not is-active? (remote-now)) (mobile-util/native-ios?) (let [*task-id (atom nil)] (if is-active? (restart-if-stopped! is-active?) (when (state/get-current-file-sync-graph-uuid) (p/let [task-id (.beforeExit ^js BackgroundTask (fn [] (go ;; Wait for file watcher events (remote-chan) (remote-now)) ;; wait at most 20s (async/alts! [finished-local->remote-chan (timeout 20000)]) (p/let [active? (mobile-util/app-active?)] (when-not active? (offer! pause-resume-chan is-active?))) (local-full-sync when it failed before (def re-remote->local-full-sync-chan (chan 1)) (async/sub pubsub/sync-events-pub :remote->local-full-sync-failed re-remote->local-full-sync-chan) (go-loop [] (let [{{graph-uuid :graph-uuid} :data} (local-full-sync-chan) {:keys [current-syncing-graph-uuid]} (state/get-file-sync-state graph-uuid)] (when (= graph-uuid current-syncing-graph-uuid) (offer! remote->local-full-sync-chan true)) (recur))) ;; re-exec local->remote-full-sync when it failed (def re-local->remote-full-sync-chan (chan 1)) (async/sub pubsub/sync-events-pub :local->remote-full-sync-failed re-local->remote-full-sync-chan) (go-loop [] (let [{{graph-uuid :graph-uuid} :data} (remote-full-sync-chan) {:keys [current-syncing-graph-uuid]} (state/get-file-sync-state graph-uuid)] (when (= graph-uuid current-syncing-graph-uuid) (offer! full-sync-chan true)) (recur))) ;;; add-tap (comment (def *x (atom nil)) (add-tap (fn [v] (reset! *x v))) )