ws.cljs 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199
  1. (ns frontend.worker.rtc.ws
  2. "Websocket wrapped by missionary.
  3. based on
  4. https://github.com/ReilySiegel/missionary-websocket/blob/master/src/com/reilysiegel/missionary/websocket.cljs"
  5. (:require [cljs-http-missionary.client :as http]
  6. [frontend.worker.rtc.const :as rtc-const]
  7. [frontend.worker.rtc.exception :as r.ex]
  8. [logseq.common.missionary :as c.m]
  9. [missionary.core :as m]))
  10. (defn- get-state
  11. [ws]
  12. (case (.-readyState ws)
  13. 0 :connecting
  14. 1 :open
  15. 2 :closing
  16. 3 :closed))
  17. (defn- open-ws-task
  18. [url]
  19. (fn [s! f!]
  20. (try
  21. (let [ws (js/WebSocket. url)]
  22. (set! (.-onopen ws)
  23. (fn [_]
  24. (let [close-dfv (m/dfv)
  25. mbx (m/mbx)]
  26. (set! (.-onopen ws) nil)
  27. (set! (.-onmessage ws) (fn [e] (mbx (.-data e))))
  28. (set! (.-onclose ws) (fn [e]
  29. (set! (.-onclose ws) nil)
  30. (close-dfv e)))
  31. (s! [mbx ws close-dfv]))))
  32. (set! (.-onclose ws)
  33. (fn [e]
  34. (set! (.-onopen ws) nil)
  35. (set! (.-onclose ws) nil)
  36. (f! e)))
  37. (fn canceller []
  38. ;; canceller will be called(no gua) even this task succeed
  39. ;; should only cancel :connecting state websocket
  40. ;; see also some explanations from lib author about canceller:
  41. ;; https://clojurians.slack.com/archives/CL85MBPEF/p1714323302110269
  42. (when (= :connecting (get-state ws))
  43. (.close ws))))
  44. (catch :default e
  45. (f! e) #(do)))))
  46. (defn- handle-close
  47. [x]
  48. (if (instance? js/CloseEvent x)
  49. (throw x)
  50. x))
  51. (defn- create-mws*
  52. [url]
  53. (m/sp
  54. (let [[mbx ws close-dfv] (m/? (open-ws-task url))]
  55. {:raw-ws ws
  56. :send (fn [data]
  57. (m/sp
  58. (handle-close
  59. (m/?
  60. (m/race close-dfv
  61. (m/sp (while (< 4096 (.-bufferedAmount ws))
  62. (m/? (m/sleep 50)))
  63. (.send ws data)))))))
  64. :recv-flow
  65. (m/stream
  66. (m/ap
  67. (loop []
  68. (m/amb
  69. (handle-close
  70. (m/? (m/race close-dfv mbx)))
  71. (recur)))))})))
  72. (defn closed?
  73. [mws]
  74. (contains? #{:closing :closed} (get-state (:raw-ws mws))))
  75. (defn mws-create
  76. "Return a task that create a mws (missionary wrapped websocket).
  77. When failed to open websocket, retry with backoff.
  78. TODO: retry ASAP once network condition changed"
  79. [url & {:keys [retry-count open-ws-timeout]
  80. :or {retry-count 10 open-ws-timeout 10000}}]
  81. (assert (and (pos-int? retry-count)
  82. (pos-int? open-ws-timeout))
  83. [retry-count open-ws-timeout])
  84. (c.m/backoff
  85. (take retry-count c.m/delays)
  86. (m/sp
  87. (try
  88. (if-let [ws (m/? (m/timeout (create-mws* url) open-ws-timeout))]
  89. ws
  90. (throw (ex-info "open websocket timeout" {:missionary/retry true
  91. :type :rtc.exception/ws-timeout})))
  92. (catch js/CloseEvent e
  93. (throw (ex-info "failed to open websocket conn"
  94. {:missionary/retry true}
  95. e)))))))
  96. (defn create-mws-state-flow
  97. [mws]
  98. (m/relieve
  99. (m/observe
  100. (fn ctor [emit!]
  101. (let [ws (:raw-ws mws)
  102. old-onclose (.-onclose ws)
  103. old-onerror (.-onerror ws)
  104. old-onopen (.-onopen ws)]
  105. (set! (.-onclose ws) (fn [e]
  106. (when old-onclose (old-onclose e))
  107. (emit! (get-state ws))))
  108. (set! (.-onerror ws) (fn [e]
  109. (when old-onerror (old-onerror e))
  110. (emit! (get-state ws))))
  111. (set! (.-onopen ws) (fn [e]
  112. (when old-onopen (old-onopen e))
  113. (emit! (get-state ws))))
  114. (emit! (get-state ws))
  115. (fn dtor []
  116. (set! (.-onclose ws) old-onclose)
  117. (set! (.-onerror ws) old-onerror)
  118. (set! (.-onopen ws) old-onopen)))))))
  119. (comment
  120. (defn close
  121. [m-ws]
  122. (.close (:raw-ws m-ws))))
  123. (defn send
  124. "Returns a task: send message"
  125. [mws message]
  126. (m/sp
  127. (let [decoded-message (rtc-const/data-to-ws-coercer message)
  128. message-str (js/JSON.stringify (clj->js (rtc-const/data-to-ws-encoder decoded-message)))]
  129. (m/? ((:send mws) message-str)))))
  130. (defn- recv-flow*
  131. "Throw if recv `Internal server error`"
  132. [m-ws]
  133. (assert (some? (:recv-flow m-ws)) m-ws)
  134. (m/eduction
  135. (map #(js->clj (js/JSON.parse %) :keywordize-keys true))
  136. (map (fn [m]
  137. (if (= "Internal server error" (:message m))
  138. (throw r.ex/ex-unknown-server-error)
  139. m)))
  140. (map rtc-const/data-from-ws-coercer)
  141. (:recv-flow m-ws)))
  142. (defn recv-flow
  143. "Throw if recv `Internal server error`.
  144. Also take care of :s3-presign-url.(when response is too huge, it's stored in s3)"
  145. [m-ws]
  146. (let [f (recv-flow* m-ws)]
  147. (m/ap
  148. (let [resp (m/?> f)]
  149. (if-let [s3-presign-url (:s3-presign-url resp)]
  150. (let [{:keys [status body]} (m/? (http/get s3-presign-url {:with-credentials? false}))]
  151. (if (http/unexceptional-status? status)
  152. (rtc-const/data-from-ws-coercer (js->clj (js/JSON.parse body) :keywordize-keys true))
  153. {:req-id (:req-id resp)
  154. :ex-message "get s3 object failed"
  155. :ex-data {:type :rtc.exception/get-s3-object-failed :status status :body body}}))
  156. resp)))))
  157. (defn- send&recv*
  158. "Return a task: send message wait to recv its response and return it.
  159. Throw if timeout"
  160. [mws message & {:keys [timeout-ms] :or {timeout-ms 10000}}]
  161. {:pre [(pos-int? timeout-ms)
  162. (some? (:req-id message))]}
  163. (m/sp
  164. (m/? (send mws message))
  165. (let [req-id (:req-id message)
  166. result (m/?
  167. (m/timeout
  168. (m/reduce
  169. (fn [_ v]
  170. (when (= req-id (:req-id v))
  171. (reduced v)))
  172. (recv-flow mws))
  173. timeout-ms))]
  174. (when-not result
  175. (throw (ex-info (str "recv timeout (" timeout-ms "ms)") {:missionary/retry true
  176. :type :rtc.exception/ws-timeout
  177. :message message})))
  178. result)))
  179. (defn send&recv
  180. "Return a task that send the message then wait to recv its response.
  181. Throw if timeout"
  182. [mws message & {:keys [timeout-ms] :or {timeout-ms 10000}}]
  183. (m/sp
  184. (let [req-id (str (random-uuid))
  185. message (assoc message :req-id req-id)]
  186. (m/? (send&recv* mws message :timeout-ms timeout-ms)))))