Selaa lähdekoodia

enhance: replace some cljs-http with cljs-http-missionary

rcmerci 1 vuosi sitten
vanhempi
sitoutus
4335c3e7e4

+ 43 - 28
src/main/frontend/common/missionary_util.cljs

@@ -3,7 +3,8 @@
   (:require-macros [frontend.common.missionary-util])
   (:require [cljs.core.async.impl.channels]
             [clojure.core.async :as a]
-            [missionary.core :as m])
+            [missionary.core :as m]
+            [promesa.protocols :as pt])
   ;; (:import [missionary Cancelled])
   )
 
@@ -14,19 +15,19 @@
   "Retry task when it throw exception `(get ex-data :missionary/retry)`"
   [delays-seq task]
   (m/sp
-   (loop [[delay & rest-delays] (seq delays-seq)]
-     (let [r (try
-               (m/? task)
-               (catch :default e
-                 (if (and (some-> e ex-data :missionary/retry)
-                          (pos-int? delay))
-                   (do (m/? (m/sleep delay))
-                       (println :missionary/retry "after" delay "ms (" (ex-message e) ")")
-                       retry-sentinel)
-                   (throw e))))]
-       (if (identical? r retry-sentinel)
-         (recur rest-delays)
-         r)))))
+    (loop [[delay & rest-delays] (seq delays-seq)]
+      (let [r (try
+                (m/? task)
+                (catch :default e
+                  (if (and (some-> e ex-data :missionary/retry)
+                           (pos-int? delay))
+                    (do (m/? (m/sleep delay))
+                        (println :missionary/retry "after" delay "ms (" (ex-message e) ")")
+                        retry-sentinel)
+                    (throw e))))]
+        (if (identical? r retry-sentinel)
+          (recur rest-delays)
+          r)))))
 
 (defn mix
   "Return a flow which is mixed by `flows`"
@@ -40,10 +41,10 @@
   ([interval-ms value]
    (->>
     (m/ap
-     (loop []
-       (m/amb
-        (m/? (m/sleep interval-ms value))
-        (recur))))
+      (loop []
+        (m/amb
+         (m/? (m/sleep interval-ms value))
+         (recur))))
     (m/reductions {} value)
     (m/latest identity))))
 
@@ -51,10 +52,10 @@
   (defn debounce
     [duration-ms flow]
     (m/ap
-     (let [x (m/?< flow)]
-       (try (m/? (m/sleep duration-ms x))
-            (catch Cancelled _
-              (m/amb)))))))
+      (let [x (m/?< flow)]
+        (try (m/? (m/sleep duration-ms x))
+             (catch Cancelled _
+               (m/amb)))))))
 
 (defn run-task
   "Return the canceler"
@@ -83,10 +84,24 @@
 (defn <!
   "Return a task.
   if arg is a channel, takes from given channel, completing with value when take is accepted, or nil if port was closed.
-  if arg is a promise, completing with the result of given promise."
-  [chan-or-promise]
-  (if (instance? cljs.core.async.impl.channels/ManyToManyChannel chan-or-promise)
-    (doto (m/dfv) (->> (a/take! chan-or-promise)))
+  if arg is a promise, completing with the result of given promise.
+  if arg is a missionary task, just return it"
+  [chan-or-promise-or-task]
+  (cond
+    ;; async
+    (instance? cljs.core.async.impl.channels/ManyToManyChannel chan-or-promise-or-task)
+    (doto (m/dfv) (->> (a/take! chan-or-promise-or-task)))
+
+    ;; promise
+    (or (instance? js/Promise chan-or-promise-or-task)
+        (satisfies? pt/IPromise chan-or-promise-or-task))
     (let [v (m/dfv)]
-      (.then chan-or-promise #(v (fn [] %)) #(v (fn [] (throw %))))
-      (m/absolve v))))
+      (.then chan-or-promise-or-task #(v (fn [] %)) #(v (fn [] (throw %))))
+      (m/absolve v))
+
+    ;; missionary task
+    (fn? chan-or-promise-or-task)
+    chan-or-promise-or-task
+
+    :else
+    (throw (ex-info "Unsupported arg" {:type (type chan-or-promise-or-task)}))))

+ 96 - 96
src/main/frontend/worker/rtc/full_upload_download_graph.cljs

@@ -1,7 +1,7 @@
 (ns frontend.worker.rtc.full-upload-download-graph
   "- upload local graph to remote
   - download remote graph"
-  (:require [cljs-http.client :as http]
+  (:require [cljs-http-missionary.client :as http]
             [clojure.set :as set]
             [datascript.core :as d]
             [frontend.common.missionary-util :as c.m]
@@ -114,41 +114,41 @@
 (defn new-task--upload-graph
   [get-ws-create-task repo conn remote-graph-name]
   (m/sp
-   (rtc-log-and-state/rtc-log :rtc.log/upload {:sub-type :fetch-presigned-put-url
-                                               :message "fetching presigned put-url"})
-   (let [[{:keys [url key]} all-blocks-str]
-         (m/?
-          (m/join
-           vector
-           (ws-util/send&recv get-ws-create-task {:action "presign-put-temp-s3-obj"})
-           (m/sp
-            (let [all-blocks (export-as-blocks @conn)]
-              (ldb/write-transit-str all-blocks)))))]
-     (rtc-log-and-state/rtc-log :rtc.log/upload {:sub-type :upload-data
-                                                 :message "uploading data"})
-     (c.m/<? (http/put url {:body all-blocks-str :with-credentials? false}))
-     (rtc-log-and-state/rtc-log :rtc.log/upload {:sub-type :request-upload-graph
-                                                 :message "requesting upload-graph"})
-     (let [aes-key (c.m/<? (crypt/<gen-aes-key))
-           aes-key-jwk (ldb/write-transit-str (c.m/<? (crypt/<export-key aes-key)))
-           upload-resp
-           (m/? (ws-util/send&recv get-ws-create-task {:action "upload-graph"
-                                                       :s3-key key
-                                                       :graph-name remote-graph-name}))]
-       (if-let [graph-uuid (:graph-uuid upload-resp)]
-         (do
-           (ldb/transact! conn
-                          [{:db/ident :logseq.kv/graph-uuid :kv/value graph-uuid}
-                           {:db/ident :logseq.kv/graph-local-tx :kv/value "0"}])
-           (client-op/update-graph-uuid repo graph-uuid)
-           (crypt/store-graph-keys-jwk repo aes-key-jwk)
-           (when-not rtc-const/RTC-E2E-TEST
-             (let [^js worker-obj (:worker/object @worker-state/*state)]
-               (c.m/<? (.storeMetadata worker-obj repo (pr-str {:kv/value graph-uuid})))))
-           (rtc-log-and-state/rtc-log :rtc.log/upload {:sub-type :upload-completed
-                                                       :message "upload-graph completed"})
-           {:graph-uuid graph-uuid})
-         (throw (ex-info "upload-graph failed" {:upload-resp upload-resp})))))))
+    (rtc-log-and-state/rtc-log :rtc.log/upload {:sub-type :fetch-presigned-put-url
+                                                :message "fetching presigned put-url"})
+    (let [[{:keys [url key]} all-blocks-str]
+          (m/?
+           (m/join
+            vector
+            (ws-util/send&recv get-ws-create-task {:action "presign-put-temp-s3-obj"})
+            (m/sp
+              (let [all-blocks (export-as-blocks @conn)]
+                (ldb/write-transit-str all-blocks)))))]
+      (rtc-log-and-state/rtc-log :rtc.log/upload {:sub-type :upload-data
+                                                  :message "uploading data"})
+      (m/? (http/put url {:body all-blocks-str :with-credentials? false}))
+      (rtc-log-and-state/rtc-log :rtc.log/upload {:sub-type :request-upload-graph
+                                                  :message "requesting upload-graph"})
+      (let [aes-key (c.m/<? (crypt/<gen-aes-key))
+            aes-key-jwk (ldb/write-transit-str (c.m/<? (crypt/<export-key aes-key)))
+            upload-resp
+            (m/? (ws-util/send&recv get-ws-create-task {:action "upload-graph"
+                                                        :s3-key key
+                                                        :graph-name remote-graph-name}))]
+        (if-let [graph-uuid (:graph-uuid upload-resp)]
+          (do
+            (ldb/transact! conn
+                           [{:db/ident :logseq.kv/graph-uuid :kv/value graph-uuid}
+                            {:db/ident :logseq.kv/graph-local-tx :kv/value "0"}])
+            (client-op/update-graph-uuid repo graph-uuid)
+            (crypt/store-graph-keys-jwk repo aes-key-jwk)
+            (when-not rtc-const/RTC-E2E-TEST
+              (let [^js worker-obj (:worker/object @worker-state/*state)]
+                (c.m/<? (.storeMetadata worker-obj repo (pr-str {:kv/value graph-uuid})))))
+            (rtc-log-and-state/rtc-log :rtc.log/upload {:sub-type :upload-completed
+                                                        :message "upload-graph completed"})
+            {:graph-uuid graph-uuid})
+          (throw (ex-info "upload-graph failed" {:upload-resp upload-resp})))))))
 
 (def page-of-block
   (memoize
@@ -262,9 +262,9 @@
   (let [{:keys [t blocks]} all-blocks
         card-one-attrs (blocks->card-one-attrs blocks)
         blocks1 (worker-util/profile :convert-card-one-value-from-value-coll
-                                    (map (partial convert-card-one-value-from-value-coll card-one-attrs) blocks))
+                                     (map (partial convert-card-one-value-from-value-coll card-one-attrs) blocks))
         blocks2 (worker-util/profile :normalize-remote-blocks
-                 (normalized-remote-blocks-coercer blocks1))
+                                     (normalized-remote-blocks-coercer blocks1))
         ;;TODO: remove this, client/schema already converted to :db/cardinality, :db/valueType by remote,
         ;; and :client/schema should be removed by remote too
         blocks (map #(dissoc % :client/schema) blocks2)
@@ -277,25 +277,25 @@
                              schema-blocks)
         ^js worker-obj (:worker/object @worker-state/*state)]
     (m/sp
-     (client-op/update-local-tx repo t)
-     (rtc-log-and-state/update-local-t graph-uuid t)
-     (rtc-log-and-state/update-remote-t graph-uuid t)
-     (if rtc-const/RTC-E2E-TEST
-       (create-graph-for-rtc-test repo init-tx-data tx-data)
-       (c.m/<?
-        (p/do!
-         (.createOrOpenDB worker-obj repo (ldb/write-transit-str {:close-other-db? false}))
-         (.exportDB worker-obj repo)
-         (.transact worker-obj repo init-tx-data {:rtc-download-graph? true
-                                                  :gen-undo-ops? false
+      (client-op/update-local-tx repo t)
+      (rtc-log-and-state/update-local-t graph-uuid t)
+      (rtc-log-and-state/update-remote-t graph-uuid t)
+      (if rtc-const/RTC-E2E-TEST
+        (create-graph-for-rtc-test repo init-tx-data tx-data)
+        (c.m/<?
+         (p/do!
+          (.createOrOpenDB worker-obj repo (ldb/write-transit-str {:close-other-db? false}))
+          (.exportDB worker-obj repo)
+          (.transact worker-obj repo init-tx-data {:rtc-download-graph? true
+                                                   :gen-undo-ops? false
                                                      ;; only transact db schema, skip validation to avoid warning
-                                                  :skip-validate-db? true
-                                                  :persist-op? false} (worker-state/get-context))
-         (.transact worker-obj repo tx-data {:rtc-download-graph? true
-                                             :gen-undo-ops? false
-                                             :persist-op? false} (worker-state/get-context))
-         (transact-block-refs! repo))))
-     (worker-util/post-message :add-repo {:repo repo}))))
+                                                   :skip-validate-db? true
+                                                   :persist-op? false} (worker-state/get-context))
+          (.transact worker-obj repo tx-data {:rtc-download-graph? true
+                                              :gen-undo-ops? false
+                                              :persist-op? false} (worker-state/get-context))
+          (transact-block-refs! repo))))
+      (worker-util/post-message :add-repo {:repo repo}))))
 
 ;;;;;;;;;;;;;;;;;;;;;;;;;;
 ;; async download-graph ;;
@@ -320,48 +320,48 @@
   [get-ws-create-task download-info-uuid graph-uuid timeout-ms]
   (->
    (m/sp
-    (rtc-log-and-state/rtc-log :rtc.log/download {:sub-type :wait-remote-graph-data-ready
-                                                  :message "waiting for the remote to prepare the data"
-                                                  :graph-uuid graph-uuid})
-    (loop []
-      (m/? (m/sleep 3000))
-      (let [{:keys [download-info-list]}
-            (m/? (ws-util/send&recv get-ws-create-task {:action "download-info-list"
-                                                        :graph-uuid graph-uuid}))]
-        (if-let [found-download-info
-                 (some
-                  (fn [download-info]
-                    (when (and (= download-info-uuid (:download-info-uuid download-info))
-                               (:download-info-s3-url download-info))
-                      download-info))
-                  download-info-list)]
-          found-download-info
-          (recur)))))
+     (rtc-log-and-state/rtc-log :rtc.log/download {:sub-type :wait-remote-graph-data-ready
+                                                   :message "waiting for the remote to prepare the data"
+                                                   :graph-uuid graph-uuid})
+     (loop []
+       (m/? (m/sleep 3000))
+       (let [{:keys [download-info-list]}
+             (m/? (ws-util/send&recv get-ws-create-task {:action "download-info-list"
+                                                         :graph-uuid graph-uuid}))]
+         (if-let [found-download-info
+                  (some
+                   (fn [download-info]
+                     (when (and (= download-info-uuid (:download-info-uuid download-info))
+                                (:download-info-s3-url download-info))
+                       download-info))
+                   download-info-list)]
+           found-download-info
+           (recur)))))
    (m/timeout timeout-ms :timeout)))
 
 (defn new-task--download-graph-from-s3
   [graph-uuid graph-name s3-url]
   (m/sp
-   (rtc-log-and-state/rtc-log :rtc.log/download {:sub-type :downloading-graph-data
-                                                 :message "downloading graph data"
-                                                 :graph-uuid graph-uuid})
-   (let [^js worker-obj              (:worker/object @worker-state/*state)
-         {:keys [status body] :as r} (c.m/<? (http/get s3-url {:with-credentials? false}))
-         repo                        (str sqlite-util/db-version-prefix graph-name)]
-     (if (not= 200 status)
-       (throw (ex-info "download-graph from s3 failed" {:resp r}))
-       (do
-         (rtc-log-and-state/rtc-log :rtc.log/download {:sub-type :transact-graph-data-to-db
-                                                       :message "transacting graph data to local db"
-                                                       :graph-uuid graph-uuid})
-         (let [all-blocks (ldb/read-transit-str body)]
-           (worker-state/set-rtc-downloading-graph! true)
-           (m/? (new-task--transact-remote-all-blocks all-blocks repo graph-uuid))
-           (client-op/update-graph-uuid repo graph-uuid)
-           (when-not rtc-const/RTC-E2E-TEST
-             (c.m/<? (.storeMetadata worker-obj repo (pr-str {:kv/value graph-uuid}))))
-           (worker-state/set-rtc-downloading-graph! false)
-           (rtc-log-and-state/rtc-log :rtc.log/download {:sub-type :download-completed
-                                                         :message "download completed"
-                                                         :graph-uuid graph-uuid})
-           nil))))))
+    (rtc-log-and-state/rtc-log :rtc.log/download {:sub-type :downloading-graph-data
+                                                  :message "downloading graph data"
+                                                  :graph-uuid graph-uuid})
+    (let [^js worker-obj              (:worker/object @worker-state/*state)
+          {:keys [status body] :as r} (m/? (http/get s3-url {:with-credentials? false}))
+          repo                        (str sqlite-util/db-version-prefix graph-name)]
+      (if (not= 200 status)
+        (throw (ex-info "download-graph from s3 failed" {:resp r}))
+        (do
+          (rtc-log-and-state/rtc-log :rtc.log/download {:sub-type :transact-graph-data-to-db
+                                                        :message "transacting graph data to local db"
+                                                        :graph-uuid graph-uuid})
+          (let [all-blocks (ldb/read-transit-str body)]
+            (worker-state/set-rtc-downloading-graph! true)
+            (m/? (new-task--transact-remote-all-blocks all-blocks repo graph-uuid))
+            (client-op/update-graph-uuid repo graph-uuid)
+            (when-not rtc-const/RTC-E2E-TEST
+              (c.m/<? (.storeMetadata worker-obj repo (pr-str {:kv/value graph-uuid}))))
+            (worker-state/set-rtc-downloading-graph! false)
+            (rtc-log-and-state/rtc-log :rtc.log/download {:sub-type :download-completed
+                                                          :message "download completed"
+                                                          :graph-uuid graph-uuid})
+            nil))))))

+ 2 - 2
src/main/frontend/worker/rtc/ws.cljs

@@ -2,7 +2,7 @@
   "Websocket wrapped by missionary.
   based on
   https://github.com/ReilySiegel/missionary-websocket/blob/master/src/com/reilysiegel/missionary/websocket.cljs"
-  (:require [cljs-http.client :as http]
+  (:require [cljs-http-missionary.client :as http]
             [frontend.common.missionary-util :as c.m]
             [frontend.worker.rtc.const :as rtc-const]
             [frontend.worker.rtc.exception :as r.ex]
@@ -158,7 +158,7 @@
     (m/ap
       (let [resp (m/?> f)]
         (if-let [s3-presign-url (:s3-presign-url resp)]
-          (let [{:keys [status body]} (c.m/<? (http/get s3-presign-url {:with-credentials? false}))]
+          (let [{:keys [status body]} (m/? (http/get s3-presign-url {:with-credentials? false}))]
             (if (http/unexceptional-status? status)
               (rtc-const/data-from-ws-coercer (js->clj (js/JSON.parse body) :keywordize-keys true))
               {:req-id (:req-id resp)

+ 2 - 3
src/main/frontend/worker/rtc/ws_util.cljs

@@ -1,7 +1,6 @@
 (ns frontend.worker.rtc.ws-util
   "Add RTC related logic to the function based on ws."
-  (:require [cljs-http.client :as http]
-            [frontend.common.missionary-util :as c.m]
+  (:require [cljs-http-missionary.client :as http]
             [frontend.worker.rtc.const :as rtc-const]
             [frontend.worker.rtc.exception :as r.ex]
             [frontend.worker.rtc.ws :as ws]
@@ -31,7 +30,7 @@
          len (.-length (utf8/encode message-str))]
      (when (< 100000 len)
        (let [{:keys [url key]} (m/? (ws/send&recv ws {:action "presign-put-temp-s3-obj"}))
-             {:keys [status] :as resp} (c.m/<? (http/put url {:body message-str :with-credentials? false}))]
+             {:keys [status] :as resp} (m/? (http/put url {:body message-str :with-credentials? false}))]
          (when-not (http/unexceptional-status? status)
            (throw (ex-info "failed to upload apply-ops message" {:resp resp})))
          key)))))