sync.cljs 138 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425142614271428142914301431143214331434143514361437143814391440144114421443144414451446144714481449145014511452145314541455145614571458145914601461146214631464146514661467146814691470147114721473147414751476147714781479148014811482148314841485148614871488148914901491149214931494149514961497149814991500150115021503150415051506150715081509151015111512151315141515151615171518151915201521152215231524152515261527152815291530153115321533153415351536153715381539154015411542154315441545154615471548154915501551155215531554155515561557155815591560156115621563156415651566156715681569157015711572157315741575157615771578157915801581158215831584158515861587158815891590159115921593159415951596159715981599160016011602160316041605160616071608160916101611161216131614161516161617161816191620162116221623162416251626162716281629163016311632163316341635163616371638163916401641164216431644164516461647164816491650165116521653165416551656165716581659166016611662166316641665166616671668166916701671167216731674167516761677167816791680168116821683168416851686168716881689169016911692169316941695169616971698169917001701170217031704170517061707170817091710171117121713171417151716171717181719172017211722172317241725172617271728172917301731173217331734173517361737173817391740174117421743174417451746174717481749175017511752175317541755175617571758175917601761176217631764176517661767176817691770177117721773177417751776177717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013201420152016201720182019202020212022202320242025202620272028202920302031203220332034203520362037203820392040204120422043204420452046204720482049205020512052205320542055205620572058205920602061206220632064206520662067206820692070207120722073207420752076207720782079208020812082208320842085208620872088208920902091209220932094209520962097209820992100210121022103210421052106210721082109211021112112211321142115211621172118211921202121212221232124212521262127212821292130213121322133213421352136213721382139214021412142214321442145214621472148214921502151215221532154215521562157215821592160216121622163216421652166216721682169217021712172217321742175217621772178217921802181218221832184218521862187218821892190219121922193219421952196219721982199220022012202220322042205220622072208220922102211221222132214221522162217221822192220222122222223222422252226222722282229223022312232223322342235223622372238223922402241224222432244224522462247224822492250225122522253225422552256225722582259226022612262226322642265226622672268226922702271227222732274227522762277227822792280228122822283228422852286228722882289229022912292229322942295229622972298229923002301230223032304230523062307230823092310231123122313231423152316231723182319232023212322232323242325232623272328232923302331233223332334233523362337233823392340234123422343234423452346234723482349235023512352235323542355235623572358235923602361236223632364236523662367236823692370237123722373237423752376237723782379238023812382238323842385238623872388238923902391239223932394239523962397239823992400240124022403240424052406240724082409241024112412241324142415241624172418241924202421242224232424242524262427242824292430243124322433243424352436243724382439244024412442244324442445244624472448244924502451245224532454245524562457245824592460246124622463246424652466246724682469247024712472247324742475247624772478247924802481248224832484248524862487248824892490249124922493249424952496249724982499250025012502250325042505250625072508250925102511251225132514251525162517251825192520252125222523252425252526252725282529253025312532253325342535253625372538253925402541254225432544254525462547254825492550255125522553255425552556255725582559256025612562256325642565256625672568256925702571257225732574257525762577257825792580258125822583258425852586258725882589259025912592259325942595259625972598259926002601260226032604260526062607260826092610261126122613261426152616261726182619262026212622262326242625262626272628262926302631263226332634263526362637263826392640264126422643264426452646264726482649265026512652265326542655265626572658265926602661266226632664266526662667266826692670267126722673267426752676267726782679268026812682268326842685268626872688268926902691269226932694269526962697269826992700270127022703270427052706270727082709271027112712271327142715271627172718271927202721272227232724272527262727272827292730273127322733273427352736273727382739274027412742274327442745274627472748274927502751275227532754275527562757275827592760276127622763276427652766276727682769277027712772277327742775277627772778277927802781278227832784278527862787278827892790279127922793279427952796279727982799280028012802280328042805280628072808280928102811281228132814281528162817281828192820282128222823282428252826282728282829283028312832283328342835283628372838283928402841284228432844284528462847284828492850285128522853285428552856285728582859286028612862286328642865286628672868286928702871287228732874287528762877287828792880288128822883288428852886288728882889289028912892289328942895289628972898289929002901290229032904290529062907290829092910291129122913291429152916291729182919292029212922292329242925292629272928292929302931293229332934293529362937293829392940294129422943294429452946294729482949295029512952295329542955295629572958295929602961296229632964296529662967296829692970297129722973297429752976297729782979298029812982298329842985298629872988298929902991299229932994299529962997299829993000300130023003300430053006300730083009301030113012301330143015301630173018301930203021302230233024302530263027302830293030303130323033303430353036303730383039304030413042304330443045304630473048304930503051305230533054305530563057305830593060306130623063306430653066306730683069307030713072307330743075307630773078307930803081308230833084308530863087308830893090309130923093309430953096309730983099310031013102310331043105310631073108310931103111311231133114311531163117311831193120312131223123312431253126312731283129313031313132313331343135313631373138313931403141314231433144314531463147314831493150315131523153315431553156315731583159316031613162316331643165316631673168316931703171317231733174317531763177317831793180318131823183318431853186318731883189319031913192319331943195319631973198319932003201320232033204320532063207320832093210321132123213321432153216321732183219322032213222322332243225322632273228322932303231323232333234323532363237323832393240324132423243324432453246324732483249325032513252325332543255325632573258325932603261326232633264326532663267326832693270327132723273327432753276327732783279328032813282328332843285328632873288328932903291329232933294329532963297329832993300330133023303330433053306330733083309
  1. (ns frontend.fs.sync
  2. "Main ns for providing file sync functionality"
  3. (:require [cljs-http.client :as http]
  4. [cljs-time.core :as t]
  5. [cljs-time.format :as tf]
  6. [cljs-time.coerce :as tc]
  7. [cljs.core.async :as async :refer [go timeout go-loop offer! poll! chan <! >!]]
  8. [cljs.core.async.impl.channels]
  9. [cljs.core.async.interop :refer [p->c]]
  10. [cljs.spec.alpha :as s]
  11. [clojure.set :as set]
  12. [clojure.string :as string]
  13. [clojure.pprint :as pp]
  14. [electron.ipc :as ipc]
  15. [goog.string :as gstring]
  16. [frontend.config :as config]
  17. [frontend.debug :as debug]
  18. [frontend.handler.user :as user]
  19. [frontend.state :as state]
  20. [frontend.mobile.util :as mobile-util]
  21. [frontend.util :as util]
  22. [frontend.util.persist-var :as persist-var]
  23. [frontend.util.fs :as fs-util]
  24. [frontend.handler.notification :as notification]
  25. [frontend.context.i18n :refer [t]]
  26. [frontend.diff :as diff]
  27. [frontend.db :as db]
  28. [frontend.fs :as fs]
  29. [frontend.encrypt :as encrypt]
  30. [frontend.pubsub :as pubsub]
  31. [logseq.graph-parser.util :as gp-util]
  32. [medley.core :refer [dedupe-by]]
  33. [rum.core :as rum]
  34. [promesa.core :as p]
  35. [lambdaisland.glogi :as log]
  36. [frontend.fs.capacitor-fs :as capacitor-fs]
  37. ["@capawesome/capacitor-background-task" :refer [BackgroundTask]]
  38. ["path" :as path]))
  39. ;;; ### Commentary
  40. ;; file-sync related local files/dirs:
  41. ;; - logseq/graphs-txid.edn
  42. ;; this file contains [user-uuid graph-uuid transaction-id]
  43. ;; graph-uuid: the unique identifier of the graph on the server
  44. ;; transaction-id: sync progress of local files
  45. ;; - logseq/version-files
  46. ;; downloaded version-files
  47. ;; files included by `get-ignored-files` will not be synchronized.
  48. ;;
  49. ;; sync strategy:
  50. ;; - when toggle file-sync on,
  51. ;; trigger remote->local-full-sync first, then local->remote-full-sync
  52. ;; local->remote-full-sync will compare local-files with remote-files (by md5),
  53. ;; and upload new-added-files to remote server.
  54. ;; - if local->remote sync(normal-sync or full-sync) return :need-sync-remote,
  55. ;; then trigger a remote->local sync
  56. ;; - if remote->local sync return :need-remote->local-full-sync,
  57. ;; then we need a remote->local-full-sync,
  58. ;; which compare local-files with remote-files, sync diff-remote-files to local
  59. ;; - local->remote-full-sync will be triggered after 20mins of idle
  60. ;; - every 10s, flush local changes, and sync to remote
  61. ;; TODO: use access-token instead of id-token
  62. ;; TODO: a remote delete-diff cause local related-file deleted, then trigger a `FileChangeEvent`,
  63. ;; and re-produce a new same-file-delete diff.
  64. ;;; ### specs
  65. (s/def ::state #{;; do following jobs when ::starting:
  66. ;; - wait seconds for file-change-events from file-watcher
  67. ;; - drop redundant file-change-events
  68. ;; - setup states in `frontend.state`
  69. ::starting
  70. ::need-password
  71. ::idle
  72. ;; sync local-changed files
  73. ::local->remote
  74. ;; sync remote latest-transactions
  75. ::remote->local
  76. ;; local->remote full sync
  77. ::local->remote-full-sync
  78. ;; remote->local full sync
  79. ::remote->local-full-sync
  80. ;; snapshot state when switching between apps on iOS
  81. ::pause
  82. ::stop})
  83. (s/def ::path string?)
  84. (s/def ::time t/date?)
  85. (s/def ::remote->local-type #{:delete :update
  86. ;; :rename=:delete+:update
  87. })
  88. (s/def ::current-syncing-graph-uuid (s/or :nil nil? :graph-uuid string?))
  89. (s/def ::recent-remote->local-file-item (s/keys :req-un [::remote->local-type ::checksum ::path]))
  90. (s/def ::current-local->remote-files (s/coll-of ::path :kind set?))
  91. (s/def ::current-remote->local-files (s/coll-of ::path :kind set?))
  92. (s/def ::recent-remote->local-files (s/coll-of ::recent-remote->local-file-item :kind set?))
  93. (s/def ::history-item (s/keys :req-un [::path ::time]))
  94. (s/def ::history (s/coll-of ::history-item :kind seq?))
  95. (s/def ::sync-state (s/keys :req-un [::current-syncing-graph-uuid
  96. ::state
  97. ::current-local->remote-files
  98. ::current-remote->local-files
  99. ::queued-local->remote-files
  100. ;; Downloading files from remote will trigger filewatcher events,
  101. ;; causes unreasonable information in the content of ::queued-local->remote-files,
  102. ;; use ::recent-remote->local-files to filter such events
  103. ::recent-remote->local-files
  104. ::history]))
  105. ;; diff
  106. (s/def ::TXId pos-int?)
  107. (s/def ::TXType #{"update_files" "delete_files" "rename_file"})
  108. (s/def ::TXContent-to-path string?)
  109. (s/def ::TXContent-from-path (s/or :some string? :none nil?))
  110. (s/def ::TXContent-checksum (s/or :some string? :none nil?))
  111. (s/def ::TXContent-item (s/tuple ::TXContent-to-path
  112. ::TXContent-from-path
  113. ::TXContent-checksum))
  114. (s/def ::TXContent (s/coll-of ::TXContent-item))
  115. (s/def ::diff (s/keys :req-un [::TXId ::TXType ::TXContent]))
  116. (s/def ::succ-map #(= {:succ true} %))
  117. (s/def ::unknown-map (comp some? :unknown))
  118. (s/def ::stop-map #(= {:stop true} %))
  119. (s/def ::pause-map #(= {:pause true} %))
  120. (s/def ::need-sync-remote #(= {:need-sync-remote true} %))
  121. (s/def ::graph-has-been-deleted #(= {:graph-has-been-deleted true} %))
  122. (s/def ::sync-local->remote!-result
  123. (s/or :stop ::stop-map
  124. :succ ::succ-map
  125. :pause ::pause-map
  126. :need-sync-remote ::need-sync-remote
  127. :graph-has-been-deleted ::graph-has-been-deleted
  128. :unknown ::unknown-map))
  129. (s/def ::sync-remote->local!-result
  130. (s/or :succ ::succ-map
  131. :need-remote->local-full-sync
  132. #(= {:need-remote->local-full-sync true} %)
  133. :stop ::stop-map
  134. :pause ::pause-map
  135. :unknown ::unknown-map))
  136. (s/def ::sync-local->remote-all-files!-result
  137. (s/or :succ ::succ-map
  138. :stop ::stop-map
  139. :need-sync-remote ::need-sync-remote
  140. :graph-has-been-deleted ::graph-has-been-deleted
  141. :unknown ::unknown-map))
  142. ;; sync-event type
  143. (s/def ::event #{:created-local-version-file
  144. :finished-local->remote
  145. :finished-remote->local
  146. :start
  147. :pause
  148. :resume
  149. :exception-decrypt-failed
  150. :remote->local-full-sync-failed
  151. :local->remote-full-sync-failed
  152. :get-remote-graph-failed
  153. :get-deletion-logs-failed
  154. })
  155. (s/def ::sync-event (s/keys :req-un [::event ::data]))
  156. (defonce download-batch-size 100)
  157. (defonce upload-batch-size 20)
  158. (def ^:private current-sm-graph-uuid (atom nil))
  159. ;;; ### configs in config.edn
  160. ;; - :file-sync/ignore-files
  161. (defn- get-ignored-files
  162. []
  163. (into #{#"logseq/graphs-txid.edn$"
  164. #"logseq/pages-metadata.edn$"
  165. #"logseq/version-files/"
  166. #"logseq/bak/"
  167. #"node_modules/"
  168. ;; path starts with `.` in the root directory, e.g. .gitignore
  169. #"^\.[^.]+"
  170. ;; path includes `/.`, e.g. .git, .DS_store
  171. #"/\."
  172. ;; Emacs/Vim backup files end with `~` by default
  173. #"~$"}
  174. (map re-pattern)
  175. (:file-sync/ignore-files (state/get-config))))
  176. ;;; ### configs ends
  177. (def ws-addr config/WS-URL)
  178. ;; Warning: make sure to `persist-var/-load` graphs-txid before using it.
  179. (defonce graphs-txid (persist-var/persist-var nil "graphs-txid"))
  180. (declare assert-local-txid<=remote-txid)
  181. (defn <update-graphs-txid!
  182. [latest-txid graph-uuid user-uuid repo]
  183. {:pre [(int? latest-txid) (>= latest-txid 0)]}
  184. (-> (p/let [_ (persist-var/-reset-value! graphs-txid [user-uuid graph-uuid latest-txid] repo)
  185. _ (persist-var/persist-save graphs-txid)]
  186. (when (state/developer-mode?) (assert-local-txid<=remote-txid)))
  187. p->c))
  188. (defn clear-graphs-txid! [repo]
  189. (persist-var/-reset-value! graphs-txid nil repo)
  190. (persist-var/persist-save graphs-txid))
  191. (defn- ws-ping-loop [ws]
  192. (go-loop []
  193. (let [state (.-readyState ws)]
  194. ;; not closing or closed state
  195. (when (not (contains? #{2 3} state))
  196. (if (not= 1 state)
  197. ;; when connecting, wait 1s
  198. (do (<! (timeout 1000))
  199. (recur))
  200. (do (.send ws "PING")
  201. ;; aws apigateway websocket
  202. ;; Idle Connection Timeout: 10min
  203. (<! (timeout (* 5 60 1000)))
  204. (recur)))))))
  205. (defn- ws-stop! [*ws]
  206. (when *ws
  207. (swap! *ws (fn [o] (assoc o :stop true)))
  208. (when-let [ws (:ws @*ws)]
  209. (.close ws))))
  210. (defn- ws-listen!*
  211. [graph-uuid *ws remote-changes-chan]
  212. (reset! *ws {:ws (js/WebSocket. (util/format ws-addr graph-uuid)) :stop false})
  213. (ws-ping-loop (:ws @*ws))
  214. ;; (set! (.-onopen (:ws @*ws)) #(println (util/format "ws opened: graph '%s'" graph-uuid %)))
  215. (set! (.-onclose (:ws @*ws)) (fn [_e]
  216. (when-not (true? (:stop @*ws))
  217. (go
  218. (timeout 1000)
  219. (println "re-connecting graph" graph-uuid)
  220. (ws-listen!* graph-uuid *ws remote-changes-chan)))))
  221. (set! (.-onmessage (:ws @*ws)) (fn [e]
  222. (let [data (js->clj (js/JSON.parse (.-data e)) :keywordize-keys true)]
  223. (when (some? (:txid data))
  224. (if-let [v (poll! remote-changes-chan)]
  225. (let [last-txid (:txid v)
  226. current-txid (:txid data)]
  227. (if (> last-txid current-txid)
  228. (offer! remote-changes-chan v)
  229. (offer! remote-changes-chan data)))
  230. (offer! remote-changes-chan data)))))))
  231. (defn ws-listen!
  232. "return channel which output messages from server"
  233. [graph-uuid *ws]
  234. (let [remote-changes-chan (chan (async/sliding-buffer 1))]
  235. (ws-listen!* graph-uuid *ws remote-changes-chan)
  236. remote-changes-chan))
  237. (defn- get-json-body [body]
  238. (or (and (not (string? body)) body)
  239. (or (string/blank? body) nil)
  240. (js->clj (js/JSON.parse body) :keywordize-keys true)))
  241. (defn- get-resp-json-body [resp]
  242. (-> resp (:body) (get-json-body)))
  243. (defn- <request-once [api-name body token]
  244. (go
  245. (let [resp (http/post (str "https://" config/API-DOMAIN "/file-sync/" api-name)
  246. {:oauth-token token
  247. :body (js/JSON.stringify (clj->js body))
  248. :with-credentials? false})]
  249. {:resp (<! resp)
  250. :api-name api-name
  251. :body body})))
  252. ;; For debug
  253. (def *on-flying-request
  254. "requests not finished"
  255. (atom #{}))
  256. (def stoppable-apis #{"get_all_files"})
  257. (defn- <request*
  258. "max retry count is 5.
  259. *stop: volatile var, stop retry-request when it's true,
  260. and return :stop"
  261. ([api-name body token *stop] (<request* api-name body token 0 *stop))
  262. ([api-name body token retry-count *stop]
  263. (go
  264. (if (and *stop @*stop (contains? stoppable-apis api-name))
  265. :stop
  266. (let [resp (<! (<request-once api-name body token))]
  267. (if (and
  268. (= 401 (get-in resp [:resp :status]))
  269. (= "Unauthorized" (:message (get-json-body (get-in resp [:resp :body])))))
  270. (if (> retry-count 5)
  271. (throw (js/Error. :file-sync-request))
  272. (do (println "will retry after" (min 60000 (* 1000 retry-count)) "ms")
  273. (<! (timeout (min 60000 (* 1000 retry-count))))
  274. (<! (<request* api-name body token (inc retry-count) *stop))))
  275. (:resp resp)))))))
  276. (defn <request [api-name & args]
  277. (let [name (str api-name (.now js/Date))]
  278. (go (swap! *on-flying-request conj name)
  279. (let [r (<! (apply <request* api-name args))]
  280. (swap! *on-flying-request disj name)
  281. r))))
  282. (defn- remove-dir-prefix [dir path]
  283. (let [r (string/replace path (js/RegExp. (str "^" (gstring/regExpEscape dir))) "")]
  284. (if (string/starts-with? r "/")
  285. (string/replace-first r "/" "")
  286. r)))
  287. (defn- remove-user-graph-uuid-prefix
  288. "<user-uuid>/<graph-uuid>/path -> path"
  289. [path]
  290. (let [parts (string/split path "/")]
  291. (if (and (< 2 (count parts))
  292. (= 36 (count (parts 0)))
  293. (= 36 (count (parts 1))))
  294. (string/join "/" (drop 2 parts))
  295. path)))
  296. (defprotocol IRelativePath
  297. (-relative-path [this]))
  298. (defn relative-path [o]
  299. (let [repo-dir (config/get-repo-dir (state/get-current-repo))]
  300. (cond
  301. (implements? IRelativePath o)
  302. (-relative-path o)
  303. ;; full path
  304. (and (string? o) (string/starts-with? o repo-dir))
  305. (string/replace o (str repo-dir "/") "")
  306. (string? o)
  307. (remove-user-graph-uuid-prefix o)
  308. :else
  309. (throw (js/Error. (str "unsupport type " (str o)))))))
  310. (defprotocol IChecksum
  311. (-checksum [this]))
  312. (defprotocol IStoppable
  313. (-stop! [this]))
  314. (defprotocol IStopped?
  315. (-stopped? [this]))
  316. ;from-path, to-path is relative path
  317. (deftype FileTxn [from-path to-path updated? deleted? txid checksum]
  318. Object
  319. (renamed? [_]
  320. (not= from-path to-path))
  321. IRelativePath
  322. (-relative-path [_] (remove-user-graph-uuid-prefix to-path))
  323. IEquiv
  324. (-equiv [_ ^FileTxn other]
  325. (and (= from-path (.-from-path other))
  326. (= to-path (.-to-path other))
  327. (= updated? (.-updated? other))
  328. (= deleted? (.-deleted? other))))
  329. IHash
  330. (-hash [_] (hash [from-path to-path updated? deleted?]))
  331. IComparable
  332. (-compare [_ ^FileTxn other]
  333. (compare txid (.-txid other)))
  334. IPrintWithWriter
  335. (-pr-writer [coll w _opts]
  336. (write-all w "#FileTxn[\"" from-path "\" -> \"" to-path
  337. "\" (updated? " updated? ", renamed? " (.renamed? coll) ", deleted? " deleted?
  338. ", txid " txid ", checksum " checksum ")]")))
  339. (defn- assert-filetxns
  340. [filetxns]
  341. (every? true?
  342. (mapv
  343. (fn [^FileTxn filetxn]
  344. (if (.-updated? filetxn)
  345. (some? (-checksum filetxn))
  346. true))
  347. filetxns)))
  348. (defn- diff->filetxns
  349. "convert diff(`<get-diff`) to `FileTxn`"
  350. [{:keys [TXId TXType TXContent]}]
  351. {:post [(assert-filetxns %)]}
  352. (let [update? (= "update_files" TXType)
  353. delete? (= "delete_files" TXType)
  354. update-xf
  355. (comp
  356. (remove #(or (empty? (first %))
  357. (empty? (last %))))
  358. (map #(->FileTxn (first %) (first %) update? delete? TXId (last %))))
  359. delete-xf
  360. (comp
  361. (remove #(empty? (first %)))
  362. (map #(->FileTxn (first %) (first %) update? delete? TXId nil)))
  363. rename-xf
  364. (comp
  365. (remove #(or (empty? (first %))
  366. (empty? (second %))))
  367. (map #(->FileTxn (second %) (first %) false false TXId nil)))
  368. xf (case TXType
  369. "delete_files" delete-xf
  370. "update_files" update-xf
  371. "rename_file" rename-xf)]
  372. (sequence xf TXContent)))
  373. (defn- distinct-update-filetxns-xf
  374. "transducer.
  375. remove duplicate update&delete `FileTxn`s."
  376. [rf]
  377. (let [seen-update&delete-filetxns (volatile! #{})]
  378. (fn
  379. ([] (rf))
  380. ([result] (rf result))
  381. ([result ^FileTxn filetxn]
  382. (if (and
  383. (or (.-updated? filetxn) (.-deleted? filetxn))
  384. (contains? @seen-update&delete-filetxns filetxn))
  385. result
  386. (do (vswap! seen-update&delete-filetxns conj filetxn)
  387. (rf result filetxn)))))))
  388. (defn- remove-deleted-filetxns-xf
  389. "transducer.
  390. remove update&rename filetxns if they are deleted later(in greater txid filetxn)."
  391. [rf]
  392. (let [seen-deleted-paths (volatile! #{})]
  393. (fn
  394. ([] (rf))
  395. ([result] (rf result))
  396. ([result ^FileTxn filetxn]
  397. (let [to-path (.-to-path filetxn)
  398. from-path (.-from-path filetxn)]
  399. (if (contains? @seen-deleted-paths to-path)
  400. (do (when (not= to-path from-path)
  401. (vswap! seen-deleted-paths disj to-path)
  402. (vswap! seen-deleted-paths conj from-path))
  403. result)
  404. (do (vswap! seen-deleted-paths conj to-path)
  405. (rf result filetxn))))))))
  406. (defn- partition-filetxns
  407. "return transducer.
  408. partition filetxns, at most N update-filetxns in each partition,
  409. for delete and rename type, only one filetxn in each partition."
  410. [n]
  411. (comp
  412. (partition-by #(.-updated? ^FileTxn %))
  413. (map (fn [ts]
  414. (if (some-> (first ts) (.-updated?))
  415. (partition-all n ts)
  416. (map list ts))))
  417. cat))
  418. (defn- contains-path? [regexps path]
  419. (reduce #(when (re-find %2 path) (reduced true)) false regexps))
  420. (defn ignored?
  421. "Whether file is ignored when syncing."
  422. [path]
  423. (->
  424. (get-ignored-files)
  425. (contains-path? (relative-path path))
  426. (boolean)))
  427. (defn- filter-download-files-with-reserved-chars
  428. "Skip downloading file paths with reserved chars."
  429. [files]
  430. (let [f #(and
  431. (not (.-deleted? ^js %))
  432. (fs-util/include-reserved-chars? (-relative-path %)))
  433. reserved-files (filter f files)]
  434. (when (seq reserved-files)
  435. (state/pub-event! [:ui/notify-skipped-downloading-files
  436. (map -relative-path reserved-files)])
  437. (prn "Skipped downloading those file paths with reserved chars: "
  438. (map -relative-path reserved-files)))
  439. (remove f files)))
  440. (defn- filter-upload-files-with-reserved-chars
  441. "Remove upoading file paths with reserved chars."
  442. [paths]
  443. (let [path-string? (string? (first paths))
  444. f (if path-string?
  445. fs-util/include-reserved-chars?
  446. #(fs-util/include-reserved-chars? (-relative-path %)))
  447. reserved-paths (filter f paths)]
  448. (when (seq reserved-paths)
  449. (let [paths (if path-string? reserved-paths (map -relative-path reserved-paths))]
  450. (when (seq paths)
  451. (state/pub-event! [:ui/notify-outdated-filename-format paths]))
  452. (prn "Skipped uploading those file paths with reserved chars: " paths)))
  453. (vec (remove f paths))))
  454. (defn- diffs->filetxns
  455. "transducer.
  456. 1. diff -> `FileTxn` , see also `<get-diff`
  457. 2. distinct redundant update type filetxns
  458. 3. remove update or rename filetxns if they are deleted in later filetxns.
  459. NOTE: this xf should apply on reversed diffs sequence (sort by txid)"
  460. []
  461. (comp
  462. (map diff->filetxns)
  463. cat
  464. (remove ignored?)
  465. distinct-update-filetxns-xf
  466. remove-deleted-filetxns-xf))
  467. (defn- diffs->partitioned-filetxns
  468. "partition filetxns, each partition contains same type filetxns,
  469. for update type, at most N items in each partition
  470. for delete & rename type, only 1 item in each partition."
  471. [n]
  472. (comp
  473. (diffs->filetxns)
  474. (partition-filetxns n)))
  475. (defn- filepath+checksum->diff
  476. [index {:keys [relative-path checksum user-uuid graph-uuid]}]
  477. {:post [(s/valid? ::diff %)]}
  478. {:TXId (inc index)
  479. :TXType "update_files"
  480. :TXContent [[(string/join "/" [user-uuid graph-uuid relative-path]) nil checksum]]})
  481. (defn filepath+checksum-coll->partitioned-filetxns
  482. "transducer.
  483. 1. filepath+checksum-coll -> diff
  484. 2. diffs->partitioned-filetxns
  485. 3. filter by config"
  486. [n graph-uuid user-uuid]
  487. (comp
  488. (map (fn [p]
  489. {:relative-path (first p) :user-uuid user-uuid :graph-uuid graph-uuid :checksum (second p)}))
  490. (map-indexed filepath+checksum->diff)
  491. (diffs->partitioned-filetxns n)))
  492. (deftype FileMetadata [size etag path encrypted-path last-modified remote? txid ^:mutable normalized-path]
  493. Object
  494. (get-normalized-path [_]
  495. (assert (string? path) path)
  496. (when-not normalized-path
  497. (set! normalized-path
  498. (cond-> path
  499. (string/starts-with? path "/") (string/replace-first "/" "")
  500. remote? (remove-user-graph-uuid-prefix))))
  501. normalized-path)
  502. IRelativePath
  503. (-relative-path [_] path)
  504. IEquiv
  505. (-equiv [o ^FileMetadata other]
  506. (and (= (.get-normalized-path o) (.get-normalized-path other))
  507. (= etag (.-etag other))))
  508. IHash
  509. (-hash [_] (hash {:etag etag :path path}))
  510. ILookup
  511. (-lookup [o k] (-lookup o k nil))
  512. (-lookup [_ k not-found]
  513. (case k
  514. :size size
  515. :etag etag
  516. :path path
  517. :encrypted-path encrypted-path
  518. :last-modified last-modified
  519. :remote? remote?
  520. :txid txid
  521. not-found))
  522. IPrintWithWriter
  523. (-pr-writer [_ w _opts]
  524. (write-all w (str {:size size :etag etag :path path :remote? remote? :txid txid :last-modified last-modified}))))
  525. (def ^:private higher-priority-remote-files
  526. "when diff all remote files and local files, following remote files always need to download(when checksum not matched),
  527. even local-file's last-modified > remote-file's last-modified.
  528. because these files will be auto created when the graph created, we dont want them to re-write related remote files."
  529. #{"logseq/config.edn" "logseq/custom.css"
  530. "pages/contents.md" "pages/contents.org"
  531. "logseq/metadata.edn"})
  532. ;; TODO: use fn some to filter FileMetadata here, it cause too much loop
  533. (defn diff-file-metadata-sets
  534. "Find the `FileMetadata`s that exists in s1 and does not exist in s2,
  535. compare by path+checksum+last-modified,
  536. if s1.path = s2.path & s1.checksum <> s2.checksum & s1.last-modified > s2.last-modified
  537. (except some default created files),
  538. keep this `FileMetadata` in result"
  539. [s1 s2]
  540. (reduce
  541. (fn [result item]
  542. (let [path (:path item)
  543. lower-case-path (some-> path string/lower-case)
  544. ;; encrypted-path (:encrypted-path item)
  545. checksum (:etag item)
  546. last-modified (:last-modified item)]
  547. (if (some
  548. #(cond
  549. (not= lower-case-path (some-> (:path %) string/lower-case))
  550. false
  551. (= checksum (:etag %))
  552. true
  553. (>= last-modified (:last-modified %))
  554. false
  555. ;; these special files have higher priority in s1
  556. (contains? higher-priority-remote-files path)
  557. false
  558. (< last-modified (:last-modified %))
  559. true)
  560. s2)
  561. result
  562. (conj result item))))
  563. #{} s1))
  564. (comment
  565. (defn map->FileMetadata [m]
  566. (apply ->FileMetadata ((juxt :size :etag :path :encrypted-path :last-modified :remote? (constantly nil)) m)))
  567. (assert
  568. (=
  569. #{(map->FileMetadata {:size 1 :etag 2 :path 2 :encrypted-path 2 :last-modified 2})}
  570. (diff-file-metadata-sets
  571. (into #{}
  572. (map map->FileMetadata)
  573. [{:size 1 :etag 1 :path 1 :encrypted-path 1 :last-modified 1}
  574. {:size 1 :etag 2 :path 2 :encrypted-path 2 :last-modified 2}])
  575. (into #{}
  576. (map map->FileMetadata)
  577. [{:size 1 :etag 1 :path 1 :encrypted-path 1 :last-modified 1}
  578. {:size 1 :etag 1 :path 2 :encrypted-path 2 :last-modified 1}])))))
  579. (extend-protocol IChecksum
  580. FileMetadata
  581. (-checksum [this] (.-etag this))
  582. FileTxn
  583. (-checksum [this] (.-checksum this)))
  584. (defn- sort-file-metadata-fn
  585. ":recent-days-range > :favorite-pages > small-size pages > ...
  586. :recent-days-range : [<min-inst-ms> <max-inst-ms>]
  587. "
  588. [& {:keys [recent-days-range favorite-pages]}]
  589. {:pre [(or (nil? recent-days-range)
  590. (every? number? recent-days-range))]}
  591. (let [favorite-pages* (set favorite-pages)]
  592. (fn [^FileMetadata item]
  593. (let [path (relative-path item)
  594. journal-dir (path/join (config/get-journals-directory) path/sep)
  595. journal? (string/starts-with? path journal-dir)
  596. journal-day
  597. (when journal?
  598. (try
  599. (tc/to-long
  600. (tf/parse (tf/formatter "yyyy_MM_dd")
  601. (-> path
  602. (string/replace-first journal-dir "")
  603. (string/replace-first ".md" ""))))
  604. (catch :default _)))]
  605. (cond
  606. (and recent-days-range
  607. journal-day
  608. (<= (first recent-days-range)
  609. ^number journal-day
  610. (second recent-days-range)))
  611. journal-day
  612. (string/includes? path "logseq/")
  613. 9999
  614. (string/includes? path "content.")
  615. 10000
  616. (contains? favorite-pages* path)
  617. (count path)
  618. :else
  619. (- (.-size item)))))))
  620. ;;; ### path-normalize
  621. (def path-normalize
  622. (if (util/electron?)
  623. gp-util/path-normalize
  624. (partial capacitor-fs/normalize-file-protocol-path nil)))
  625. ;;; ### APIs
  626. ;; `RSAPI` call apis through rsapi package, supports operations on files
  627. (defprotocol IRSAPI
  628. (rsapi-ready? [this graph-uuid] "return true when rsapi ready")
  629. (<key-gen [this] "generate public+private keys")
  630. (<set-env [this graph-uuid prod? private-key public-key] "set environment")
  631. (<get-local-files-meta [this graph-uuid base-path filepaths] "get local files' metadata")
  632. (<get-local-all-files-meta [this graph-uuid base-path] "get all local files' metadata")
  633. (<rename-local-file [this graph-uuid base-path from to])
  634. (<update-local-files [this graph-uuid base-path filepaths] "remote -> local")
  635. (<download-version-files [this graph-uuid base-path filepaths])
  636. (<delete-local-files [this graph-uuid base-path filepaths])
  637. (<update-remote-files [this graph-uuid base-path filepaths local-txid] "local -> remote, return err or txid")
  638. (<delete-remote-files [this graph-uuid base-path filepaths local-txid] "return err or txid")
  639. (<encrypt-fnames [this graph-uuid fnames])
  640. (<decrypt-fnames [this graph-uuid fnames])
  641. (<cancel-all-requests [this])
  642. (<add-new-version [this repo path content]))
  643. (defprotocol IRemoteAPI
  644. (<user-info [this] "user info")
  645. (<get-remote-all-files-meta [this graph-uuid] "get all remote files' metadata")
  646. (<get-remote-files-meta [this graph-uuid filepaths] "get remote files' metadata")
  647. (<get-remote-graph [this graph-name-opt graph-uuid-opt] "get graph info by GRAPH-NAME-OPT or GRAPH-UUID-OPT")
  648. (<get-remote-file-versions [this graph-uuid filepath] "get file's version list")
  649. (<list-remote-graphs [this] "list all remote graphs")
  650. (<get-deletion-logs [this graph-uuid from-txid] "get deletion logs from FROM-TXID")
  651. (<get-diff [this graph-uuid from-txid] "get diff from FROM-TXID, return [txns, latest-txid, min-txid]")
  652. (<create-graph [this graph-name] "create graph")
  653. (<delete-graph [this graph-uuid] "delete graph")
  654. (<get-graph-salt [this graph-uuid] "return httpcode 410 when salt expired")
  655. (<create-graph-salt [this graph-uuid] "return httpcode 409 when salt already exists and not expired yet")
  656. (<get-graph-encrypt-keys [this graph-uuid])
  657. (<upload-graph-encrypt-keys [this graph-uuid public-key encrypted-private-key]))
  658. (defprotocol IRemoteControlAPI
  659. "api functions provided for outside the sync process"
  660. (<delete-remote-files-control [this graph-uuid filepaths])
  661. )
  662. (defprotocol IToken
  663. (<get-token [this]))
  664. (defn <case-different-local-file-exist?
  665. "e.g. filepath=\"pages/Foo.md\"
  666. found-filepath=\"pages/foo.md\"
  667. it happens on macos (case-insensitive fs)
  668. return canonicalized filepath if exists"
  669. [graph-uuid irsapi base-path filepath]
  670. (go
  671. (let [r (<! (<get-local-files-meta irsapi graph-uuid base-path [filepath]))]
  672. (when (some-> r first :path (not= filepath))
  673. (-> r first :path)))))
  674. (defn <local-file-not-exist?
  675. [graph-uuid irsapi base-path filepath]
  676. (go
  677. (let [r (<! (<get-local-files-meta irsapi graph-uuid base-path [filepath]))]
  678. (or
  679. ;; not found at all
  680. (empty? r)
  681. ;; or,
  682. ;; e.g. filepath="pages/Foo.md"
  683. ;; found-filepath="pages/foo.md"
  684. ;; it happens on macos (case-insensitive fs)
  685. (not= filepath (:path (first r)))))))
  686. (defn- <retry-rsapi [f]
  687. (go-loop [n 3]
  688. (let [r (<! (f))]
  689. (if (and (instance? ExceptionInfo r)
  690. (string/index-of (str (ex-cause r)) "operation timed out")
  691. (> n 0))
  692. (do
  693. (print (str "retry(" n ") ..."))
  694. (recur (dec n)))
  695. r))))
  696. (declare <rsapi-cancel-all-requests)
  697. (defn- <build-local-file-metadatas
  698. [this graph-uuid r]
  699. (go-loop [[[path metadata] & others] (js->clj r)
  700. result #{}]
  701. (if-not (and path metadata)
  702. ;; finish
  703. result
  704. (let [normalized-path (path-normalize path)
  705. encryptedFname (if (not= path normalized-path)
  706. (first (<! (<encrypt-fnames this graph-uuid [normalized-path])))
  707. (get metadata "encryptedFname"))]
  708. (recur others
  709. (conj result
  710. (->FileMetadata (get metadata "size") (get metadata "md5") normalized-path
  711. encryptedFname (get metadata "mtime") false nil nil)))))))
  712. (deftype RSAPI [^:mutable graph-uuid' ^:mutable private-key' ^:mutable public-key']
  713. IToken
  714. (<get-token [_this]
  715. (user/<wrap-ensure-id&access-token
  716. (state/get-auth-id-token)))
  717. IRSAPI
  718. (rsapi-ready? [_ graph-uuid] (and (= graph-uuid graph-uuid') private-key' public-key'))
  719. (<key-gen [_] (go (js->clj (<! (p->c (ipc/ipc "key-gen")))
  720. :keywordize-keys true)))
  721. (<set-env [_ graph-uuid prod? private-key public-key]
  722. (when (not-empty private-key)
  723. (print (util/format "[%s] setting sync age-encryption passphrase..." graph-uuid)))
  724. (set! graph-uuid' graph-uuid)
  725. (set! private-key' private-key)
  726. (set! public-key' public-key)
  727. (p->c (ipc/ipc "set-env" graph-uuid (if prod? "prod" "dev") private-key public-key)))
  728. (<get-local-all-files-meta [this graph-uuid base-path]
  729. (go
  730. (let [r (<! (<retry-rsapi #(p->c (ipc/ipc "get-local-all-files-meta" graph-uuid base-path))))]
  731. (if (instance? ExceptionInfo r)
  732. r
  733. (<! (<build-local-file-metadatas this graph-uuid r))))))
  734. (<get-local-files-meta [this graph-uuid base-path filepaths]
  735. (go
  736. (let [r (<! (<retry-rsapi #(p->c (ipc/ipc "get-local-files-meta" graph-uuid base-path filepaths))))]
  737. (assert (not (instance? ExceptionInfo r)) "get-local-files-meta shouldn't return exception")
  738. (<! (<build-local-file-metadatas this graph-uuid r)))))
  739. (<rename-local-file [_ graph-uuid base-path from to]
  740. (<retry-rsapi #(p->c (ipc/ipc "rename-local-file" graph-uuid base-path
  741. (path-normalize from)
  742. (path-normalize to)))))
  743. (<update-local-files [this graph-uuid base-path filepaths]
  744. (println "update-local-files" graph-uuid base-path filepaths)
  745. (go
  746. (<! (<rsapi-cancel-all-requests))
  747. (let [token (<! (<get-token this))]
  748. (<! (p->c (ipc/ipc "update-local-files" graph-uuid base-path filepaths token))))))
  749. (<download-version-files [this graph-uuid base-path filepaths]
  750. (go
  751. (let [token (<! (<get-token this))
  752. r (<! (<retry-rsapi
  753. #(p->c (ipc/ipc "download-version-files" graph-uuid base-path filepaths token))))]
  754. r)))
  755. (<delete-local-files [_ graph-uuid base-path filepaths]
  756. (let [normalized-filepaths (mapv path-normalize filepaths)]
  757. (go
  758. (println "delete-local-files" filepaths)
  759. (let [r (<! (<retry-rsapi #(p->c (ipc/ipc "delete-local-files" graph-uuid base-path normalized-filepaths))))]
  760. r))))
  761. (<update-remote-files [this graph-uuid base-path filepaths local-txid]
  762. (let [normalized-filepaths (mapv path-normalize filepaths)]
  763. (go
  764. (<! (<rsapi-cancel-all-requests))
  765. (let [token (<! (<get-token this))]
  766. (<! (<retry-rsapi
  767. #(p->c (ipc/ipc "update-remote-files" graph-uuid base-path normalized-filepaths local-txid token))))))))
  768. (<delete-remote-files [this graph-uuid base-path filepaths local-txid]
  769. (let [normalized-filepaths (mapv path-normalize filepaths)]
  770. (go
  771. (let [token (<! (<get-token this))]
  772. (<!
  773. (<retry-rsapi
  774. #(p->c (ipc/ipc "delete-remote-files" graph-uuid base-path normalized-filepaths local-txid token))))))))
  775. (<encrypt-fnames [_ graph-uuid fnames] (go (js->clj (<! (p->c (ipc/ipc "encrypt-fnames" graph-uuid fnames))))))
  776. (<decrypt-fnames [_ graph-uuid fnames] (go
  777. (let [r (<! (p->c (ipc/ipc "decrypt-fnames" graph-uuid fnames)))]
  778. (if (instance? ExceptionInfo r)
  779. (ex-info "decrypt-failed" {:fnames fnames} (ex-cause r))
  780. (js->clj r)))))
  781. (<cancel-all-requests [_]
  782. (p->c (ipc/ipc "cancel-all-requests")))
  783. (<add-new-version [_this repo path content]
  784. (p->c (ipc/ipc "addVersionFile" (config/get-local-dir repo) path content))))
  785. (deftype ^:large-vars/cleanup-todo CapacitorAPI [^:mutable graph-uuid' ^:mutable private-key ^:mutable public-key']
  786. IToken
  787. (<get-token [_this]
  788. (user/<wrap-ensure-id&access-token
  789. (state/get-auth-id-token)))
  790. IRSAPI
  791. (rsapi-ready? [_ graph-uuid] (and (= graph-uuid graph-uuid') private-key public-key'))
  792. (<key-gen [_]
  793. (go (let [r (<! (p->c (.keygen mobile-util/file-sync #js {})))]
  794. (-> r
  795. (js->clj :keywordize-keys true)))))
  796. (<set-env [_ graph-uuid prod? secret-key public-key]
  797. (set! graph-uuid' graph-uuid)
  798. (set! private-key secret-key)
  799. (set! public-key' public-key)
  800. (p->c (.setEnv mobile-util/file-sync (clj->js {:graphUUID graph-uuid
  801. :env (if prod? "prod" "dev")
  802. :secretKey secret-key
  803. :publicKey public-key}))))
  804. (<get-local-all-files-meta [this graph-uuid base-path]
  805. (go
  806. (let [r (<! (p->c (.getLocalAllFilesMeta mobile-util/file-sync (clj->js {:graphUUID graph-uuid
  807. :basePath base-path}))))]
  808. (if (instance? ExceptionInfo r)
  809. r
  810. (<! (<build-local-file-metadatas this graph-uuid (.-result r)))))))
  811. (<get-local-files-meta [this graph-uuid base-path filepaths]
  812. (go
  813. (let [r (<! (p->c (.getLocalFilesMeta mobile-util/file-sync
  814. (clj->js {:graphUUID graph-uuid
  815. :basePath base-path
  816. :filePaths filepaths}))))]
  817. (assert (not (instance? ExceptionInfo r)) "get-local-files-meta shouldn't return exception")
  818. (<! (<build-local-file-metadatas this graph-uuid (.-result r))))))
  819. (<rename-local-file [_ graph-uuid base-path from to]
  820. (p->c (.renameLocalFile mobile-util/file-sync
  821. (clj->js {:graphUUID graph-uuid
  822. :basePath base-path
  823. :from (path-normalize from)
  824. :to (path-normalize to)}))))
  825. (<update-local-files [this graph-uuid base-path filepaths]
  826. (go
  827. (let [token (<! (<get-token this))
  828. filepaths' (map path-normalize filepaths)]
  829. (<! (p->c (.updateLocalFiles mobile-util/file-sync (clj->js {:graphUUID graph-uuid
  830. :basePath base-path
  831. :filePaths filepaths'
  832. :token token})))))))
  833. (<download-version-files [this graph-uuid base-path filepaths]
  834. (go
  835. (let [token (<! (<get-token this))
  836. r (<! (<retry-rsapi
  837. #(p->c (.updateLocalVersionFiles mobile-util/file-sync
  838. (clj->js {:graphUUID graph-uuid
  839. :basePath base-path
  840. :filePaths filepaths
  841. :token token})))))]
  842. r)))
  843. (<delete-local-files [_ graph-uuid base-path filepaths]
  844. (let [normalized-filepaths (mapv path-normalize filepaths)]
  845. (go
  846. (let [r (<! (<retry-rsapi #(p->c (.deleteLocalFiles mobile-util/file-sync
  847. (clj->js {:graphUUID graph-uuid
  848. :basePath base-path
  849. :filePaths normalized-filepaths})))))]
  850. r))))
  851. (<update-remote-files [this graph-uuid base-path filepaths local-txid]
  852. (let [normalized-filepaths (mapv path-normalize filepaths)]
  853. (go
  854. (let [token (<! (<get-token this))
  855. r (<! (p->c (.updateRemoteFiles mobile-util/file-sync
  856. (clj->js {:graphUUID graph-uuid
  857. :basePath base-path
  858. :filePaths normalized-filepaths
  859. :txid local-txid
  860. :token token
  861. :fnameEncryption true}))))]
  862. (if (instance? ExceptionInfo r)
  863. r
  864. (get (js->clj r) "txid"))))))
  865. (<delete-remote-files [this graph-uuid _base-path filepaths local-txid]
  866. (let [normalized-filepaths (mapv path-normalize filepaths)]
  867. (go
  868. (let [token (<! (<get-token this))
  869. r (<! (p->c (.deleteRemoteFiles mobile-util/file-sync
  870. (clj->js {:graphUUID graph-uuid
  871. :filePaths normalized-filepaths
  872. :txid local-txid
  873. :token token}))))]
  874. (if (instance? ExceptionInfo r)
  875. r
  876. (get (js->clj r) "txid"))))))
  877. (<encrypt-fnames [_ graph-uuid fnames]
  878. (go
  879. (let [r (<! (p->c (.encryptFnames mobile-util/file-sync
  880. (clj->js {:graphUUID graph-uuid
  881. :filePaths fnames}))))]
  882. (if (instance? ExceptionInfo r)
  883. (.-cause r)
  884. (get (js->clj r) "value")))))
  885. (<decrypt-fnames [_ graph-uuid fnames]
  886. (go (let [r (<! (p->c (.decryptFnames mobile-util/file-sync
  887. (clj->js {:graphUUID graph-uuid
  888. :filePaths fnames}))))]
  889. (if (instance? ExceptionInfo r)
  890. (ex-info "decrypt-failed" {:fnames fnames} (ex-cause r))
  891. (get (js->clj r) "value")))))
  892. (<cancel-all-requests [_]
  893. (p->c (.cancelAllRequests mobile-util/file-sync)))
  894. (<add-new-version [_this repo path content]
  895. (p->c (capacitor-fs/backup-file repo :version-file-dir path content))))
  896. (def rsapi (cond
  897. (util/electron?)
  898. (->RSAPI nil nil nil)
  899. (mobile-util/native-ios?)
  900. (->CapacitorAPI nil nil nil)
  901. (mobile-util/native-android?)
  902. (->CapacitorAPI nil nil nil)
  903. :else
  904. nil))
  905. (defn add-new-version-file
  906. [repo path content]
  907. (<add-new-version rsapi repo path content))
  908. (defn <rsapi-cancel-all-requests []
  909. (go
  910. (when rsapi
  911. (<! (<cancel-all-requests rsapi)))))
  912. ;;; ### remote & rs api exceptions
  913. (defn sync-stop-when-api-flying?
  914. [exp]
  915. (some-> (ex-data exp) :err (= :stop)))
  916. (defn storage-exceed-limit?
  917. [exp]
  918. (some->> (ex-data exp)
  919. :err
  920. ((juxt :status (comp :message :body)))
  921. ((fn [[status msg]] (and (= 403 status) (= msg "storage-limit"))))))
  922. (defn graph-count-exceed-limit?
  923. [exp]
  924. (some->> (ex-data exp)
  925. :err
  926. ((juxt :status (comp :message :body)))
  927. ((fn [[status msg]] (and (= 403 status) (= msg "graph-count-exceed-limit"))))))
  928. (defn decrypt-exp?
  929. [exp]
  930. (some-> exp ex-message #(= % "decrypt-failed")))
  931. ;;; remote api exceptions ends
  932. ;;; ### sync events
  933. (defn- put-sync-event!
  934. [val]
  935. (async/put! pubsub/sync-events-ch val))
  936. (def ^:private debug-print-sync-events-loop-stop-chan (chan 1))
  937. (defn debug-print-sync-events-loop
  938. ([] (debug-print-sync-events-loop [:created-local-version-file
  939. :finished-local->remote
  940. :finished-remote->local
  941. :pause
  942. :resume
  943. :exception-decrypt-failed
  944. :remote->local-full-sync-failed
  945. :local->remote-full-sync-failed]))
  946. ([topics]
  947. (util/drain-chan debug-print-sync-events-loop-stop-chan)
  948. (let [topic&chs (map (juxt identity #(chan 10)) topics)
  949. out-ch (chan 10)
  950. out-mix (async/mix out-ch)]
  951. (doseq [[topic ch] topic&chs]
  952. (async/sub pubsub/sync-events-pub topic ch)
  953. (async/admix out-mix ch))
  954. (go-loop []
  955. (let [{:keys [val stop]}
  956. (async/alt!
  957. debug-print-sync-events-loop-stop-chan {:stop true}
  958. out-ch ([v] {:val v}))]
  959. (cond
  960. stop (do (async/unmix-all out-mix)
  961. (doseq [[topic ch] topic&chs]
  962. (async/unsub pubsub/sync-events-pub topic ch)))
  963. val (do (pp/pprint [:debug :sync-event val])
  964. (recur))))))))
  965. (defn stop-debug-print-sync-events-loop
  966. []
  967. (offer! debug-print-sync-events-loop-stop-chan true))
  968. ;;; sync events ends
  969. (defn- fire-file-sync-storage-exceed-limit-event!
  970. [exp]
  971. (when (storage-exceed-limit? exp)
  972. (state/pub-event! [:file-sync/storage-exceed-limit])
  973. true))
  974. (defn- fire-file-sync-graph-count-exceed-limit-event!
  975. [exp]
  976. (when (graph-count-exceed-limit? exp)
  977. (state/pub-event! [:file-sync/graph-count-exceed-limit])
  978. true))
  979. (deftype RemoteAPI [*stopped?]
  980. Object
  981. (<request [this api-name body]
  982. (go
  983. (let [resp (<! (<request api-name body (<! (<get-token this)) *stopped?))]
  984. (if (http/unexceptional-status? (:status resp))
  985. (get-resp-json-body resp)
  986. (let [exp (ex-info "request failed"
  987. {:err resp
  988. :body (:body resp)
  989. :api-name api-name
  990. :request-body body})]
  991. (fire-file-sync-storage-exceed-limit-event! exp)
  992. (fire-file-sync-graph-count-exceed-limit-event! exp)
  993. exp)))))
  994. ;; for test
  995. (update-files [this graph-uuid txid files]
  996. {:pre [(map? files)
  997. (number? txid)]}
  998. (.<request this "update_files" {:GraphUUID graph-uuid :TXId txid :Files files}))
  999. IToken
  1000. (<get-token [_this]
  1001. (user/<wrap-ensure-id&access-token
  1002. (state/get-auth-id-token))))
  1003. (defn- filter-files-with-unnormalized-path
  1004. [file-meta-list encrypted-path->path-map]
  1005. (let [path->encrypted-path-map (set/map-invert encrypted-path->path-map)
  1006. raw-paths (vals encrypted-path->path-map)
  1007. *encrypted-paths-to-drop (transient [])]
  1008. (loop [[raw-path & other-paths] raw-paths]
  1009. (when raw-path
  1010. (let [normalized-path (path-normalize raw-path)]
  1011. (when (not= normalized-path raw-path)
  1012. (println :filter-files-with-unnormalized-path raw-path)
  1013. (conj! *encrypted-paths-to-drop (get path->encrypted-path-map raw-path))))
  1014. (recur other-paths)))
  1015. (let [encrypted-paths-to-drop (set (persistent! *encrypted-paths-to-drop))]
  1016. (filterv #(not (contains? encrypted-paths-to-drop (:encrypted-path %))) file-meta-list))))
  1017. (defn- filter-case-different-same-files
  1018. "filter case-different-but-same-name files, last-modified one wins"
  1019. [file-meta-list encrypted-path->path-map]
  1020. (let [seen (volatile! {})]
  1021. (loop [result-file-meta-list (transient {})
  1022. [f & others] file-meta-list]
  1023. (if f
  1024. (let [origin-path (get encrypted-path->path-map (:encrypted-path f))
  1025. _ (assert (some? origin-path) f)
  1026. path (string/lower-case origin-path)
  1027. last-modified (:last-modified f)
  1028. last-modified-seen (get @seen path)]
  1029. (cond
  1030. (or (and path (nil? last-modified-seen))
  1031. (and path (some? last-modified-seen) (> last-modified last-modified-seen)))
  1032. ;; 1. not found in seen
  1033. ;; 2. found in seen, but current f wins
  1034. (do (vswap! seen conj [path last-modified])
  1035. (recur (conj! result-file-meta-list [path f]) others))
  1036. (and path (some? last-modified-seen) (<= last-modified last-modified-seen))
  1037. ;; found in seen, and seen-f has more recent last-modified epoch
  1038. (recur result-file-meta-list others)
  1039. :else
  1040. (do (println :debug-filter-case-different-same-files:unreachable f path)
  1041. (recur result-file-meta-list others))))
  1042. (vals (persistent! result-file-meta-list))))))
  1043. (extend-type RemoteAPI
  1044. IRemoteAPI
  1045. (<user-info [this]
  1046. (user/<wrap-ensure-id&access-token
  1047. (<! (.<request this "user_info" {}))))
  1048. (<get-remote-all-files-meta [this graph-uuid]
  1049. (user/<wrap-ensure-id&access-token
  1050. (let [file-meta-list (transient #{})
  1051. encrypted-path-list (transient [])
  1052. exp-r
  1053. (<!
  1054. (go-loop [continuation-token nil]
  1055. (let [r (<! (.<request this "get_all_files"
  1056. (into
  1057. {}
  1058. (remove (comp nil? second)
  1059. {:GraphUUID graph-uuid :ContinuationToken continuation-token}))))]
  1060. (if (instance? ExceptionInfo r)
  1061. r
  1062. (let [next-continuation-token (:NextContinuationToken r)
  1063. objs (:Objects r)]
  1064. (apply conj! encrypted-path-list (map (comp remove-user-graph-uuid-prefix :Key) objs))
  1065. (apply conj! file-meta-list
  1066. (map
  1067. #(hash-map :checksum (:checksum %)
  1068. :encrypted-path (remove-user-graph-uuid-prefix (:Key %))
  1069. :size (:Size %)
  1070. :last-modified (:LastModified %)
  1071. :txid (:Txid %))
  1072. objs))
  1073. (when-not (empty? next-continuation-token)
  1074. (recur next-continuation-token)))))))]
  1075. (if (instance? ExceptionInfo exp-r)
  1076. exp-r
  1077. (let [file-meta-list* (persistent! file-meta-list)
  1078. encrypted-path-list* (persistent! encrypted-path-list)
  1079. path-list-or-exp (<! (<decrypt-fnames rsapi graph-uuid encrypted-path-list*))]
  1080. (if (instance? ExceptionInfo path-list-or-exp)
  1081. path-list-or-exp
  1082. (let [encrypted-path->path-map (zipmap encrypted-path-list* path-list-or-exp)]
  1083. (set
  1084. (mapv
  1085. #(->FileMetadata (:size %)
  1086. (:checksum %)
  1087. (get encrypted-path->path-map (:encrypted-path %))
  1088. (:encrypted-path %)
  1089. (:last-modified %)
  1090. true
  1091. (:txid %)
  1092. nil)
  1093. (-> file-meta-list*
  1094. (filter-files-with-unnormalized-path encrypted-path->path-map)
  1095. (filter-case-different-same-files encrypted-path->path-map)))))))))))
  1096. (<get-remote-files-meta [this graph-uuid filepaths]
  1097. {:pre [(coll? filepaths)]}
  1098. (user/<wrap-ensure-id&access-token
  1099. (let [encrypted-paths* (<! (<encrypt-fnames rsapi graph-uuid filepaths))
  1100. r (<! (.<request this "get_files_meta" {:GraphUUID graph-uuid :Files encrypted-paths*}))]
  1101. (if (instance? ExceptionInfo r)
  1102. r
  1103. (let [encrypted-paths (mapv :FilePath r)
  1104. paths-or-exp (<! (<decrypt-fnames rsapi graph-uuid encrypted-paths))]
  1105. (if (instance? ExceptionInfo paths-or-exp)
  1106. paths-or-exp
  1107. (let [encrypted-path->path-map (zipmap encrypted-paths paths-or-exp)]
  1108. (into #{}
  1109. (comp
  1110. (filter #(not= "filepath too long" (:Error %)))
  1111. (map #(->FileMetadata (:Size %)
  1112. (:Checksum %)
  1113. (some->> (get encrypted-path->path-map (:FilePath %))
  1114. path-normalize)
  1115. (:FilePath %)
  1116. (:LastModified %)
  1117. true
  1118. (:Txid %)
  1119. nil)))
  1120. r))))))))
  1121. (<get-remote-graph [this graph-name-opt graph-uuid-opt]
  1122. {:pre [(or graph-name-opt graph-uuid-opt)]}
  1123. (user/<wrap-ensure-id&access-token
  1124. (<! (.<request this "get_graph" (cond-> {}
  1125. (seq graph-name-opt)
  1126. (assoc :GraphName graph-name-opt)
  1127. (seq graph-uuid-opt)
  1128. (assoc :GraphUUID graph-uuid-opt))))))
  1129. (<get-remote-file-versions [this graph-uuid filepath]
  1130. (user/<wrap-ensure-id&access-token
  1131. (let [encrypted-path (first (<! (<encrypt-fnames rsapi graph-uuid [filepath])))]
  1132. (<! (.<request this "get_file_version_list" {:GraphUUID graph-uuid :File encrypted-path})))))
  1133. (<list-remote-graphs [this]
  1134. (user/<wrap-ensure-id&access-token
  1135. (<! (.<request this "list_graphs"))))
  1136. (<get-deletion-logs [this graph-uuid from-txid]
  1137. (user/<wrap-ensure-id&access-token
  1138. (let [r (<! (.<request this "get_deletion_log_v20221212" {:GraphUUID graph-uuid :FromTXId from-txid}))]
  1139. (if (instance? ExceptionInfo r)
  1140. r
  1141. (let [txns-with-encrypted-paths (mapv (fn [txn]
  1142. (assoc txn :paths
  1143. (mapv remove-user-graph-uuid-prefix (:paths txn))))
  1144. (:Transactions r))
  1145. encrypted-paths (mapcat :paths txns-with-encrypted-paths)
  1146. encrypted-path->path-map
  1147. (zipmap
  1148. encrypted-paths
  1149. (<! (<decrypt-fnames rsapi graph-uuid encrypted-paths)))
  1150. txns
  1151. (mapv
  1152. (fn [txn]
  1153. (assoc txn :paths (mapv #(get encrypted-path->path-map %) (:paths txn))))
  1154. txns-with-encrypted-paths)]
  1155. txns)))))
  1156. (<get-diff [this graph-uuid from-txid]
  1157. ;; TODO: path in transactions should be relative path(now s3 key, which includes graph-uuid and user-uuid)
  1158. (user/<wrap-ensure-id&access-token
  1159. (let [r (<! (.<request this "get_diff" {:GraphUUID graph-uuid :FromTXId from-txid}))]
  1160. (if (instance? ExceptionInfo r)
  1161. r
  1162. (let [txns-with-encrypted-paths (sort-by :TXId (:Transactions r))
  1163. txns-with-encrypted-paths*
  1164. (mapv
  1165. (fn [txn]
  1166. (assoc txn :TXContent
  1167. (mapv
  1168. (fn [[to-path from-path checksum]]
  1169. [(remove-user-graph-uuid-prefix to-path)
  1170. (some-> from-path remove-user-graph-uuid-prefix)
  1171. checksum])
  1172. (:TXContent txn))))
  1173. txns-with-encrypted-paths)
  1174. encrypted-paths
  1175. (mapcat
  1176. (fn [txn]
  1177. (remove
  1178. #(or (nil? %) (not (string/starts-with? % "e.")))
  1179. (mapcat
  1180. (fn [[to-path from-path _checksum]] [to-path from-path])
  1181. (:TXContent txn))))
  1182. txns-with-encrypted-paths*)
  1183. encrypted-path->path-map
  1184. (zipmap
  1185. encrypted-paths
  1186. (<! (<decrypt-fnames rsapi graph-uuid encrypted-paths)))
  1187. txns
  1188. (mapv
  1189. (fn [txn]
  1190. (assoc
  1191. txn :TXContent
  1192. (mapv
  1193. (fn [[to-path from-path checksum]]
  1194. [(get encrypted-path->path-map to-path to-path)
  1195. (some->> from-path (get encrypted-path->path-map))
  1196. checksum])
  1197. (:TXContent txn))))
  1198. txns-with-encrypted-paths*)]
  1199. [txns
  1200. (:TXId (last txns))
  1201. (:TXId (first txns))])))))
  1202. (<create-graph [this graph-name]
  1203. (user/<wrap-ensure-id&access-token
  1204. (<! (.<request this "create_graph" {:GraphName graph-name}))))
  1205. (<delete-graph [this graph-uuid]
  1206. (user/<wrap-ensure-id&access-token
  1207. (<! (.<request this "delete_graph" {:GraphUUID graph-uuid}))))
  1208. (<get-graph-salt [this graph-uuid]
  1209. (user/<wrap-ensure-id&access-token
  1210. (<! (.<request this "get_graph_salt" {:GraphUUID graph-uuid}))))
  1211. (<create-graph-salt [this graph-uuid]
  1212. (user/<wrap-ensure-id&access-token
  1213. (<! (.<request this "create_graph_salt" {:GraphUUID graph-uuid}))))
  1214. (<get-graph-encrypt-keys [this graph-uuid]
  1215. (user/<wrap-ensure-id&access-token
  1216. (<! (.<request this "get_graph_encrypt_keys" {:GraphUUID graph-uuid}))))
  1217. (<upload-graph-encrypt-keys [this graph-uuid public-key encrypted-private-key]
  1218. (user/<wrap-ensure-id&access-token
  1219. (<! (.<request this "upload_graph_encrypt_keys" {:GraphUUID graph-uuid
  1220. :public-key public-key
  1221. :encrypted-private-key encrypted-private-key})))))
  1222. (extend-type RemoteAPI
  1223. IRemoteControlAPI
  1224. (<delete-remote-files-control [this graph-uuid filepaths]
  1225. (user/<wrap-ensure-id&access-token
  1226. (let [partitioned-files (partition-all 20 (<! (<encrypt-fnames rsapi graph-uuid filepaths)))]
  1227. (loop [[files & others] partitioned-files]
  1228. (when files
  1229. (let [current-txid (:TXId (<! (<get-remote-graph this nil graph-uuid)))]
  1230. (<! (.<request this "delete_files" {:GraphUUID graph-uuid :TXId current-txid :Files files}))
  1231. (recur others))))))))
  1232. (comment
  1233. (declare remoteapi)
  1234. (<delete-remote-files-control remoteapi (second @graphs-txid) ["pages/aa.md"])
  1235. )
  1236. (def remoteapi (->RemoteAPI nil))
  1237. (def ^:private *get-graph-salt-memoize-cache (atom {}))
  1238. (defn update-graph-salt-cache [graph-uuid v]
  1239. {:pre [(map? v)
  1240. (= #{:value :expired-at} (set (keys v)))]}
  1241. (swap! *get-graph-salt-memoize-cache conj [graph-uuid v]))
  1242. (defn <get-graph-salt-memoize [remoteapi graph-uuid]
  1243. (go
  1244. (let [r (get @*get-graph-salt-memoize-cache graph-uuid)
  1245. expired-at (:expired-at r)
  1246. now (tc/to-long (t/now))]
  1247. (if (< now expired-at)
  1248. r
  1249. (let [r (<! (<get-graph-salt remoteapi graph-uuid))]
  1250. (swap! *get-graph-salt-memoize-cache conj [graph-uuid r])
  1251. r)))))
  1252. (def ^:private *get-graph-encrypt-keys-memoize-cache (atom {}))
  1253. (defn update-graph-encrypt-keys-cache [graph-uuid v]
  1254. {:pre [(map? v)
  1255. (= #{:public-key :encrypted-private-key} (set (keys v)))]}
  1256. (swap! *get-graph-encrypt-keys-memoize-cache conj [graph-uuid v]))
  1257. (defn <get-graph-encrypt-keys-memoize [remoteapi graph-uuid]
  1258. (go
  1259. (or (get @*get-graph-encrypt-keys-memoize-cache graph-uuid)
  1260. (let [{:keys [public-key encrypted-private-key] :as r}
  1261. (<! (<get-graph-encrypt-keys remoteapi graph-uuid))]
  1262. (when (and public-key encrypted-private-key)
  1263. (swap! *get-graph-encrypt-keys-memoize-cache conj [graph-uuid r]))
  1264. r))))
  1265. (defn- is-journals-or-pages?
  1266. [filetxn]
  1267. (let [rel-path (relative-path filetxn)]
  1268. (or (string/starts-with? rel-path (path/join (config/get-journals-directory) path/sep))
  1269. (string/starts-with? rel-path (path/join (config/get-pages-directory) path/sep)))))
  1270. (defn- need-add-version-file?
  1271. "when we need to create a new version file:
  1272. 1. when apply a 'update' filetxn, it already exists(same page name) locally and has delete diffs
  1273. 2. when apply a 'delete' filetxn, its origin remote content and local content are different
  1274. - TODO: we need to store origin remote content md5 in server db
  1275. 3. create version files only for files under 'journals/', 'pages/' dir"
  1276. [^FileTxn filetxn origin-db-content]
  1277. (go
  1278. (cond
  1279. (.renamed? filetxn)
  1280. false
  1281. (.-deleted? filetxn)
  1282. false
  1283. (.-updated? filetxn)
  1284. (let [path (relative-path filetxn)
  1285. repo (state/get-current-repo)
  1286. file-path (config/get-file-path repo path)
  1287. content (<! (p->c (fs/read-file "" file-path)))]
  1288. (and (seq origin-db-content)
  1289. (or (nil? content)
  1290. (some :removed (diff/diff origin-db-content content))))))))
  1291. (defn- <with-pause
  1292. [ch *paused]
  1293. (go-loop []
  1294. (if @*paused
  1295. {:pause true}
  1296. (let [{:keys [timeout val]}
  1297. (async/alt! ch ([v] {:val v})
  1298. (timeout 1000) {:timeout true})]
  1299. (cond
  1300. val val
  1301. timeout (recur))))))
  1302. (defn- assert-local-txid<=remote-txid
  1303. []
  1304. (when-let [local-txid (last @graphs-txid)]
  1305. (go (let [remote-txid (:TXId (<! (<get-remote-graph remoteapi nil (second @graphs-txid))))]
  1306. (assert (<= local-txid remote-txid)
  1307. [@graphs-txid local-txid remote-txid])))))
  1308. (defn- get-local-files-checksum
  1309. [graph-uuid base-path relative-paths]
  1310. (go
  1311. (into {}
  1312. (map (juxt #(.-path ^FileMetadata %) #(.-etag ^FileMetadata %)))
  1313. (<! (<get-local-files-meta rsapi graph-uuid base-path relative-paths)))))
  1314. (declare sync-state--add-current-local->remote-files
  1315. sync-state--add-current-remote->local-files
  1316. sync-state--remove-current-local->remote-files
  1317. sync-state--remove-current-remote->local-files
  1318. sync-state--add-recent-remote->local-files
  1319. sync-state--remove-recent-remote->local-files
  1320. sync-state--stopped?)
  1321. (defn- filetxns=>recent-remote->local-files
  1322. [filetxns]
  1323. (let [{:keys [update-filetxns delete-filetxns rename-filetxns]}
  1324. (group-by (fn [^FileTxn e]
  1325. (cond
  1326. (.-updated? e) :update-filetxns
  1327. (.-deleted? e) :delete-filetxns
  1328. (.renamed? e) :rename-filetxns)) filetxns)
  1329. update-file-items (map
  1330. (fn [filetxn]
  1331. (let [path (relative-path filetxn)]
  1332. {:remote->local-type :update
  1333. :checksum (-checksum filetxn)
  1334. :path path}))
  1335. update-filetxns)
  1336. rename-file-items (mapcat
  1337. (fn [^FileTxn filetxn]
  1338. (let [to-path (relative-path filetxn)
  1339. from-path (.-from-path filetxn)]
  1340. [{:remote->local-type :update
  1341. :checksum (-checksum filetxn)
  1342. :path to-path}
  1343. {:remote->local-type :delete
  1344. :checksum nil
  1345. :path from-path}]))
  1346. rename-filetxns)
  1347. delete-file-items (map
  1348. (fn [filetxn]
  1349. (let [path (relative-path filetxn)]
  1350. {:remote->local-type :delete
  1351. :checksum (-checksum filetxn)
  1352. :path path}))
  1353. delete-filetxns)]
  1354. (set (concat update-file-items rename-file-items delete-file-items))))
  1355. (defn- apply-filetxns
  1356. [*sync-state graph-uuid base-path filetxns *paused]
  1357. (go
  1358. (cond
  1359. (.renamed? (first filetxns))
  1360. (let [^FileTxn filetxn (first filetxns)
  1361. from-path (.-from-path filetxn)
  1362. to-path (.-to-path filetxn)]
  1363. (assert (= 1 (count filetxns)))
  1364. (<! (<rename-local-file rsapi graph-uuid base-path
  1365. (relative-path from-path)
  1366. (relative-path to-path))))
  1367. (.-updated? (first filetxns))
  1368. (let [repo (state/get-current-repo)
  1369. txn->db-content-vec (->> filetxns
  1370. (mapv
  1371. #(when (is-journals-or-pages? %)
  1372. [% (db/get-file repo (config/get-file-path repo (relative-path %)))]))
  1373. (remove nil?))]
  1374. (doseq [relative-p (map relative-path filetxns)]
  1375. (when-some [relative-p*
  1376. (<! (<case-different-local-file-exist? graph-uuid rsapi base-path relative-p))]
  1377. (let [recent-remote->local-file-item {:remote->local-type :delete
  1378. :checksum nil
  1379. :path relative-p*}]
  1380. (println :debug "found case-different-same-local-file" relative-p relative-p*)
  1381. (swap! *sync-state sync-state--add-recent-remote->local-files
  1382. [recent-remote->local-file-item])
  1383. (<! (<delete-local-files rsapi graph-uuid base-path [relative-p*]))
  1384. (go (<! (timeout 5000))
  1385. (swap! *sync-state sync-state--remove-recent-remote->local-files
  1386. [recent-remote->local-file-item])))))
  1387. (let [update-local-files-ch (<update-local-files rsapi graph-uuid base-path (map relative-path filetxns))
  1388. r (<! (<with-pause update-local-files-ch *paused))]
  1389. (doseq [[filetxn origin-db-content] txn->db-content-vec]
  1390. (when (<! (need-add-version-file? filetxn origin-db-content))
  1391. (<! (<add-new-version rsapi repo (relative-path filetxn) origin-db-content))
  1392. (put-sync-event! {:event :created-local-version-file
  1393. :data {:graph-uuid graph-uuid
  1394. :repo repo
  1395. :path (relative-path filetxn)
  1396. :epoch (tc/to-epoch (t/now))}})))
  1397. r))
  1398. (.-deleted? (first filetxns))
  1399. (let [filetxn (first filetxns)]
  1400. (assert (= 1 (count filetxns)))
  1401. (if (<! (<local-file-not-exist? graph-uuid rsapi base-path (relative-path filetxn)))
  1402. ;; not exist, ignore
  1403. true
  1404. (let [r (<! (<delete-local-files rsapi graph-uuid base-path [(relative-path filetxn)]))]
  1405. (if (and (instance? ExceptionInfo r)
  1406. (string/index-of (str (ex-cause r)) "No such file or directory"))
  1407. true
  1408. r)))))))
  1409. (declare sync-state-reset-full-remote->local-files)
  1410. (defn apply-filetxns-partitions
  1411. "won't call <update-graphs-txid! when *txid is nil"
  1412. [*sync-state user-uuid graph-uuid base-path filetxns-partitions repo *txid *stopped *paused full-sync?]
  1413. (assert (some? *sync-state))
  1414. (go-loop [filetxns-partitions* filetxns-partitions]
  1415. (cond
  1416. @*stopped {:stop true}
  1417. @*paused {:pause true}
  1418. :else
  1419. (when (seq filetxns-partitions*)
  1420. (let [filetxns (first filetxns-partitions*)
  1421. paths (map relative-path filetxns)
  1422. recent-remote->local-file-items (filetxns=>recent-remote->local-files filetxns)
  1423. _ (when-not full-sync?
  1424. (swap! *sync-state #(sync-state-reset-full-remote->local-files % recent-remote->local-file-items)))
  1425. ;; update recent-remote->local-files
  1426. _ (swap! *sync-state sync-state--add-recent-remote->local-files
  1427. recent-remote->local-file-items)
  1428. _ (swap! *sync-state sync-state--add-current-remote->local-files paths)
  1429. r (<! (apply-filetxns *sync-state graph-uuid base-path filetxns *paused))
  1430. _ (swap! *sync-state sync-state--remove-current-remote->local-files paths
  1431. (not (instance? ExceptionInfo r)))]
  1432. ;; remove these recent-remote->local-file-items 5s later
  1433. (go (<! (timeout 5000))
  1434. (swap! *sync-state sync-state--remove-recent-remote->local-files
  1435. recent-remote->local-file-items))
  1436. (cond
  1437. (instance? ExceptionInfo r) r
  1438. @*paused {:pause true}
  1439. :else
  1440. (let [latest-txid (apply max (and *txid @*txid) (map #(.-txid ^FileTxn %) filetxns))]
  1441. ;; update local-txid
  1442. (when (and *txid (number? latest-txid))
  1443. (reset! *txid latest-txid)
  1444. (<! (<update-graphs-txid! latest-txid graph-uuid user-uuid repo)))
  1445. (recur (next filetxns-partitions*)))))))))
  1446. (defmulti need-sync-remote? (fn [v] (cond
  1447. (= :max v)
  1448. :max
  1449. (and (vector? v) (number? (first v)))
  1450. :txid
  1451. (instance? ExceptionInfo v)
  1452. :exceptional-response
  1453. (instance? cljs.core.async.impl.channels/ManyToManyChannel v)
  1454. :chan)))
  1455. (defmethod need-sync-remote? :max [_] true)
  1456. (defmethod need-sync-remote? :txid [[txid remote->local-syncer]]
  1457. (let [remote-txid txid
  1458. local-txid (.-txid remote->local-syncer)]
  1459. (or (nil? local-txid)
  1460. (> remote-txid local-txid))))
  1461. (defmethod need-sync-remote? :exceptional-response [resp]
  1462. (let [data (ex-data resp)
  1463. cause (ex-cause resp)]
  1464. (or
  1465. (and (= (:error data) :promise-error)
  1466. (when-let [r (re-find #"(\d+), txid_to_validate = (\d+)" (str cause))]
  1467. (> (nth r 1) (nth r 2))))
  1468. (= 409 (get-in data [:err :status])))))
  1469. (defmethod need-sync-remote? :chan [c]
  1470. (go (need-sync-remote? (<! c))))
  1471. (defmethod need-sync-remote? :default [_] false)
  1472. (defn- need-reset-local-txid?
  1473. [r]
  1474. (when-let [cause (ex-cause r)]
  1475. (when-let [r (re-find #"(\d+), txid_to_validate = (\d+)" (str cause))]
  1476. (< (nth r 1) (nth r 2)))))
  1477. (defn- graph-has-been-deleted?
  1478. [r]
  1479. (some->> (ex-cause r) str (re-find #"graph-not-exist")))
  1480. ;; type = "change" | "add" | "unlink"
  1481. (deftype FileChangeEvent [type dir path stat checksum]
  1482. IRelativePath
  1483. (-relative-path [_] (remove-dir-prefix dir path))
  1484. IEquiv
  1485. (-equiv [_ ^FileChangeEvent other]
  1486. (and (= dir (.-dir other))
  1487. (= type (.-type other))
  1488. (= path (.-path other))
  1489. (= checksum (.-checksum other))))
  1490. IHash
  1491. (-hash [_]
  1492. (hash {:dir dir
  1493. :type type
  1494. :path path
  1495. :checksum checksum}))
  1496. ILookup
  1497. (-lookup [o k] (-lookup o k nil))
  1498. (-lookup [_ k not-found]
  1499. (case k
  1500. :type type
  1501. :dir dir
  1502. :path path
  1503. :stat stat
  1504. :checksum checksum
  1505. not-found))
  1506. IPrintWithWriter
  1507. (-pr-writer [_ w _opts]
  1508. (write-all w (str {:type type :base-path dir :path path :size (:size stat) :checksum checksum}))))
  1509. (defn- <file-change-event=>recent-remote->local-file-item
  1510. "return nil when related local files not found"
  1511. [graph-uuid ^FileChangeEvent e]
  1512. (go
  1513. (let [tp (case (.-type e)
  1514. ("add" "change") :update
  1515. "unlink" :delete)
  1516. path (relative-path e)]
  1517. (when-let [path-etag-entry (first (<! (get-local-files-checksum graph-uuid (.-dir e) [path])))]
  1518. {:remote->local-type tp
  1519. :checksum (if (= tp :delete) nil
  1520. (val path-etag-entry))
  1521. :path path}))))
  1522. (defn- distinct-file-change-events-xf
  1523. "transducer.
  1524. distinct `FileChangeEvent`s by their path, keep the first one."
  1525. [rf]
  1526. (let [seen (volatile! #{})]
  1527. (fn
  1528. ([] (rf))
  1529. ([result] (rf result))
  1530. ([result ^FileChangeEvent e]
  1531. (if (contains? @seen (.-path e))
  1532. result
  1533. (do (vswap! seen conj (.-path e))
  1534. (rf result e)))))))
  1535. (defn- distinct-file-change-events
  1536. "distinct `FileChangeEvent`s by their path, keep the last one."
  1537. [es]
  1538. (transduce distinct-file-change-events-xf conj '() (reverse es)))
  1539. (defn- partition-file-change-events
  1540. "return transducer.
  1541. partition `FileChangeEvent`s, at most N file-change-events in each partition.
  1542. only one type in a partition."
  1543. [n]
  1544. (comp
  1545. (partition-by (fn [^FileChangeEvent e]
  1546. (case (.-type e)
  1547. ("add" "change") :add-or-change
  1548. "unlink" :unlink)))
  1549. (map #(partition-all n %))
  1550. cat))
  1551. (declare sync-state--valid-to-accept-filewatcher-event?)
  1552. (defonce local-changes-chan (chan (async/dropping-buffer 1000)))
  1553. (defn file-watch-handler
  1554. "file-watcher callback"
  1555. [type {:keys [dir path _content stat] :as _payload}]
  1556. (when-let [current-graph (state/get-current-repo)]
  1557. (when (string/ends-with? current-graph dir)
  1558. (when-let [sync-state (state/get-file-sync-state (state/get-current-file-sync-graph-uuid))]
  1559. (when (sync-state--valid-to-accept-filewatcher-event? sync-state)
  1560. (when (or (:mtime stat) (= type "unlink"))
  1561. (go
  1562. (let [path (path-normalize (remove-dir-prefix dir path))
  1563. files-meta (and (not= "unlink" type)
  1564. (<! (<get-local-files-meta
  1565. rsapi (:current-syncing-graph-uuid sync-state) dir [path])))
  1566. checksum (and (coll? files-meta) (some-> files-meta first :etag))]
  1567. (>! local-changes-chan (->FileChangeEvent type dir path stat checksum))))))))))
  1568. (defn local-changes-revised-chan-builder
  1569. "return chan"
  1570. [local-changes-chan rename-page-event-chan]
  1571. (let [*rename-events (atom #{})
  1572. ch (chan 1000)]
  1573. (go-loop []
  1574. (let [{:keys [rename-event local-change]}
  1575. (async/alt!
  1576. rename-page-event-chan ([v] {:rename-event v}) ;; {:repo X :old-path X :new-path}
  1577. local-changes-chan ([v] {:local-change v}))]
  1578. (cond
  1579. rename-event
  1580. (let [repo-dir (config/get-repo-dir (:repo rename-event))
  1581. remove-dir-prefix-fn #(remove-dir-prefix repo-dir %)
  1582. rename-event* (-> rename-event
  1583. (update :old-path remove-dir-prefix-fn)
  1584. (update :new-path remove-dir-prefix-fn))
  1585. k1 [:old-path (:old-path rename-event*) repo-dir]
  1586. k2 [:new-path (:new-path rename-event*) repo-dir]]
  1587. (swap! *rename-events conj k1 k2)
  1588. ;; remove rename-events after 2s
  1589. (go (<! (timeout 3000))
  1590. (swap! *rename-events disj k1 k2))
  1591. ;; add 2 simulated file-watcher events
  1592. (>! ch (->FileChangeEvent "unlink" repo-dir (:old-path rename-event*) nil nil))
  1593. (>! ch (->FileChangeEvent "add" repo-dir (:new-path rename-event*)
  1594. {:mtime (tc/to-long (t/now))
  1595. :size 1 ; add a fake size
  1596. } "fake-checksum"))
  1597. (recur))
  1598. local-change
  1599. (cond
  1600. (and (= "change" (.-type local-change))
  1601. (or (contains? @*rename-events [:old-path (.-path local-change) (.-dir local-change)])
  1602. (contains? @*rename-events [:new-path (.-path local-change) (.-dir local-change)])))
  1603. (do (println :debug "ignore" local-change)
  1604. ;; ignore
  1605. (recur))
  1606. (and (= "add" (.-type local-change))
  1607. (contains? @*rename-events [:new-path (.-path local-change) (.-dir local-change)]))
  1608. ;; ignore
  1609. (do (println :debug "ignore" local-change)
  1610. (recur))
  1611. (and (= "unlink" (.-type local-change))
  1612. (contains? @*rename-events [:old-path (.-path local-change) (.-dir local-change)]))
  1613. (do (println :debug "ignore" local-change)
  1614. (recur))
  1615. :else
  1616. (do (>! ch local-change)
  1617. (recur))))))
  1618. ch))
  1619. (defonce local-changes-revised-chan
  1620. (local-changes-revised-chan-builder local-changes-chan (state/get-file-rename-event-chan)))
  1621. ;;; ### encryption
  1622. (def pwd-map
  1623. "graph-uuid->{:pwd xxx :public-key xxx :private-key xxx}"
  1624. (atom {}))
  1625. (defonce *pwd-map-changed-chan
  1626. (atom {}))
  1627. (defn- get-graph-pwd-changed-chan
  1628. [graph-uuid]
  1629. (if-let [result (get @*pwd-map-changed-chan graph-uuid)]
  1630. result
  1631. (let [c (chan (async/sliding-buffer 1))]
  1632. (swap! *pwd-map-changed-chan assoc graph-uuid c)
  1633. c)))
  1634. (defn- <encrypt-content
  1635. [content key*]
  1636. (p->c (encrypt/encrypt-with-passphrase key* content)))
  1637. (defn- decrypt-content
  1638. [encrypted-content key*]
  1639. (go
  1640. (let [r (<! (p->c (encrypt/decrypt-with-passphrase key* encrypted-content)))]
  1641. (when-not (instance? ExceptionInfo r) r))))
  1642. (defn- local-storage-pwd-path
  1643. [graph-uuid]
  1644. (str "encrypted-pwd/" graph-uuid))
  1645. (defn- persist-pwd!
  1646. [pwd graph-uuid]
  1647. {:pre [(string? pwd)]}
  1648. (js/localStorage.setItem (local-storage-pwd-path graph-uuid) pwd))
  1649. (defn- remove-pwd!
  1650. [graph-uuid]
  1651. (js/localStorage.removeItem (local-storage-pwd-path graph-uuid)))
  1652. (defn get-pwd
  1653. [graph-uuid]
  1654. (js/localStorage.getItem (local-storage-pwd-path graph-uuid)))
  1655. (defn remove-all-pwd!
  1656. []
  1657. (doseq [k (filter #(string/starts-with? % "encrypted-pwd/") (js->clj (js-keys js/localStorage)))]
  1658. (js/localStorage.removeItem k))
  1659. (reset! pwd-map {}))
  1660. (defn encrypt+persist-pwd!
  1661. "- persist encrypted pwd at local-storage"
  1662. [pwd graph-uuid]
  1663. (go
  1664. (let [[value expired-at gone?]
  1665. ((juxt :value :expired-at #(-> % ex-data :err :status (= 410)))
  1666. (<! (<get-graph-salt-memoize remoteapi graph-uuid)))
  1667. [salt-value _expired-at]
  1668. (if gone?
  1669. (let [r (<! (<create-graph-salt remoteapi graph-uuid))]
  1670. (update-graph-salt-cache graph-uuid r)
  1671. ((juxt :value :expired-at) r))
  1672. [value expired-at])
  1673. encrypted-pwd (<! (<encrypt-content pwd salt-value))]
  1674. (persist-pwd! encrypted-pwd graph-uuid))))
  1675. (defn restore-pwd!
  1676. "restore pwd from persisted encrypted-pwd, update `pwd-map`"
  1677. [graph-uuid]
  1678. (go
  1679. (let [encrypted-pwd (get-pwd graph-uuid)]
  1680. (if (nil? encrypted-pwd)
  1681. {:restore-pwd-failed true}
  1682. (let [[salt-value _expired-at gone?]
  1683. ((juxt :value :expired-at #(-> % ex-data :err :status (= 410)))
  1684. (<! (<get-graph-salt-memoize remoteapi graph-uuid)))]
  1685. (if (or gone? (empty? salt-value))
  1686. {:restore-pwd-failed "expired salt"}
  1687. (let [pwd (<! (decrypt-content encrypted-pwd salt-value))]
  1688. (if (nil? pwd)
  1689. {:restore-pwd-failed (str "decrypt-pwd failed, salt: " salt-value)}
  1690. (swap! pwd-map assoc-in [graph-uuid :pwd] pwd)))))))))
  1691. (defn- set-keys&notify
  1692. [graph-uuid public-key private-key]
  1693. (swap! pwd-map assoc-in [graph-uuid :public-key] public-key)
  1694. (swap! pwd-map assoc-in [graph-uuid :private-key] private-key)
  1695. (offer! (get-graph-pwd-changed-chan graph-uuid) true))
  1696. (defn- <set-graph-encryption-keys!
  1697. [graph-uuid pwd public-key encrypted-private-key]
  1698. (go
  1699. (let [private-key (when (and pwd encrypted-private-key)
  1700. (<! (decrypt-content encrypted-private-key pwd)))]
  1701. (when (and private-key (string/starts-with? private-key "AGE-SECRET-KEY"))
  1702. (set-keys&notify graph-uuid public-key private-key)))))
  1703. (def <restored-pwd (chan (async/sliding-buffer 1)))
  1704. (def <restored-pwd-pub (async/pub <restored-pwd :graph-uuid))
  1705. (defn- <ensure-pwd-exists!
  1706. "return password or nil when restore pwd from localstorage failed"
  1707. [repo graph-uuid init-graph-keys]
  1708. (go
  1709. (let [{:keys [restore-pwd-failed]} (<! (restore-pwd! graph-uuid))
  1710. pwd (get-in @pwd-map [graph-uuid :pwd])]
  1711. (if restore-pwd-failed
  1712. (do (state/pub-event! [:modal/remote-encryption-input-pw-dialog repo
  1713. (state/get-remote-graph-info-by-uuid graph-uuid)
  1714. :input-pwd-remote
  1715. {:GraphUUID graph-uuid
  1716. :init-graph-keys init-graph-keys
  1717. :after-input-password (fn [pwd]
  1718. (when pwd
  1719. (swap! pwd-map assoc-in [graph-uuid :pwd] pwd)
  1720. (offer! <restored-pwd {:graph-uuid graph-uuid :value true})))}])
  1721. nil)
  1722. pwd))))
  1723. (defn clear-pwd!
  1724. "- clear pwd in `pwd-map`
  1725. - remove encrypted-pwd in local-storage"
  1726. [graph-uuid]
  1727. (swap! pwd-map dissoc graph-uuid)
  1728. (remove-pwd! graph-uuid))
  1729. (defn- <loop-ensure-pwd&keys
  1730. [graph-uuid repo *stopped?]
  1731. (let [<restored-pwd-sub-chan (chan 1)]
  1732. (async/sub <restored-pwd-pub graph-uuid <restored-pwd-sub-chan)
  1733. (go-loop []
  1734. (if @*stopped?
  1735. ::stop
  1736. (let [{:keys [public-key encrypted-private-key] :as r}
  1737. (<! (<get-graph-encrypt-keys-memoize remoteapi graph-uuid))
  1738. init-graph-keys (some-> (ex-data r) :err :status (= 404))
  1739. pwd (<! (<ensure-pwd-exists! repo graph-uuid init-graph-keys))]
  1740. (cond
  1741. (not pwd)
  1742. (do (println :debug "waiting password...")
  1743. (<! <restored-pwd-sub-chan) ;loop to wait password
  1744. (println :debug "waiting password...DONE" graph-uuid)
  1745. (recur))
  1746. init-graph-keys
  1747. ;; when public+private keys not stored at server
  1748. ;; generate a new key pair and upload them
  1749. (let [next-state
  1750. (let [{public-key :publicKey private-key :secretKey}
  1751. (<! (<key-gen rsapi))
  1752. encrypted-private-key (<! (<encrypt-content private-key pwd))
  1753. _ (assert (string? encrypted-private-key)
  1754. {:encrypted-private-key encrypted-private-key
  1755. :private-key private-key
  1756. :pwd pwd})
  1757. upload-r (<! (<upload-graph-encrypt-keys remoteapi graph-uuid public-key encrypted-private-key))]
  1758. (if (instance? ExceptionInfo upload-r)
  1759. (do (js/console.log "upload-graph-encrypt-keys err" upload-r)
  1760. ::stop)
  1761. (do (update-graph-encrypt-keys-cache graph-uuid {:public-key public-key
  1762. :encrypted-private-key encrypted-private-key})
  1763. :recur)))]
  1764. (if (= :recur next-state)
  1765. (recur)
  1766. next-state))
  1767. :else
  1768. ;; pwd, public-key, encrypted-private-key all exist
  1769. (do (assert (and pwd public-key encrypted-private-key) {:encrypted-private-key encrypted-private-key
  1770. :public-key public-key
  1771. :pwd pwd})
  1772. (<! (<set-graph-encryption-keys! graph-uuid pwd public-key encrypted-private-key))
  1773. (if (get-in @pwd-map [graph-uuid :private-key])
  1774. (do (when (state/modal-opened?)
  1775. (state/set-state! [:ui/loading? :set-graph-password] false)
  1776. (state/close-modal!))
  1777. ::idle)
  1778. ;; bad pwd
  1779. (do (when (state/modal-opened?)
  1780. (when (state/sub [:ui/loading? :set-graph-password])
  1781. (state/set-state! [:file-sync/set-remote-graph-password-result]
  1782. {:fail "Incorrect password. Please try again"}))
  1783. (state/set-state! [:ui/loading? :set-graph-password] false))
  1784. (clear-pwd! graph-uuid)
  1785. (recur))))))))))
  1786. (defn- <set-env&keys
  1787. [prod? graph-uuid]
  1788. (let [{:keys [private-key public-key]} (get @pwd-map graph-uuid)]
  1789. (assert (and private-key public-key) (pr-str :private-key private-key :public-key public-key
  1790. :pwd-map @pwd-map))
  1791. (<set-env rsapi graph-uuid prod? private-key public-key)))
  1792. (defn- <ensure-set-env&keys
  1793. [graph-uuid *stopped?]
  1794. (go-loop []
  1795. (let [{:keys [change timeout]}
  1796. (async/alt! (get-graph-pwd-changed-chan graph-uuid) {:change true}
  1797. (timeout 10000) {:timeout true})]
  1798. (cond
  1799. @*stopped? nil
  1800. change (<! (<set-env&keys config/FILE-SYNC-PROD? graph-uuid))
  1801. timeout (recur)))))
  1802. ;;; ### chans to control sync process
  1803. (def full-sync-chan
  1804. "offer `true` to this chan will trigger a local->remote full sync"
  1805. (chan 1))
  1806. (def full-sync-mult (async/mult full-sync-chan))
  1807. (def remote->local-sync-chan
  1808. "offer `true` to this chan will trigger a remote->local sync"
  1809. (chan 1))
  1810. (def remote->local-sync-mult (async/mult remote->local-sync-chan))
  1811. (def remote->local-full-sync-chan
  1812. "offer `true` to this chan will trigger a remote->local full sync"
  1813. (chan 1))
  1814. (def remote->local-full-sync-mult (async/mult remote->local-full-sync-chan))
  1815. (def immediately-local->remote-chan
  1816. "Immediately trigger upload of files in waiting queue"
  1817. (chan))
  1818. (def immediately-local->remote-mult (async/mult immediately-local->remote-chan))
  1819. (def pause-resume-chan
  1820. "false -> pause, true -> resume.
  1821. see also `*resume-state`"
  1822. (chan 1))
  1823. (def pause-resume-mult (async/mult pause-resume-chan))
  1824. (def recent-edited-chan
  1825. "Triggered when there is content editing"
  1826. (chan 1))
  1827. (def recent-edited-mult (async/mult recent-edited-chan))
  1828. (def last-input-time-cursor (rum/cursor state/state :editor/last-input-time))
  1829. (add-watch last-input-time-cursor "sync"
  1830. (fn [_ _ _ _]
  1831. (offer! recent-edited-chan true)))
  1832. ;;; ### sync state
  1833. (def *resume-state
  1834. "key: graph-uuid"
  1835. (atom {}))
  1836. (defn resume-state--add-remote->local-state
  1837. [graph-uuid]
  1838. (swap! *resume-state assoc graph-uuid {:remote->local true}))
  1839. (defn resume-state--add-remote->local-full-sync-state
  1840. [graph-uuid]
  1841. (swap! *resume-state assoc graph-uuid {:remote->local-full-sync true}))
  1842. (defn resume-state--add-local->remote-state
  1843. [graph-uuid local-changes]
  1844. (swap! *resume-state assoc graph-uuid {:local->remote local-changes}))
  1845. ;; (defn resume-state--add-local->remote-full-sync-state
  1846. ;; [graph-uuid]
  1847. ;; (swap! *resume-state assoc graph-uuid {:local->remote-full-sync true}))
  1848. (defn resume-state--reset
  1849. [graph-uuid]
  1850. (swap! *resume-state dissoc graph-uuid))
  1851. (defn sync-state
  1852. "create a new sync-state"
  1853. []
  1854. {:post [(s/valid? ::sync-state %)]}
  1855. {:current-syncing-graph-uuid nil
  1856. :state ::starting
  1857. :full-local->remote-files #{}
  1858. :current-local->remote-files #{}
  1859. :full-remote->local-files #{}
  1860. :current-remote->local-files #{}
  1861. :queued-local->remote-files #{}
  1862. :recent-remote->local-files #{}
  1863. :history '()})
  1864. (defn- sync-state--update-current-syncing-graph-uuid
  1865. [sync-state graph-uuid]
  1866. {:pre [(s/valid? ::sync-state sync-state)]
  1867. :post [(s/valid? ::sync-state %)]}
  1868. (assoc sync-state :current-syncing-graph-uuid graph-uuid))
  1869. (defn- sync-state--update-state
  1870. [sync-state next-state]
  1871. {:pre [(s/valid? ::state next-state)]
  1872. :post [(s/valid? ::sync-state %)]}
  1873. (assoc sync-state :state next-state))
  1874. (defn sync-state--add-current-remote->local-files
  1875. [sync-state paths]
  1876. {:post [(s/valid? ::sync-state %)]}
  1877. (update sync-state :current-remote->local-files into paths))
  1878. (defn sync-state--add-current-local->remote-files
  1879. [sync-state paths]
  1880. {:post [(s/valid? ::sync-state %)]}
  1881. (update sync-state :current-local->remote-files into paths))
  1882. (defn sync-state--add-queued-local->remote-files
  1883. [sync-state event]
  1884. {:post [(s/valid? ::sync-state %)]}
  1885. (update sync-state :queued-local->remote-files
  1886. (fn [o event]
  1887. (->> (concat o [event])
  1888. (util/distinct-by-last-wins (fn [e] (.-path e))))) event))
  1889. (defn sync-state--remove-queued-local->remote-files
  1890. [sync-state event]
  1891. {:post [(s/valid? ::sync-state %)]}
  1892. (update sync-state :queued-local->remote-files
  1893. (fn [o event]
  1894. (remove #{event} o)) event))
  1895. (defn sync-state-reset-queued-local->remote-files
  1896. [sync-state]
  1897. {:post [(s/valid? ::sync-state %)]}
  1898. (assoc sync-state :queued-local->remote-files nil))
  1899. (defn sync-state--add-recent-remote->local-files
  1900. [sync-state items]
  1901. {:pre [(s/valid? (s/coll-of ::recent-remote->local-file-item) items)]
  1902. :post [(s/valid? ::sync-state %)]}
  1903. (update sync-state :recent-remote->local-files (partial apply conj) items))
  1904. (defn sync-state--remove-recent-remote->local-files
  1905. [sync-state items]
  1906. {:post [(s/valid? ::sync-state %)]}
  1907. (update sync-state :recent-remote->local-files set/difference items))
  1908. (defn sync-state-reset-full-local->remote-files
  1909. [sync-state events]
  1910. {:post [(s/valid? ::sync-state %)]}
  1911. (assoc sync-state :full-local->remote-files events))
  1912. (defn sync-state-reset-full-remote->local-files
  1913. [sync-state events]
  1914. {:post [(s/valid? ::sync-state %)]}
  1915. (assoc sync-state :full-remote->local-files events))
  1916. (defn- add-history-items
  1917. [history paths now]
  1918. (sequence
  1919. (comp
  1920. ;; only reserve the latest one of same-path-items
  1921. (dedupe-by :path)
  1922. ;; reserve the latest 20 history items
  1923. (take 20))
  1924. (into (filter (fn [o]
  1925. (not (contains? (set paths) (:path o)))) history)
  1926. (map (fn [path] {:path path :time now}) paths))))
  1927. (defn sync-state--remove-current-remote->local-files
  1928. [sync-state paths add-history?]
  1929. {:post [(s/valid? ::sync-state %)]}
  1930. (let [now (t/now)]
  1931. (cond-> sync-state
  1932. true (update :current-remote->local-files set/difference paths)
  1933. add-history? (update :history add-history-items paths now))))
  1934. (defn sync-state--remove-current-local->remote-files
  1935. [sync-state paths add-history?]
  1936. {:post [(s/valid? ::sync-state %)]}
  1937. (let [now (t/now)]
  1938. (cond-> sync-state
  1939. true (update :current-local->remote-files set/difference paths)
  1940. add-history? (update :history add-history-items paths now))))
  1941. (defn sync-state--stopped?
  1942. "Graph syncing is stopped"
  1943. [sync-state]
  1944. {:pre [(s/valid? ::sync-state sync-state)]}
  1945. (= ::stop (:state sync-state)))
  1946. (defn sync-state--valid-to-accept-filewatcher-event?
  1947. [sync-state]
  1948. {:pre [(s/valid? ::sync-state sync-state)]}
  1949. (contains? #{::idle ::local->remote ::remote->local ::local->remote-full-sync ::remote->local-full-sync}
  1950. (:state sync-state)))
  1951. ;;; ### remote->local syncer & local->remote syncer
  1952. (defprotocol IRemote->LocalSync
  1953. (stop-remote->local! [this])
  1954. (<sync-remote->local! [this] "return ExceptionInfo when error occurs")
  1955. (<sync-remote->local-all-files! [this] "sync all files, return ExceptionInfo when error occurs"))
  1956. (defprotocol ILocal->RemoteSync
  1957. (setup-local->remote! [this])
  1958. (stop-local->remote! [this])
  1959. (<ratelimit [this from-chan] "get watched local file-change events from FROM-CHAN,
  1960. return chan returning events with rate limited")
  1961. (<sync-local->remote! [this es] "es is a sequence of `FileChangeEvent`, all items have same type.")
  1962. (<sync-local->remote-all-files! [this] "compare all local files to remote ones, sync when not equal.
  1963. if local-txid != remote-txid, return {:need-sync-remote true}"))
  1964. (defrecord ^:large-vars/cleanup-todo
  1965. Remote->LocalSyncer [user-uuid graph-uuid base-path repo *txid *txid-for-get-deletion-log *sync-state remoteapi
  1966. ^:mutable local->remote-syncer *stopped *paused]
  1967. Object
  1968. (set-local->remote-syncer! [_ s] (set! local->remote-syncer s))
  1969. (sync-files-remote->local!
  1970. [_ relative-filepath+checksum-coll latest-txid]
  1971. (go
  1972. (let [partitioned-filetxns
  1973. (sequence (filepath+checksum-coll->partitioned-filetxns
  1974. download-batch-size graph-uuid user-uuid)
  1975. relative-filepath+checksum-coll)
  1976. r
  1977. (if (empty? (flatten partitioned-filetxns))
  1978. {:succ true}
  1979. (do
  1980. (put-sync-event! {:event :start
  1981. :data {:type :full-remote->local
  1982. :graph-uuid graph-uuid
  1983. :full-sync? true
  1984. :epoch (tc/to-epoch (t/now))}})
  1985. (<! (apply-filetxns-partitions
  1986. *sync-state user-uuid graph-uuid base-path partitioned-filetxns repo
  1987. nil *stopped *paused true))))]
  1988. (cond
  1989. (instance? ExceptionInfo r) {:unknown r}
  1990. @*stopped {:stop true}
  1991. @*paused {:pause true}
  1992. :else
  1993. (do
  1994. (swap! *sync-state #(sync-state-reset-full-remote->local-files % []))
  1995. (<! (<update-graphs-txid! latest-txid graph-uuid user-uuid repo))
  1996. (reset! *txid latest-txid)
  1997. {:succ true})))))
  1998. IRemote->LocalSync
  1999. (stop-remote->local! [_] (vreset! *stopped true))
  2000. (<sync-remote->local! [_]
  2001. (go
  2002. (let [r
  2003. (let [diff-r (<! (<get-diff remoteapi graph-uuid @*txid))]
  2004. (if (instance? ExceptionInfo diff-r)
  2005. diff-r
  2006. (let [[diff-txns latest-txid min-txid] diff-r]
  2007. (if (> (dec min-txid) @*txid) ;; min-txid-1 > @*txid, need to remote->local-full-sync
  2008. (do (println "min-txid" min-txid "request-txid" @*txid)
  2009. {:need-remote->local-full-sync true})
  2010. (when (pos-int? latest-txid)
  2011. (let [filtered-diff-txns (-> (transduce (diffs->filetxns) conj '() (reverse diff-txns))
  2012. filter-download-files-with-reserved-chars)
  2013. partitioned-filetxns (transduce (partition-filetxns download-batch-size)
  2014. (completing (fn [r i] (conj r (reverse i)))) ;reverse
  2015. '()
  2016. filtered-diff-txns)]
  2017. (put-sync-event! {:event :start
  2018. :data {:type :remote->local
  2019. :graph-uuid graph-uuid
  2020. :full-sync? false
  2021. :epoch (tc/to-epoch (t/now))}})
  2022. (if (empty? (flatten partitioned-filetxns))
  2023. (do
  2024. (swap! *sync-state #(sync-state-reset-full-remote->local-files % []))
  2025. (<! (<update-graphs-txid! latest-txid graph-uuid user-uuid repo))
  2026. (reset! *txid latest-txid)
  2027. {:succ true})
  2028. (<! (apply-filetxns-partitions
  2029. *sync-state user-uuid graph-uuid base-path
  2030. partitioned-filetxns repo *txid *stopped *paused false)))))))))]
  2031. (cond
  2032. (instance? ExceptionInfo r) {:unknown r}
  2033. @*stopped {:stop true}
  2034. @*paused {:pause true}
  2035. (:need-remote->local-full-sync r) r
  2036. :else {:succ true}))))
  2037. (<sync-remote->local-all-files! [this]
  2038. (go
  2039. (let [remote-all-files-meta-c (<get-remote-all-files-meta remoteapi graph-uuid)
  2040. local-all-files-meta-c (<get-local-all-files-meta rsapi graph-uuid base-path)
  2041. remote-all-files-meta-or-exp (<! remote-all-files-meta-c)]
  2042. (if (or (storage-exceed-limit? remote-all-files-meta-or-exp)
  2043. (sync-stop-when-api-flying? remote-all-files-meta-or-exp)
  2044. (decrypt-exp? remote-all-files-meta-or-exp))
  2045. (do (put-sync-event! {:event :exception-decrypt-failed
  2046. :data {:graph-uuid graph-uuid
  2047. :exp remote-all-files-meta-or-exp
  2048. :epoch (tc/to-epoch (t/now))}})
  2049. {:stop true})
  2050. (let [remote-all-files-meta remote-all-files-meta-or-exp
  2051. local-all-files-meta (<! local-all-files-meta-c)
  2052. {diff-remote-files :result elapsed-time :time}
  2053. (util/with-time (diff-file-metadata-sets remote-all-files-meta local-all-files-meta))
  2054. _ (println ::diff-file-metadata-sets-elapsed-time elapsed-time "ms")
  2055. recent-10-days-range ((juxt #(tc/to-long (t/minus % (t/days 10))) #(tc/to-long %)) (t/today))
  2056. sorted-diff-remote-files
  2057. (sort-by
  2058. (sort-file-metadata-fn :recent-days-range recent-10-days-range) > diff-remote-files)
  2059. remote-graph-info-or-ex (<! (<get-remote-graph remoteapi nil graph-uuid))
  2060. latest-txid (:TXId remote-graph-info-or-ex)]
  2061. (if (or (instance? ExceptionInfo remote-graph-info-or-ex) (nil? latest-txid))
  2062. (do (put-sync-event! {:event :get-remote-graph-failed
  2063. :data {:graph-uuid graph-uuid
  2064. :exp remote-graph-info-or-ex
  2065. :epoch (tc/to-epoch (t/now))}})
  2066. {:stop true})
  2067. (do (println "[full-sync(remote->local)]" (count sorted-diff-remote-files) "files need to sync")
  2068. (let [filtered-files (filter-download-files-with-reserved-chars sorted-diff-remote-files)]
  2069. (swap! *sync-state #(sync-state-reset-full-remote->local-files % sorted-diff-remote-files))
  2070. (<! (.sync-files-remote->local!
  2071. this (map (juxt relative-path -checksum)
  2072. filtered-files)
  2073. latest-txid)))))))))))
  2074. (defn- <file-changed?
  2075. "return true when file changed compared with remote"
  2076. [graph-uuid file-path-without-base-path base-path]
  2077. {:pre [(string? file-path-without-base-path)]}
  2078. (go
  2079. (let [remote-meta-or-exp (<! (<get-remote-files-meta remoteapi graph-uuid [file-path-without-base-path]))
  2080. local-meta (first (<! (<get-local-files-meta rsapi graph-uuid base-path [file-path-without-base-path])))]
  2081. (if (instance? ExceptionInfo remote-meta-or-exp)
  2082. false
  2083. (not= (first remote-meta-or-exp) local-meta)))))
  2084. (defn- <filter-local-changes-pred
  2085. "filter local-change events:
  2086. - for 'unlink' event
  2087. - when related file exists on local dir, ignore this event
  2088. - for 'add' | 'change' event
  2089. - when related file's content is same as remote file, ignore it"
  2090. [^FileChangeEvent e basepath graph-uuid]
  2091. (go
  2092. (let [r-path (relative-path e)]
  2093. (case (.-type e)
  2094. "unlink"
  2095. ;; keep this e when it's not found
  2096. (<! (<local-file-not-exist? graph-uuid rsapi basepath r-path))
  2097. ("add" "change")
  2098. ;; 1. local file exists
  2099. ;; 2. compare with remote file, and changed
  2100. (and (not (<! (<local-file-not-exist? graph-uuid rsapi basepath r-path)))
  2101. (<! (<file-changed? graph-uuid r-path basepath)))))))
  2102. (defn- <filter-checksum-not-consistent
  2103. "filter out FileChangeEvents checksum changed,
  2104. compare checksum in FileChangeEvent and checksum calculated now"
  2105. [graph-uuid es]
  2106. {:pre [(or (nil? es) (coll? es))
  2107. (every? #(instance? FileChangeEvent %) es)]}
  2108. (go
  2109. (when (seq es)
  2110. (if (= "unlink" (.-type ^FileChangeEvent (first es)))
  2111. es
  2112. (let [base-path (.-dir (first es))
  2113. files-meta (<! (<get-local-files-meta
  2114. rsapi graph-uuid base-path (mapv relative-path es)))
  2115. current-checksum-map (when (coll? files-meta) (into {} (mapv (juxt :path :etag) files-meta)))
  2116. origin-checksum-map (into {} (mapv (juxt relative-path #(.-checksum ^FileChangeEvent %)) es))
  2117. origin-map (into {} (mapv (juxt relative-path identity) es))]
  2118. (->>
  2119. (merge-with
  2120. #(boolean (or (nil? %1) (= "fake-checksum" %1) (= %1 %2)))
  2121. origin-checksum-map current-checksum-map)
  2122. (filterv (comp true? second))
  2123. (mapv first)
  2124. (select-keys origin-map)
  2125. vals))))))
  2126. (def ^:private file-size-limit (* 100 1000 1024)) ;100MB
  2127. (defn- filter-too-huge-files-aux
  2128. [e]
  2129. {:post [(boolean? %)]}
  2130. (if (= "unlink" (.-type ^FileChangeEvent e))
  2131. true
  2132. (boolean
  2133. (when-some [size (:size (.-stat e))]
  2134. (< size file-size-limit)))))
  2135. (defn- filter-too-huge-files
  2136. "filter out files > `file-size-limit`"
  2137. [es]
  2138. {:pre [(or (nil? es) (coll? es))
  2139. (every? #(instance? FileChangeEvent %) es)]}
  2140. (filterv filter-too-huge-files-aux es))
  2141. (defn- filter-local-files-in-deletion-logs
  2142. [local-all-files-meta deletion-logs remote-all-files-meta]
  2143. (let [deletion-logs-map (into {}
  2144. (mapcat
  2145. (fn [log]
  2146. (mapv
  2147. (fn [path] [path (select-keys log [:epoch :TXId])])
  2148. (:paths log))))
  2149. deletion-logs)
  2150. remote-all-files-meta-map (into {} (map (juxt :path identity)) remote-all-files-meta)
  2151. *keep (transient #{})
  2152. *delete (transient #{})
  2153. filtered-deletion-logs-map
  2154. (loop [[deletion-log & others] deletion-logs-map
  2155. result {}]
  2156. (if-not deletion-log
  2157. result
  2158. (let [[deletion-log-path deletion-log-meta] deletion-log
  2159. meta (get remote-all-files-meta-map deletion-log-path)
  2160. meta-txid (:txid meta)
  2161. deletion-txid (:TXId deletion-log-meta)]
  2162. (if (and meta-txid deletion-txid
  2163. (> meta-txid deletion-txid))
  2164. (recur others result)
  2165. (recur others (into result [[deletion-log-path deletion-log-meta]]))))))]
  2166. (doseq [f local-all-files-meta]
  2167. (let [epoch-long (some-> (get filtered-deletion-logs-map (:path f))
  2168. :epoch
  2169. (* 1000))]
  2170. (if (and epoch-long (> epoch-long (:last-modified f)))
  2171. (conj! *delete f)
  2172. (conj! *keep f))))
  2173. {:keep (persistent! *keep)
  2174. :delete (persistent! *delete)}))
  2175. (defn- <filter-too-long-filename
  2176. [graph-uuid local-files-meta]
  2177. (go (let [origin-fnames (mapv :path local-files-meta)
  2178. encrypted-fnames (<! (<encrypt-fnames rsapi graph-uuid origin-fnames))
  2179. fnames-map (zipmap origin-fnames encrypted-fnames)
  2180. local-files-meta-map (into {} (map (fn [meta] [(:path meta) meta])) local-files-meta)]
  2181. (sequence
  2182. (comp
  2183. (filter
  2184. (fn [[path _]]
  2185. ; 950 = (- 1024 36 36 2)
  2186. ; 1024 - length of 'user-uuid/graph-uuid/'
  2187. (<= (count (get fnames-map path)) 950)))
  2188. (map second))
  2189. local-files-meta-map))))
  2190. (defrecord ^:large-vars/cleanup-todo
  2191. Local->RemoteSyncer [user-uuid graph-uuid base-path repo *sync-state remoteapi
  2192. ^:mutable rate *txid *txid-for-get-deletion-log
  2193. ^:mutable remote->local-syncer stop-chan *stopped *paused
  2194. ;; control chans
  2195. private-immediately-local->remote-chan private-recent-edited-chan]
  2196. Object
  2197. (filter-file-change-events-fn [_]
  2198. (fn [^FileChangeEvent e]
  2199. (go (and (instance? FileChangeEvent e)
  2200. (if-let [mtime (:mtime (.-stat e))]
  2201. ;; if mtime is not nil, it should be after (- now 1min)
  2202. ;; ignore events too early
  2203. (> (* 1000 mtime) (tc/to-long (t/minus (t/now) (t/minutes 1))))
  2204. true)
  2205. (or (string/starts-with? (.-dir e) base-path)
  2206. (string/starts-with? (str "file://" (.-dir e)) base-path)) ; valid path prefix
  2207. (not (ignored? e)) ;not ignored
  2208. ;; download files will also trigger file-change-events, ignore them
  2209. (if (= "unlink" (:type e))
  2210. true
  2211. (when-some [recent-remote->local-file-item
  2212. (<! (<file-change-event=>recent-remote->local-file-item
  2213. graph-uuid e))]
  2214. (not (contains? (:recent-remote->local-files @*sync-state) recent-remote->local-file-item))))))))
  2215. (set-remote->local-syncer! [_ s] (set! remote->local-syncer s))
  2216. ILocal->RemoteSync
  2217. (setup-local->remote! [_]
  2218. (async/tap immediately-local->remote-mult private-immediately-local->remote-chan)
  2219. (async/tap recent-edited-mult private-recent-edited-chan))
  2220. (stop-local->remote! [_]
  2221. (async/untap immediately-local->remote-mult private-immediately-local->remote-chan)
  2222. (async/untap recent-edited-mult private-recent-edited-chan)
  2223. (async/close! stop-chan)
  2224. (vreset! *stopped true))
  2225. (<ratelimit [this from-chan]
  2226. (let [<fast-filter-e-fn (.filter-file-change-events-fn this)]
  2227. (util/<ratelimit
  2228. from-chan rate
  2229. :filter-fn
  2230. (fn [e]
  2231. (go
  2232. (and (rsapi-ready? rsapi graph-uuid)
  2233. (<! (<fast-filter-e-fn e))
  2234. (do
  2235. (swap! *sync-state sync-state--add-queued-local->remote-files e)
  2236. (let [v (<! (<filter-local-changes-pred e base-path graph-uuid))]
  2237. (when-not v
  2238. (swap! *sync-state sync-state--remove-queued-local->remote-files e))
  2239. v)))))
  2240. :flush-fn #(swap! *sync-state sync-state-reset-queued-local->remote-files)
  2241. :stop-ch stop-chan
  2242. :distinct-coll? true
  2243. :flush-now-ch private-immediately-local->remote-chan
  2244. :refresh-timeout-ch private-recent-edited-chan)))
  2245. (<sync-local->remote! [_ es]
  2246. (if (empty? es)
  2247. (go {:succ true})
  2248. (let [type (.-type ^FileChangeEvent (first es))
  2249. es->paths-xf (comp
  2250. (map #(relative-path %))
  2251. (remove ignored?))]
  2252. (go
  2253. (let [es* (<! (<filter-checksum-not-consistent graph-uuid es))
  2254. _ (when (not= (count es*) (count es))
  2255. (println :debug :filter-checksum-changed
  2256. (mapv relative-path (set/difference (set es) (set es*)))))
  2257. es** (filter-too-huge-files es*)
  2258. _ (when (not= (count es**) (count es*))
  2259. (println :debug :filter-too-huge-files
  2260. (mapv relative-path (set/difference (set es*) (set es**)))))
  2261. paths (cond-> (sequence es->paths-xf es**)
  2262. (not= type "unlink")
  2263. filter-upload-files-with-reserved-chars)
  2264. _ (println :sync-local->remote type paths)
  2265. r (if (empty? paths)
  2266. (go @*txid)
  2267. (case type
  2268. ("add" "change")
  2269. (<with-pause (<update-remote-files rsapi graph-uuid base-path paths @*txid) *paused)
  2270. "unlink"
  2271. (<with-pause (<delete-remote-files rsapi graph-uuid base-path paths @*txid) *paused)))
  2272. _ (swap! *sync-state sync-state--add-current-local->remote-files paths)
  2273. r* (<! r)
  2274. [succ? paused?] ((juxt number? :pause) r*)
  2275. _ (swap! *sync-state sync-state--remove-current-local->remote-files paths succ?)]
  2276. (cond
  2277. (need-sync-remote? r*)
  2278. (do (println :need-sync-remote r*)
  2279. {:need-sync-remote true})
  2280. (need-reset-local-txid? r*) ;; TODO: this cond shouldn't be true,
  2281. ;; but some potential bugs cause local-txid > remote-txid
  2282. (let [remote-graph-info-or-ex (<! (<get-remote-graph remoteapi nil graph-uuid))
  2283. remote-txid (:TXId remote-graph-info-or-ex)]
  2284. (if (or (instance? ExceptionInfo remote-graph-info-or-ex) (nil? remote-txid))
  2285. (do (put-sync-event! {:event :get-remote-graph-failed
  2286. :data {:graph-uuid graph-uuid
  2287. :exp remote-graph-info-or-ex
  2288. :epoch (tc/to-epoch (t/now))}})
  2289. {:stop true})
  2290. (do (<! (<update-graphs-txid! remote-txid graph-uuid user-uuid repo))
  2291. (reset! *txid remote-txid)
  2292. {:succ true})))
  2293. (graph-has-been-deleted? r*)
  2294. (do (println :graph-has-been-deleted r*)
  2295. {:graph-has-been-deleted true})
  2296. paused?
  2297. {:pause true}
  2298. succ? ; succ
  2299. (do
  2300. (println "sync-local->remote! update txid" r*)
  2301. ;; persist txid
  2302. (<! (<update-graphs-txid! r* graph-uuid user-uuid repo))
  2303. (reset! *txid r*)
  2304. {:succ true})
  2305. :else
  2306. (do
  2307. (println "sync-local->remote unknown:" r*)
  2308. {:unknown r*})))))))
  2309. (<sync-local->remote-all-files! [this]
  2310. (go
  2311. (let [remote-all-files-meta-c (<get-remote-all-files-meta remoteapi graph-uuid)
  2312. local-all-files-meta-c (<get-local-all-files-meta rsapi graph-uuid base-path)
  2313. deletion-logs-c (<get-deletion-logs remoteapi graph-uuid @*txid-for-get-deletion-log)
  2314. remote-all-files-meta-or-exp (<! remote-all-files-meta-c)
  2315. deletion-logs-or-exp (<! deletion-logs-c)]
  2316. (cond
  2317. (or (storage-exceed-limit? remote-all-files-meta-or-exp)
  2318. (sync-stop-when-api-flying? remote-all-files-meta-or-exp)
  2319. (decrypt-exp? remote-all-files-meta-or-exp))
  2320. (do (put-sync-event! {:event :get-remote-all-files-failed
  2321. :data {:graph-uuid graph-uuid
  2322. :exp remote-all-files-meta-or-exp
  2323. :epoch (tc/to-epoch (t/now))}})
  2324. {:stop true})
  2325. (instance? ExceptionInfo deletion-logs-or-exp)
  2326. (do (put-sync-event! {:event :get-deletion-logs-failed
  2327. :data {:graph-uuid graph-uuid
  2328. :exp deletion-logs-or-exp
  2329. :epoch (tc/to-epoch (t/now))}})
  2330. {:stop true})
  2331. :else
  2332. (let [remote-all-files-meta remote-all-files-meta-or-exp
  2333. local-all-files-meta (<! local-all-files-meta-c)
  2334. {local-all-files-meta :keep delete-local-files :delete}
  2335. (filter-local-files-in-deletion-logs local-all-files-meta deletion-logs-or-exp remote-all-files-meta)
  2336. recent-10-days-range ((juxt #(tc/to-long (t/minus % (t/days 10))) #(tc/to-long %)) (t/today))
  2337. diff-local-files (->> (diff-file-metadata-sets local-all-files-meta remote-all-files-meta)
  2338. (<filter-too-long-filename graph-uuid)
  2339. <!
  2340. (sort-by (sort-file-metadata-fn :recent-days-range recent-10-days-range) >))
  2341. change-events
  2342. (sequence
  2343. (comp
  2344. ;; convert to FileChangeEvent
  2345. (map #(->FileChangeEvent "change" base-path (.get-normalized-path ^FileMetadata %)
  2346. {:size (:size %)} (:etag %)))
  2347. (remove ignored?))
  2348. diff-local-files)
  2349. distinct-change-events (-> (distinct-file-change-events change-events)
  2350. filter-upload-files-with-reserved-chars)
  2351. _ (swap! *sync-state #(sync-state-reset-full-local->remote-files % distinct-change-events))
  2352. change-events-partitions
  2353. (sequence
  2354. ;; partition FileChangeEvents
  2355. (partition-file-change-events upload-batch-size)
  2356. distinct-change-events)]
  2357. (println "[full-sync(local->remote)]"
  2358. (count (flatten change-events-partitions)) "files need to sync and"
  2359. (count delete-local-files) "local files need to delete")
  2360. (put-sync-event! {:event :start
  2361. :data {:type :full-local->remote
  2362. :graph-uuid graph-uuid
  2363. :full-sync? true
  2364. :epoch (tc/to-epoch (t/now))}})
  2365. ;; 1. delete local files
  2366. (loop [[f & fs] delete-local-files]
  2367. (when f
  2368. (let [relative-p (relative-path f)]
  2369. (when-not (<! (<local-file-not-exist? graph-uuid rsapi base-path relative-p))
  2370. (let [fake-recent-remote->local-file-item {:remote->local-type :delete
  2371. :checksum nil
  2372. :path relative-p}]
  2373. (swap! *sync-state sync-state--add-recent-remote->local-files
  2374. [fake-recent-remote->local-file-item])
  2375. (<! (<delete-local-files rsapi graph-uuid base-path [(relative-path f)]))
  2376. (go (<! (timeout 5000))
  2377. (swap! *sync-state sync-state--remove-recent-remote->local-files
  2378. [fake-recent-remote->local-file-item])))))
  2379. (recur fs)))
  2380. ;; 2. upload local files
  2381. (let [r (loop [es-partitions change-events-partitions]
  2382. (if @*stopped
  2383. {:stop true}
  2384. (if (empty? es-partitions)
  2385. {:succ true}
  2386. (let [{:keys [succ need-sync-remote graph-has-been-deleted unknown stop] :as r}
  2387. (<! (<sync-local->remote! this (first es-partitions)))]
  2388. (s/assert ::sync-local->remote!-result r)
  2389. (cond
  2390. succ
  2391. (recur (next es-partitions))
  2392. (or need-sync-remote graph-has-been-deleted unknown stop) r)))))]
  2393. ;; update *txid-for-get-deletion-log
  2394. (reset! *txid-for-get-deletion-log @*txid)
  2395. r
  2396. )))))))
  2397. ;;; ### put all stuff together
  2398. (defrecord ^:large-vars/cleanup-todo
  2399. SyncManager [user-uuid graph-uuid base-path *sync-state
  2400. ^Local->RemoteSyncer local->remote-syncer ^Remote->LocalSyncer remote->local-syncer remoteapi
  2401. ^:mutable ratelimit-local-changes-chan
  2402. *txid *txid-for-get-deletion-log
  2403. ^:mutable state ^:mutable remote-change-chan ^:mutable *ws *stopped? *paused?
  2404. ^:mutable ops-chan ^:mutable app-awake-from-sleep-chan
  2405. ;; control chans
  2406. private-full-sync-chan private-remote->local-sync-chan
  2407. private-remote->local-full-sync-chan private-pause-resume-chan]
  2408. Object
  2409. (schedule [this next-state args reason]
  2410. {:pre [(s/valid? ::state next-state)]}
  2411. (println "[SyncManager" graph-uuid "]"
  2412. (and state (name state)) "->" (and next-state (name next-state)) :reason reason :local-txid @*txid :now (tc/to-string (t/now)))
  2413. (set! state next-state)
  2414. (swap! *sync-state sync-state--update-state next-state)
  2415. (go
  2416. (case state
  2417. ::need-password
  2418. (<! (.need-password this))
  2419. ::idle
  2420. (<! (.idle this))
  2421. ::local->remote
  2422. (<! (.local->remote this args))
  2423. ::remote->local
  2424. (<! (.remote->local this nil args))
  2425. ::local->remote-full-sync
  2426. (<! (.full-sync this))
  2427. ::remote->local-full-sync
  2428. (<! (.remote->local-full-sync this args))
  2429. ::pause
  2430. (<! (.pause this))
  2431. ::stop
  2432. (-stop! this))))
  2433. (start [this]
  2434. (set! ops-chan (chan (async/dropping-buffer 10)))
  2435. (set! app-awake-from-sleep-chan (chan (async/sliding-buffer 1)))
  2436. (set! *ws (atom nil))
  2437. (set! remote-change-chan (ws-listen! graph-uuid *ws))
  2438. (set! ratelimit-local-changes-chan (<ratelimit local->remote-syncer local-changes-revised-chan))
  2439. (setup-local->remote! local->remote-syncer)
  2440. (async/tap full-sync-mult private-full-sync-chan)
  2441. (async/tap remote->local-sync-mult private-remote->local-sync-chan)
  2442. (async/tap remote->local-full-sync-mult private-remote->local-full-sync-chan)
  2443. (async/tap pause-resume-mult private-pause-resume-chan)
  2444. (async/tap pubsub/app-wake-up-from-sleep-mult app-awake-from-sleep-chan)
  2445. (go-loop []
  2446. (let [{:keys [remote->local remote->local-full-sync local->remote-full-sync local->remote resume pause stop]}
  2447. (async/alt!
  2448. private-remote->local-full-sync-chan {:remote->local-full-sync true}
  2449. private-remote->local-sync-chan {:remote->local true}
  2450. private-full-sync-chan {:local->remote-full-sync true}
  2451. private-pause-resume-chan ([v] (if v {:resume true} {:pause true}))
  2452. remote-change-chan ([v] (println "remote change:" v) {:remote->local v})
  2453. ratelimit-local-changes-chan ([v]
  2454. (if (nil? v)
  2455. {:stop true}
  2456. (let [rest-v (util/drain-chan ratelimit-local-changes-chan)
  2457. vs (cons v rest-v)]
  2458. (println "local changes:" vs)
  2459. {:local->remote vs})))
  2460. app-awake-from-sleep-chan {:remote->local true}
  2461. (timeout (* 20 60 1000)) {:local->remote-full-sync true}
  2462. (timeout (* 10 60 1000)) {:remote->local true}
  2463. :priority true)]
  2464. (cond
  2465. stop
  2466. nil
  2467. remote->local-full-sync
  2468. (do (util/drain-chan ops-chan)
  2469. (>! ops-chan {:remote->local-full-sync true})
  2470. (recur))
  2471. remote->local
  2472. (let [txid
  2473. (if (true? remote->local)
  2474. {:txid (:TXId (<! (<get-remote-graph remoteapi nil graph-uuid)))}
  2475. remote->local)]
  2476. (when (some? txid)
  2477. (>! ops-chan {:remote->local txid}))
  2478. (recur))
  2479. local->remote
  2480. (do (>! ops-chan {:local->remote local->remote})
  2481. (recur))
  2482. local->remote-full-sync
  2483. (do (util/drain-chan ops-chan)
  2484. (>! ops-chan {:local->remote-full-sync true})
  2485. (recur))
  2486. resume
  2487. (do (>! ops-chan {:resume true})
  2488. (recur))
  2489. pause
  2490. (do (vreset! *paused? true)
  2491. (>! ops-chan {:pause true})
  2492. (recur)))))
  2493. (.schedule this ::need-password nil nil))
  2494. (need-password
  2495. [this]
  2496. (go
  2497. (let [next-state (<! (<loop-ensure-pwd&keys graph-uuid (state/get-current-repo) *stopped?))]
  2498. (assert (s/valid? ::state next-state) next-state)
  2499. (when (= next-state ::idle)
  2500. (<! (<ensure-set-env&keys graph-uuid *stopped?)))
  2501. (if @*stopped?
  2502. (.schedule this ::stop nil nil)
  2503. (.schedule this next-state nil nil)))))
  2504. (pause [this]
  2505. (go (<! (<rsapi-cancel-all-requests)))
  2506. (put-sync-event! {:event :pause
  2507. :data {:graph-uuid graph-uuid
  2508. :epoch (tc/to-epoch (t/now))}})
  2509. (go-loop []
  2510. (let [{:keys [resume] :as result} (<! ops-chan)]
  2511. (cond
  2512. resume
  2513. (let [{:keys [remote->local remote->local-full-sync local->remote local->remote-full-sync] :as resume-state}
  2514. (get @*resume-state graph-uuid)]
  2515. (resume-state--reset graph-uuid)
  2516. (vreset! *paused? false)
  2517. (cond
  2518. remote->local
  2519. (offer! private-remote->local-sync-chan true)
  2520. remote->local-full-sync
  2521. (offer! private-remote->local-full-sync-chan true)
  2522. local->remote
  2523. (>! ops-chan {:local->remote local->remote})
  2524. local->remote-full-sync
  2525. (offer! private-full-sync-chan true)
  2526. :else
  2527. ;; if resume-state = nil, try a remote->local to sync recent diffs
  2528. (offer! private-remote->local-sync-chan true))
  2529. (put-sync-event! {:event :resume
  2530. :data {:graph-uuid graph-uuid
  2531. :resume-state resume-state
  2532. :epoch (tc/to-epoch (t/now))}})
  2533. (<! (.schedule this ::idle nil :resume)))
  2534. (nil? result)
  2535. (<! (.schedule this ::stop nil nil))
  2536. :else
  2537. (recur)))))
  2538. (idle [this]
  2539. (go
  2540. (let [{:keys [stop remote->local local->remote local->remote-full-sync remote->local-full-sync pause resume] :as result}
  2541. (<! ops-chan)]
  2542. (cond
  2543. (or stop (nil? result))
  2544. (<! (.schedule this ::stop nil nil))
  2545. remote->local
  2546. (<! (.schedule this ::remote->local {:remote remote->local} {:remote-changed remote->local}))
  2547. local->remote
  2548. (<! (.schedule this ::local->remote {:local local->remote} {:local-changed local->remote}))
  2549. local->remote-full-sync
  2550. (<! (.schedule this ::local->remote-full-sync nil nil))
  2551. remote->local-full-sync
  2552. (<! (.schedule this ::remote->local-full-sync nil nil))
  2553. pause
  2554. (<! (.schedule this ::pause nil nil))
  2555. resume
  2556. (<! (.schedule this ::idle nil nil))
  2557. :else
  2558. (do
  2559. (state/pub-event! [:capture-error {:error (js/Error. "sync/wrong-ops-chan-when-idle")
  2560. :payload {:type :sync/wrong-ops-chan-when-idle
  2561. :ops-chan-result result
  2562. :user-id user-uuid
  2563. :graph-id graph-uuid}}])
  2564. (<! (.schedule this ::idle nil nil)))))))
  2565. (full-sync [this]
  2566. (go
  2567. (let [{:keys [succ need-sync-remote graph-has-been-deleted unknown stop] :as r}
  2568. (<! (<sync-local->remote-all-files! local->remote-syncer))]
  2569. (s/assert ::sync-local->remote-all-files!-result r)
  2570. (cond
  2571. succ
  2572. (do
  2573. (swap! *sync-state #(sync-state-reset-full-local->remote-files % []))
  2574. (put-sync-event! {:event :finished-local->remote
  2575. :data {:graph-uuid graph-uuid
  2576. :full-sync? true
  2577. :epoch (tc/to-epoch (t/now))}})
  2578. (.schedule this ::idle nil nil))
  2579. need-sync-remote
  2580. (do (util/drain-chan ops-chan)
  2581. (>! ops-chan {:remote->local true})
  2582. (>! ops-chan {:local->remote-full-sync true})
  2583. (.schedule this ::idle nil nil))
  2584. graph-has-been-deleted
  2585. (.schedule this ::stop nil :graph-has-been-deleted)
  2586. stop
  2587. (.schedule this ::stop nil nil)
  2588. unknown
  2589. (do
  2590. (state/pub-event! [:capture-error {:error unknown
  2591. :payload {:type :sync/unknown
  2592. :event :local->remote-full-sync-failed
  2593. :user-id user-uuid
  2594. :graph-uuid graph-uuid}}])
  2595. (put-sync-event! {:event :local->remote-full-sync-failed
  2596. :data {:graph-uuid graph-uuid
  2597. :epoch (tc/to-epoch (t/now))}})
  2598. (.schedule this ::idle nil nil))))))
  2599. (remote->local-full-sync [this _]
  2600. (go
  2601. (let [{:keys [succ unknown stop pause]}
  2602. (<! (<sync-remote->local-all-files! remote->local-syncer))]
  2603. (cond
  2604. succ
  2605. (do (put-sync-event! {:event :finished-remote->local
  2606. :data {:graph-uuid graph-uuid
  2607. :full-sync? true
  2608. :epoch (tc/to-epoch (t/now))}})
  2609. (.schedule this ::idle nil nil))
  2610. stop
  2611. (.schedule this ::stop nil nil)
  2612. pause
  2613. (do (resume-state--add-remote->local-full-sync-state graph-uuid)
  2614. (.schedule this ::pause nil nil))
  2615. unknown
  2616. (do
  2617. (state/pub-event! [:capture-error {:error unknown
  2618. :payload {:event :remote->local-full-sync-failed
  2619. :type :sync/unknown
  2620. :graph-uuid graph-uuid
  2621. :user-id user-uuid}}])
  2622. (put-sync-event! {:event :remote->local-full-sync-failed
  2623. :data {:graph-uuid graph-uuid
  2624. :exp unknown
  2625. :epoch (tc/to-epoch (t/now))}})
  2626. (let [next-state (if (string/includes? (str (ex-cause unknown)) "404 Not Found")
  2627. ;; TODO: this should never happen
  2628. ::stop
  2629. ;; if any other exception occurred, re-exec remote->local-full-sync
  2630. ::remote->local-full-sync)]
  2631. (.schedule this next-state nil nil)))))))
  2632. (remote->local [this _next-state {remote-val :remote}]
  2633. (go
  2634. (if (some-> remote-val :txid (<= @*txid))
  2635. (.schedule this ::idle nil nil)
  2636. (let [origin-txid @*txid
  2637. {:keys [succ unknown stop pause need-remote->local-full-sync] :as r}
  2638. (<! (<sync-remote->local! remote->local-syncer))]
  2639. (s/assert ::sync-remote->local!-result r)
  2640. (cond
  2641. need-remote->local-full-sync
  2642. (do (util/drain-chan ops-chan)
  2643. (>! ops-chan {:remote->local-full-sync true})
  2644. (>! ops-chan {:local->remote-full-sync true})
  2645. (.schedule this ::idle nil nil))
  2646. succ
  2647. (do (put-sync-event! {:event :finished-remote->local
  2648. :data {:graph-uuid graph-uuid
  2649. :full-sync? false
  2650. :from-txid origin-txid
  2651. :to-txid @*txid
  2652. :epoch (tc/to-epoch (t/now))}})
  2653. (.schedule this ::idle nil nil))
  2654. stop
  2655. (.schedule this ::stop nil nil)
  2656. pause
  2657. (do (resume-state--add-remote->local-state graph-uuid)
  2658. (.schedule this ::pause nil nil))
  2659. unknown
  2660. (do (prn "remote->local err" unknown)
  2661. (state/pub-event! [:capture-error {:error unknown
  2662. :payload {:type :sync/unknown
  2663. :event :remote->local
  2664. :user-id user-uuid
  2665. :graph-uuid graph-uuid}}])
  2666. (.schedule this ::idle nil nil)))))))
  2667. (local->remote [this {local-changes :local}]
  2668. ;; local-changes:: list of FileChangeEvent
  2669. (assert (some? local-changes) local-changes)
  2670. (go
  2671. (let [distincted-local-changes (distinct-file-change-events local-changes)
  2672. _ (swap! *sync-state #(sync-state-reset-full-local->remote-files % distincted-local-changes))
  2673. change-events-partitions
  2674. (sequence (partition-file-change-events upload-batch-size) distincted-local-changes)
  2675. _ (put-sync-event! {:event :start
  2676. :data {:type :local->remote
  2677. :graph-uuid graph-uuid
  2678. :full-sync? false
  2679. :epoch (tc/to-epoch (t/now))}})
  2680. {:keys [succ need-sync-remote graph-has-been-deleted unknown stop pause]}
  2681. (loop [es-partitions change-events-partitions]
  2682. (cond
  2683. @*stopped? {:stop true}
  2684. @*paused? {:pause true}
  2685. (empty? es-partitions) {:succ true}
  2686. :else
  2687. (let [{:keys [succ need-sync-remote graph-has-been-deleted pause unknown stop] :as r}
  2688. (<! (<sync-local->remote! local->remote-syncer (first es-partitions)))]
  2689. (s/assert ::sync-local->remote!-result r)
  2690. (cond
  2691. succ
  2692. (recur (next es-partitions))
  2693. (or need-sync-remote graph-has-been-deleted unknown pause stop) r))))]
  2694. (cond
  2695. succ
  2696. (do
  2697. (swap! *sync-state #(sync-state-reset-full-local->remote-files % []))
  2698. (put-sync-event! {:event :finished-local->remote
  2699. :data {:graph-uuid graph-uuid
  2700. :full-sync? false
  2701. :file-change-events distincted-local-changes
  2702. :epoch (tc/to-epoch (t/now))}})
  2703. (.schedule this ::idle nil nil))
  2704. need-sync-remote
  2705. (do (util/drain-chan ops-chan)
  2706. (>! ops-chan {:remote->local true})
  2707. (>! ops-chan {:local->remote local-changes})
  2708. (.schedule this ::idle nil nil))
  2709. graph-has-been-deleted
  2710. (.schedule this ::stop nil :graph-has-been-deleted)
  2711. stop
  2712. (.schedule this ::stop nil nil)
  2713. pause
  2714. (do (resume-state--add-local->remote-state graph-uuid local-changes)
  2715. (.schedule this ::pause nil nil))
  2716. unknown
  2717. (do
  2718. (debug/pprint "local->remote" unknown)
  2719. (state/pub-event! [:capture-error {:error unknown
  2720. :payload {:event :local->remote
  2721. :type :sync/unknown
  2722. :user-id user-uuid
  2723. :graph-uuid graph-uuid}}])
  2724. (.schedule this ::idle nil nil))))))
  2725. IStoppable
  2726. (-stop! [_]
  2727. (go
  2728. (when-not @*stopped?
  2729. (vreset! *stopped? true)
  2730. (ws-stop! *ws)
  2731. (async/untap full-sync-mult private-full-sync-chan)
  2732. (async/untap remote->local-sync-mult private-remote->local-sync-chan)
  2733. (async/untap remote->local-full-sync-mult private-remote->local-full-sync-chan)
  2734. (async/untap pause-resume-mult private-pause-resume-chan)
  2735. (async/untap pubsub/app-wake-up-from-sleep-mult app-awake-from-sleep-chan)
  2736. (when ops-chan (async/close! ops-chan))
  2737. (stop-local->remote! local->remote-syncer)
  2738. (stop-remote->local! remote->local-syncer)
  2739. (<! (<rsapi-cancel-all-requests))
  2740. (swap! *sync-state sync-state--update-state ::stop)
  2741. (reset! current-sm-graph-uuid nil)
  2742. (debug/pprint ["stop sync-manager, graph-uuid" graph-uuid "base-path" base-path]))))
  2743. IStopped?
  2744. (-stopped? [_]
  2745. @*stopped?))
  2746. (defn sync-manager [user-uuid graph-uuid base-path repo txid *sync-state]
  2747. (let [*txid (atom txid)
  2748. *txid-for-get-deletion-log (atom txid)
  2749. *stopped? (volatile! false)
  2750. *paused? (volatile! false)
  2751. remoteapi-with-stop (->RemoteAPI *stopped?)
  2752. local->remote-syncer (->Local->RemoteSyncer user-uuid graph-uuid
  2753. base-path
  2754. repo *sync-state remoteapi-with-stop
  2755. (if (mobile-util/native-platform?)
  2756. 2000
  2757. 10000)
  2758. *txid *txid-for-get-deletion-log nil (chan) *stopped? *paused?
  2759. (chan 1) (chan 1))
  2760. remote->local-syncer (->Remote->LocalSyncer user-uuid graph-uuid base-path
  2761. repo *txid *txid-for-get-deletion-log *sync-state remoteapi-with-stop
  2762. nil *stopped? *paused?)]
  2763. (.set-remote->local-syncer! local->remote-syncer remote->local-syncer)
  2764. (.set-local->remote-syncer! remote->local-syncer local->remote-syncer)
  2765. (swap! *sync-state sync-state--update-current-syncing-graph-uuid graph-uuid)
  2766. (->SyncManager user-uuid graph-uuid base-path *sync-state local->remote-syncer remote->local-syncer remoteapi-with-stop
  2767. nil *txid *txid-for-get-deletion-log nil nil nil *stopped? *paused? nil nil (chan 1) (chan 1) (chan 1) (chan 1))))
  2768. (defn sync-manager-singleton
  2769. [user-uuid graph-uuid base-path repo txid *sync-state]
  2770. (when-not @current-sm-graph-uuid
  2771. (reset! current-sm-graph-uuid graph-uuid)
  2772. (sync-manager user-uuid graph-uuid base-path repo txid *sync-state)))
  2773. ;; Avoid sync reentrancy
  2774. (defonce *sync-entered? (atom false))
  2775. (defn <sync-stop []
  2776. (go
  2777. (when-let [sm ^SyncManager (state/get-file-sync-manager (state/get-current-file-sync-graph-uuid))]
  2778. (println "[SyncManager" (:graph-uuid sm) "]" "stopping")
  2779. (state/clear-file-sync-state! (:graph-uuid sm))
  2780. (<! (-stop! sm))
  2781. (reset! *sync-entered? false)
  2782. (println "[SyncManager" (:graph-uuid sm) "]" "stopped"))
  2783. (reset! current-sm-graph-uuid nil)))
  2784. (defn <sync-local->remote-now []
  2785. (go
  2786. (when-let [_sm ^SyncManager (state/get-file-sync-manager (state/get-current-file-sync-graph-uuid))]
  2787. (offer! immediately-local->remote-chan true))))
  2788. (defn sync-need-password!
  2789. []
  2790. (when-let [sm ^SyncManager (state/get-file-sync-manager (state/get-current-file-sync-graph-uuid))]
  2791. (.need-password sm)))
  2792. (defn check-graph-belong-to-current-user
  2793. [current-user-uuid graph-user-uuid]
  2794. (cond
  2795. (nil? current-user-uuid)
  2796. false
  2797. (= current-user-uuid graph-user-uuid)
  2798. true
  2799. :else
  2800. (do (notification/show! (t :file-sync/other-user-graph) :warning false)
  2801. false)))
  2802. (defn <check-remote-graph-exists
  2803. [local-graph-uuid]
  2804. {:pre [(util/uuid-string? local-graph-uuid)]}
  2805. (go
  2806. (let [r (<! (<list-remote-graphs remoteapi))
  2807. result
  2808. (or
  2809. ;; if api call failed, assume this remote graph still exists
  2810. (instance? ExceptionInfo r)
  2811. (and
  2812. (contains? r :Graphs)
  2813. (->> (:Graphs r)
  2814. (mapv :GraphUUID)
  2815. set
  2816. (#(contains? % local-graph-uuid)))))]
  2817. (when-not result
  2818. (notification/show! (t :file-sync/graph-deleted) :warning false))
  2819. result)))
  2820. (defn sync-off?
  2821. [sync-state]
  2822. (or (nil? sync-state) (sync-state--stopped? sync-state)))
  2823. (defn graph-sync-off?
  2824. "Is sync not running for this `graph-uuid`?"
  2825. [graph-uuid]
  2826. (sync-off? (state/get-file-sync-state graph-uuid)))
  2827. (defn graph-encrypted?
  2828. []
  2829. (when-let [graph-uuid (second @graphs-txid)]
  2830. (get-pwd graph-uuid)))
  2831. (declare network-online-cursor)
  2832. (defn <sync-start
  2833. []
  2834. (when-not (false? (state/enable-sync?))
  2835. (go
  2836. (when (false? @*sync-entered?)
  2837. (reset! *sync-entered? true)
  2838. (let [*sync-state (atom (sync-state))
  2839. current-user-uuid (<! (user/<user-uuid))
  2840. ;; put @graph-uuid & get-current-repo together,
  2841. ;; prevent to get older repo dir and current graph-uuid.
  2842. _ (<! (p->c (persist-var/-load graphs-txid)))
  2843. [user-uuid graph-uuid txid] @graphs-txid
  2844. txid (or txid 0)
  2845. repo (state/get-current-repo)]
  2846. (when-not (instance? ExceptionInfo current-user-uuid)
  2847. (when (and repo
  2848. @network-online-cursor
  2849. user-uuid graph-uuid txid
  2850. (graph-sync-off? graph-uuid)
  2851. (user/logged-in?)
  2852. (not (config/demo-graph? repo)))
  2853. (try
  2854. (when-let [sm (sync-manager-singleton current-user-uuid graph-uuid
  2855. (config/get-repo-dir repo) repo
  2856. txid *sync-state)]
  2857. (when (check-graph-belong-to-current-user current-user-uuid user-uuid)
  2858. (if-not (<! (<check-remote-graph-exists graph-uuid)) ; remote graph has been deleted
  2859. (clear-graphs-txid! repo)
  2860. (do
  2861. (state/set-file-sync-state graph-uuid @*sync-state)
  2862. (state/set-file-sync-manager graph-uuid sm)
  2863. ;; update global state when *sync-state changes
  2864. (add-watch *sync-state ::update-global-state
  2865. (fn [_ _ _ n]
  2866. (state/set-file-sync-state graph-uuid n)))
  2867. (state/set-state! [:file-sync/graph-state :current-graph-uuid] graph-uuid)
  2868. (.start sm)
  2869. (offer! remote->local-full-sync-chan true)
  2870. (offer! full-sync-chan true)))))
  2871. (catch :default e
  2872. (prn "Sync start error: ")
  2873. (log/error :exception e)))))
  2874. (reset! *sync-entered? false))))))
  2875. (defn- restart-if-stopped!
  2876. [is-active?]
  2877. (cond
  2878. (and is-active? (graph-sync-off? (second @graphs-txid)))
  2879. (<sync-start)
  2880. :else
  2881. (offer! pause-resume-chan is-active?)))
  2882. (def app-state-changed-cursor (rum/cursor state/state :mobile/app-state-change))
  2883. (def finished-local->remote-chan (chan 1))
  2884. (add-watch app-state-changed-cursor "sync"
  2885. (fn [_ _ _ {:keys [is-active?]}]
  2886. (cond
  2887. (mobile-util/native-android?)
  2888. (when-not is-active?
  2889. (<sync-local->remote-now))
  2890. (mobile-util/native-ios?)
  2891. (let [*task-id (atom nil)]
  2892. (if is-active?
  2893. (restart-if-stopped! is-active?)
  2894. (when (state/get-current-file-sync-graph-uuid)
  2895. (p/let [task-id (.beforeExit ^js BackgroundTask
  2896. (fn []
  2897. (go
  2898. ;; Wait for file watcher events
  2899. (<! (timeout 2000))
  2900. (util/drain-chan finished-local->remote-chan)
  2901. (<! (<sync-local->remote-now))
  2902. ;; wait at most 20s
  2903. (async/alts! [finished-local->remote-chan (timeout 20000)])
  2904. (p/let [active? (mobile-util/app-active?)]
  2905. (when-not active?
  2906. (offer! pause-resume-chan is-active?)))
  2907. (<! (timeout 5000))
  2908. (prn "finish task: " @*task-id)
  2909. (let [opt #js {:taskId @*task-id}]
  2910. (.finish ^js BackgroundTask opt)))))]
  2911. (reset! *task-id task-id)))))
  2912. :else
  2913. nil)))
  2914. ;;; ### some add-watches
  2915. ;; TOOD: replace this logic by pause/resume state
  2916. (defonce network-online-cursor (rum/cursor state/state :network/online?))
  2917. (add-watch network-online-cursor "sync-manage"
  2918. (fn [_k _r o n]
  2919. (cond
  2920. (and (true? o) (false? n))
  2921. (<sync-stop)
  2922. (and (false? o) (true? n))
  2923. (<sync-start)
  2924. :else
  2925. nil)))
  2926. (defonce auth-id-token-cursor (rum/cursor state/state :auth/id-token))
  2927. (add-watch auth-id-token-cursor "sync-manage"
  2928. (fn [_k _r _o n]
  2929. (when (nil? n)
  2930. (<sync-stop))))
  2931. ;;; ### some sync events handler
  2932. ;; re-exec remote->local-full-sync when it failed before
  2933. (def re-remote->local-full-sync-chan (chan 1))
  2934. (async/sub pubsub/sync-events-pub :remote->local-full-sync-failed re-remote->local-full-sync-chan)
  2935. (go-loop []
  2936. (let [{{graph-uuid :graph-uuid} :data} (<! re-remote->local-full-sync-chan)
  2937. {:keys [current-syncing-graph-uuid]}
  2938. (state/get-file-sync-state graph-uuid)]
  2939. (when (= graph-uuid current-syncing-graph-uuid)
  2940. (offer! remote->local-full-sync-chan true))
  2941. (recur)))
  2942. ;; re-exec local->remote-full-sync when it failed
  2943. (def re-local->remote-full-sync-chan (chan 1))
  2944. (async/sub pubsub/sync-events-pub :local->remote-full-sync-failed re-local->remote-full-sync-chan)
  2945. (go-loop []
  2946. (let [{{graph-uuid :graph-uuid} :data} (<! re-local->remote-full-sync-chan)
  2947. {:keys [current-syncing-graph-uuid]} (state/get-file-sync-state graph-uuid)]
  2948. (when (= graph-uuid current-syncing-graph-uuid)
  2949. (offer! full-sync-chan true))
  2950. (recur)))
  2951. ;;; add-tap
  2952. (comment
  2953. (def *x (atom nil))
  2954. (add-tap (fn [v] (reset! *x v)))
  2955. )