|
@@ -174,24 +174,44 @@
|
|
|
(defn- send&recv*
|
|
(defn- send&recv*
|
|
|
"Return a task: send message wait to recv its response and return it.
|
|
"Return a task: send message wait to recv its response and return it.
|
|
|
Throw if timeout"
|
|
Throw if timeout"
|
|
|
- [mws message & {:keys [timeout-ms] :or {timeout-ms 10000}}]
|
|
|
|
|
|
|
+ [mws message & {:keys [timeout-ms s3-get-timeout-ms]
|
|
|
|
|
+ :or {timeout-ms 10000 s3-get-timeout-ms 10000}}]
|
|
|
{:pre [(pos-int? timeout-ms)
|
|
{:pre [(pos-int? timeout-ms)
|
|
|
(some? (:req-id message))]}
|
|
(some? (:req-id message))]}
|
|
|
(m/sp
|
|
(m/sp
|
|
|
(m/? (send mws message))
|
|
(m/? (send mws message))
|
|
|
(let [req-id (:req-id message)
|
|
(let [req-id (:req-id message)
|
|
|
- result (m/?
|
|
|
|
|
- (m/timeout
|
|
|
|
|
- (m/reduce
|
|
|
|
|
- (fn [_ v]
|
|
|
|
|
- (when (= req-id (:req-id v))
|
|
|
|
|
- (reduced v)))
|
|
|
|
|
- (recv-flow mws))
|
|
|
|
|
- timeout-ms))]
|
|
|
|
|
- (when-not result
|
|
|
|
|
- (throw (ex-info (str "recv timeout (" timeout-ms "ms)") {:missionary/retry true
|
|
|
|
|
- :type :rtc.exception/ws-timeout
|
|
|
|
|
- :message message})))
|
|
|
|
|
|
|
+ ws-message-result
|
|
|
|
|
+ (m/?
|
|
|
|
|
+ (m/timeout
|
|
|
|
|
+ (m/reduce
|
|
|
|
|
+ (fn [_ v]
|
|
|
|
|
+ (when (= req-id (:req-id v))
|
|
|
|
|
+ (reduced v)))
|
|
|
|
|
+ (recv-flow* mws))
|
|
|
|
|
+ timeout-ms
|
|
|
|
|
+ (ex-info (str "recv ws message timeout (" timeout-ms "ms)") {})))
|
|
|
|
|
+ result (if-let [s3-presign-url (:s3-presign-url ws-message-result)]
|
|
|
|
|
+ (let [{:keys [status body] :as r}
|
|
|
|
|
+ (m/? (m/timeout
|
|
|
|
|
+ (http/get s3-presign-url {:with-credentials? false})
|
|
|
|
|
+ s3-get-timeout-ms
|
|
|
|
|
+ (ex-info (str "recv s3 message timeout (" s3-get-timeout-ms "ms)") {})))]
|
|
|
|
|
+ (cond
|
|
|
|
|
+ (instance? ExceptionInfo r) r
|
|
|
|
|
+
|
|
|
|
|
+ (http/unexceptional-status? status)
|
|
|
|
|
+ (rtc-schema/data-from-ws-coercer (js->clj (js/JSON.parse body) :keywordize-keys true))
|
|
|
|
|
+
|
|
|
|
|
+ :else
|
|
|
|
|
+ {:req-id (:req-id ws-message-result)
|
|
|
|
|
+ :ex-message "get s3 object failed"
|
|
|
|
|
+ :ex-data {:type :rtc.exception/get-s3-object-failed :status status :body body}}))
|
|
|
|
|
+ ws-message-result)]
|
|
|
|
|
+ (when (instance? ExceptionInfo result)
|
|
|
|
|
+ (throw (ex-info (ex-message result) {:missionary/retry true
|
|
|
|
|
+ :type :rtc.exception/ws-timeout
|
|
|
|
|
+ :message message})))
|
|
|
result)))
|
|
result)))
|
|
|
|
|
|
|
|
(defn send&recv
|
|
(defn send&recv
|