shared_service.cljs 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366
  1. (ns frontend.worker.shared-service
  2. "This allows multiple workers to share some resources (e.g. db access)"
  3. (:require [cljs-bean.core :as bean]
  4. [goog.object :as gobj]
  5. [lambdaisland.glogi :as log]
  6. [logseq.common.util :as common-util]
  7. [logseq.db :as ldb]
  8. [promesa.core :as p]))
  9. ;; Idea and code copied from https://github.com/Matt-TOTW/shared-service/blob/master/src/sharedService.ts
  10. ;; Related thread: https://github.com/rhashimoto/wa-sqlite/discussions/81
  ;; Log everything down to :debug level for this namespace.
  (log/set-level 'frontend.worker.shared-service :debug)

  ;;; ---- Election state ----
  ;; true while this client acts as the master.
  (defonce *master-client? (atom false))
  ;; Resetting this atom to :re-check makes the watch installed by
  ;; `<create-service` run the master/slave check again.
  (defonce *master-re-check-trigger (atom nil))

  ;;; ---- Channels ----
  ;;; common-channel        - communication related to master-client election.
  ;;; client-channel        - API request/response data communication.
  ;;; master-slave-channels - slave channels registered on the master. All of
  ;;;                         them must be closed once the master has become a
  ;;;                         slave, so that it receives no further messages.
  (defonce *common-channel (atom nil))
  (defonce *client-channel (atom nil))
  (defonce *master-slave-channels (atom #{}))

  ;;; ---- Channel listeners ----
  ;;; The currently installed listener of each channel is recorded here so that
  ;;; it can be removed before a new one is added with addEventListener.
  (defonce *common-channel-listener (atom nil))
  (defonce *client-channel-listener (atom nil))

  ;;; ---- Requests ----
  ;; Last request id handed out by `next-request-id`.
  (defonce *current-request-id (volatile! 0))
  ;; request-id -> {:method :args :resolve-fn :reject-fn}, sorted by request-id.
  (defonce *requests-in-flight (volatile! (sorted-map)))

  ;;; ---- Identity and master lock ----
  ;;; The unique identity of the context in which `js/navigator.locks.request`
  ;;; is called.
  (defonce *client-id (atom nil))
  ;; Deferred awaited inside the master lock callback; resolving it lets the
  ;; callback finish.
  (defonce *master-client-lock (atom nil))
  (defn- next-request-id
    "Advances the request counter held in `*current-request-id` by one and
    returns the new value."
    []
    (vswap! *current-request-id + 1))
  (defn- release-master-client-lock!
    "Resolves the deferred stored in `*master-client-lock` when there is one.
    Always returns nil."
    []
    (some-> @*master-client-lock p/resolve!)
    nil)
  (defn- get-broadcast-channel-name
    "Builds the BroadcastChannel name used for one client of one service:
    `client-id` and `service-name` joined by a hyphen."
    [client-id service-name]
    (apply str (interpose "-" [client-id service-name])))
  (defn- random-id
    "Returns a freshly generated random UUID rendered as a string."
    []
    (-> (random-uuid) str))
  (defn- do-not-wait
    "Fire-and-forget marker: accepts an already started `promise`, discards it
    and returns nil so that callers do not wait on it."
    [promise]
    (let [_ignored promise]
      nil))
  (defn- <get-client-id
    "Returns a promise of the `clientId` the Web Locks API reports for this
    context.

    A throw-away lock with a random name is requested; while it is held, the
    list of held locks is queried and the `clientId` of the entry with that
    name is taken. Afterwards a lock named after the client id itself is
    requested with a callback that returns a deferred which is never resolved."
    []
    (let [probe-name (random-id)
          <read-own-client-id
          (fn [_lock]
            (p/let [^js snapshot (js/navigator.locks.query)]
              (let [^js probe (some (fn [^js held]
                                      (when (= (.-name held) probe-name) held))
                                    (.-held snapshot))]
                (.-clientId probe))))]
      (p/let [client-id (js/navigator.locks.request probe-name
                                                    #js {:mode "exclusive"}
                                                    <read-own-client-id)]
        (assert (some? client-id))
        (do-not-wait
         (js/navigator.locks.request client-id #js {:mode "exclusive"}
                                     ;; never release it
                                     (fn [_lock] (p/deferred))))
        (log/debug :client-id client-id)
        client-id)))
  (defn- <ensure-client-id
    "Returns the client id cached in `*client-id`, or, when none is cached yet,
    a promise that obtains one via `<get-client-id`, stores it in `*client-id`
    and resolves to it."
    []
    (if-let [cached @*client-id]
      cached
      (p/let [fresh (<get-client-id)]
        (reset! *client-id fresh))))
  (defn- ensure-common-channel
    "Returns the BroadcastChannel stored in `*common-channel`. When none is
    stored, one named \"shared-service-common-channel-<service-name>\" is
    created, stored and returned."
    [service-name]
    (if-let [existing @*common-channel]
      existing
      (let [channel-name (str "shared-service-common-channel-" service-name)]
        (reset! *common-channel (js/BroadcastChannel. channel-name)))))
  (defn- ensure-client-channel
    "Returns the BroadcastChannel stored in `*client-channel`. When none is
    stored, one is created under the name given by
    `get-broadcast-channel-name` for `slave-client-id` and `service-name`,
    stored and returned. The arguments are not consulted when a channel is
    already stored."
    [slave-client-id service-name]
    (if-let [existing @*client-channel]
      existing
      (let [channel-name (get-broadcast-channel-name slave-client-id service-name)]
        (reset! *client-channel (js/BroadcastChannel. channel-name)))))
  (defn- listen-common-channel
    "Makes `listener-fn` the recorded \"message\" listener of `common-channel`.
    The listener previously recorded in `*common-channel-listener`, if any, is
    removed from `common-channel` first."
    [common-channel listener-fn]
    (let [previous @*common-channel-listener]
      (when previous
        (.removeEventListener common-channel "message" previous))
      (reset! *common-channel-listener listener-fn)
      (.addEventListener common-channel "message" listener-fn)))
  (defn- listen-client-channel
    "Makes `listener-fn` the recorded \"message\" listener of `client-channel`.
    The listener previously recorded in `*client-channel-listener`, if any, is
    removed from `client-channel` first."
    [client-channel listener-fn]
    (let [previous @*client-channel-listener]
      (when previous
        (.removeEventListener client-channel "message" previous))
      (reset! *client-channel-listener listener-fn)
      (.addEventListener client-channel "message" listener-fn)))
  (defn- <apply-target-f!
    "Looks up the property named `method` on `target` and applies it to the
    sequence `args`, returning whatever the call returns. Asserts that the
    property exists."
    [target method args]
    (let [handler (gobj/get target method)]
      (assert (some? handler) {:method method})
      (apply handler args)))
  (defn- <check-master-or-slave-client!
    "Check if the current client is the master (otherwise, it is a slave).

    Requests the lock named `service-name` in exclusive mode with
    `ifAvailable`, without waiting for the request to finish. Inside the lock
    callback the held locks are queried to see whether this client holds the
    service lock, and then:
    - lock granted and held by this client: `*master-client?` is set to true,
      `<on-become-master` is called, and the callback keeps waiting on a fresh
      deferred stored in `*master-client-lock`;
    - lock not granted but already held by this client: only asserts that
      `*master-client?` is true;
    - lock not held by this client: `*master-client?` is set to false and
      `<on-become-slave` is called."
    [service-name <on-become-master <on-become-slave]
    (p/let [client-id (<ensure-client-id)]
      (let [held-by-this-client?
            (fn [^js held]
              (when (and (= (.-name held) service-name)
                         (= (.-clientId held) client-id))
                true))
            on-lock
            (fn [lock]
              (p/let [^js snapshot (js/navigator.locks.query)
                      locked? (some held-by-this-client? (.-held snapshot))]
                (cond
                  (not locked?)             ;become slave
                  (p/do!
                   (reset! *master-client? false)
                   (<on-become-slave))

                  (and locked? lock)        ;become master
                  (p/do!
                   (reset! *master-client? true)
                   (<on-become-master)
                   (reset! *master-client-lock (p/deferred))
                   ;; Keep lock until context destroyed
                   @*master-client-lock)

                  (and locked? (nil? lock)) ;already locked by this client, do nothing
                  (assert (true? @*master-client?)))))]
        (do-not-wait
         (js/navigator.locks.request
          service-name #js {:mode "exclusive", :ifAvailable true}
          on-lock)))))
  (defn- clear-old-service!
    "Resets the module state left over from a previous service: resolves the
    master lock deferred, clears the master flag, closes every known channel
    (registered slave channels, common channel, client channel), forgets the
    channels and their listeners, empties the in-flight request table and
    removes the :check-master watch from `*master-re-check-trigger`."
    []
    (release-master-client-lock!)
    (reset! *master-client? false)
    (run! (fn [^js channel] (.close channel))
          (filter identity
                  (conj @*master-slave-channels @*common-channel @*client-channel)))
    (doseq [state [*common-channel
                   *client-channel
                   *common-channel-listener
                   *client-channel-listener]]
      (reset! state nil))
    (reset! *master-slave-channels #{})
    (vreset! *requests-in-flight (sorted-map))
    (remove-watch *master-re-check-trigger :check-master))
  (defn- on-response-handler
    "\"message\" listener for a client channel. For a message of type
    \"response\" whose id is present in `*requests-in-flight`, the entry is
    removed and the stored `reject-fn` is called with the message's error if it
    has one (after logging it), otherwise `resolve-fn` is called with its
    result. Any other message is ignored."
    [event]
    (let [message (bean/->clj (.-data event))
          request-id (:id message)]
      (when (identical? "response" (:type message))
        (when-let [pending (get @*requests-in-flight request-id)]
          (let [{:keys [resolve-fn reject-fn]} pending
                error (:error message)]
            (vswap! *requests-in-flight dissoc request-id)
            (if error
              (do (log/error :error-process-request error)
                  (reject-fn error))
              (resolve-fn (:result message))))))))
  (defn- create-on-request-handler
    "Returns a \"message\" listener that serves requests arriving on
    `client-channel` by calling `target`.

    For a message of type \"request\" the listener applies `method` of `target`
    to `args` and posts a message of type \"response\" carrying the same `id`
    back on `client-channel`, with either `:result` or `:error` filled in.
    Messages of any other type are ignored."
    [client-channel target]
    (fn [event]
      (let [{:keys [type method args id]} (bean/->clj (.-data event))]
        (when (identical? "request" type)
          (p/let [[result error]
                  (-> (try
                        (<apply-target-f! target method args)
                        ;; A synchronous throw (failed assert, or the target
                        ;; fn throwing before it returns a promise) would skip
                        ;; the `p/catch` below and no response would ever be
                        ;; posted, leaving the requester waiting forever.
                        ;; Turn it into a rejection so it is reported like any
                        ;; asynchronous failure.
                        (catch :default e
                          (p/rejected e)))
                      (p/then (fn [res] [res nil]))
                      (p/catch
                       (fn [e] [nil (if (instance? js/Error e)
                                      (bean/->clj e)
                                      e)])))]
            (.postMessage client-channel (bean/->js
                                          {:id id
                                           :type "response"
                                           :result result
                                           :error error
                                           :method-key (first args)})))))))
  (defn- <slave-registered-handler
    "Handles a \"slave-registered\" event on a slave. Does nothing unless the
    event's :slave-client-id equals `slave-client-id`.

    Otherwise, unless this client already has a pending request for the lock
    named `service-name`, such a request is queued without waiting for it; its
    callback runs once the lock is obtained and sets
    `*master-re-check-trigger` to :re-check. Finally the deferred held in the
    atom `*register-finish-promise?` is resolved."
    [service-name slave-client-id event *register-finish-promise?]
    (when (= slave-client-id (:slave-client-id event))
      (p/let [^js snapshot (js/navigator.locks.query)
              already-watching? (some (fn [^js pending]
                                        (and (= service-name (.-name pending))
                                             (= slave-client-id (.-clientId pending))))
                                      (.-pending snapshot))]
        (when-not already-watching?     ;dont watch multiple times
          (do-not-wait
           (js/navigator.locks.request
            service-name #js {:mode "exclusive"}
            (fn [_lock]
              ;; The master has gone, elect the new master
              (log/debug "master has gone" nil)
              (reset! *master-re-check-trigger :re-check)))))
        (p/resolve! @*register-finish-promise?))))
  (defn- <re-requests-in-flight-on-slave!
    "Posts every request still recorded in `*requests-in-flight` again on
    `client-channel`, as a \"request\" message with its original id, method and
    args. Returns nil when nothing is in flight."
    [client-channel]
    (let [pending @*requests-in-flight]
      (when (seq pending)
        (log/debug "Requests were in flight when master changed. Requeuing..." (count pending))
        (p/run!
         (fn [[id {:keys [method args]}]]
           (.postMessage client-channel (bean/->js {:id id
                                                    :type "request"
                                                    :method method
                                                    :args args})))
         pending))))
  (defn- <re-requests-in-flight-on-master!
    "Runs every request still recorded in `*requests-in-flight` directly
    against `target`. Each request's stored `resolve-fn` receives the result,
    or its `reject-fn` receives the error (which is also logged); either way
    the request is then removed from `*requests-in-flight`. Returns nil when
    nothing is in flight."
    [target]
    (let [pending @*requests-in-flight]
      (when (seq pending)
        (log/debug "Requests were in flight when tab became master. Requeuing..." (count pending))
        (p/run!
         (fn [[id {:keys [method args resolve-fn reject-fn]}]]
           (let [<served (p/let [result (<apply-target-f! target method args)]
                           (resolve-fn result))
                 <handled (p/catch <served
                            (fn [e]
                              (log/error "Error processing request" e)
                              (reject-fn e)))]
             (p/finally <handled
                        (fn []
                          (vswap! *requests-in-flight dissoc id)))))
         pending))))
  (defn- <on-become-slave
    "Sets this client up as a slave of the service.

    - Installs `on-response-handler` on the client channel.
    - Installs a common-channel listener which forwards messages whose type is
      in `broadcast-data-types` to `js/self`, re-registers and re-posts
      in-flight requests on \"master-changed\", and passes \"slave-registered\"
      to `<slave-registered-handler`.
    - Registers with the master and resolves `status-ready-promise` once the
      registration has been acknowledged.

    Returns a promise; a failure is logged and re-raised as a rejection."
    [slave-client-id service-name common-channel broadcast-data-types status-ready-promise]
    (let [client-channel (ensure-client-channel slave-client-id service-name)
          *register-finish-promise? (atom nil)
          <register (fn []
                      (let [previous @*register-finish-promise?
                            current (p/deferred)]
                        ;; A new registration may start while an earlier one is
                        ;; still waiting for its acknowledgement (e.g.
                        ;; \"master-changed\" arrives during the initial
                        ;; register). Only the deferred stored in the atom gets
                        ;; resolved by `<slave-registered-handler`, so chain the
                        ;; replaced one to the new one; otherwise whoever awaits
                        ;; it (the status-ready flow below) would hang forever.
                        (when previous
                          (p/then current (fn [_]
                                            (p/resolve! previous)
                                            nil)))
                        (reset! *register-finish-promise? current)
                        (.postMessage common-channel #js {:type "slave-register"
                                                          :slave-client-id slave-client-id})
                        current))]
      (listen-client-channel client-channel on-response-handler)
      (listen-common-channel
       common-channel
       (fn [event]
         (let [{:keys [type data] :as event*} (bean/->clj (.-data event))]
           (if (contains? broadcast-data-types type)
             (.postMessage js/self data)
             (case type
               "master-changed"
               (p/do!
                (log/debug "master-client change detected. Re-registering..." nil)
                (<register)
                (<re-requests-in-flight-on-slave! client-channel))

               "slave-registered"
               (<slave-registered-handler service-name slave-client-id event* *register-finish-promise?)

               "slave-register"
               (log/debug :ignored-event event*)

               (log/error :unknown-event event*))))))
      (->
       (p/do!
        (<register)
        (p/resolve! status-ready-promise))
       (p/catch (fn [e]
                  (log/error :on-become-slave e)
                  (p/rejected e))))))
  (defn- <on-become-master
    "Sets this client up as the master of the service.

    - Installs a common-channel listener that serves \"slave-register\"
      messages: for a slave seen for the first time it opens that slave's
      client channel, records it in `*master-slave-channels`, installs a
      request handler bound to `target` on it, and queues a request for the
      lock named after the slave's client id whose callback closes the channel
      (it runs once that lock can be obtained, i.e. the slave has gone). Every
      \"slave-register\" message is answered with \"slave-registered\".
    - Posts \"master-changed\" on the common channel.
    - Calls `on-become-master-handler` with `service-name`, then replays the
      requests that were in flight against `target`.

    Returns a promise; `status-ready-deferred-p` is resolved when the last step
    finishes, whether it succeeded or failed."
    [master-client-id service-name common-channel target on-become-master-handler status-ready-deferred-p]
    (log/debug :become-master master-client-id :service service-name)
    (let [;; Ids of the slaves that already have a live channel on this master.
          *registered-slaves (atom #{})]
      (listen-common-channel
       common-channel
       (fn [event]
         (let [{:keys [slave-client-id type]} (bean/->clj (.-data event))]
           (when (= type "slave-register")
             ;; A slave may register more than once with the same master.
             ;; Opening another channel with the same name would add a second
             ;; request handler, so each of that slave's requests would run
             ;; against `target` twice. Set the channel up only once per slave.
             (when-not (contains? @*registered-slaves slave-client-id)
               (let [client-channel (js/BroadcastChannel. (get-broadcast-channel-name slave-client-id service-name))]
                 (swap! *registered-slaves conj slave-client-id)
                 (swap! *master-slave-channels conj client-channel)
                 (do-not-wait
                  (js/navigator.locks.request slave-client-id #js {:mode "exclusive"}
                                              (fn [_]
                                                (log/debug :slave-has-gone slave-client-id)
                                                (swap! *registered-slaves disj slave-client-id)
                                                (.close client-channel))))
                 (listen-client-channel client-channel (create-on-request-handler client-channel target))))
             ;; Always acknowledge, so a repeated registration still completes
             ;; on the slave side.
             (.postMessage common-channel (bean/->js {:type "slave-registered"
                                                      :slave-client-id slave-client-id
                                                      :master-client-id master-client-id
                                                      :serviceName service-name})))))))
    (.postMessage common-channel #js {:type "master-changed"
                                      :master-client-id master-client-id
                                      :serviceName service-name})
    (->
     (p/do!
      (on-become-master-handler service-name)
      (<re-requests-in-flight-on-master! target))
     (p/finally
      (fn []
        (p/resolve! status-ready-deferred-p)))))
  (defn <create-service
    "Creates the shared service `service-name` backed by `target` and returns a
    promise of a map with:
    - :proxy     - a js/Proxy over `target` whose only supported property is
                   \"remoteInvoke\": a one-argument function that applies the
                   target's method directly when this client is the master, and
                   otherwise posts a request on the client channel and returns
                   a promise settled by the matching response;
    - :status    - a map whose :ready deferred is passed to the become-master /
                   become-slave setup;
    - :client-id - this client's id.

    Any previous service state is cleared first. With `import?` set, the client
    is flagged as master right away. `on-become-master-handler` is handed to
    the become-master setup.

    broadcast-data-types - For data matching these types,
    forward the data broadcast from the master client directly to the UI thread."
    [service-name target on-become-master-handler broadcast-data-types {:keys [import?]}]
    (clear-old-service!)
    (when import? (reset! *master-client? true))
    (p/let [broadcast-data-types (set broadcast-data-types)
            status {:ready (p/deferred)}
            common-channel (ensure-common-channel service-name)
            client-id (<ensure-client-id)]
      (let [ready-deferred (:ready status)
            become-master! (fn []
                             (<on-become-master
                              client-id service-name common-channel target
                              on-become-master-handler ready-deferred))
            become-slave! (fn []
                            (<on-become-slave
                             client-id service-name common-channel
                             broadcast-data-types ready-deferred))
            <check-master-slave-fn! (fn []
                                      (<check-master-or-slave-client!
                                       service-name become-master! become-slave!))
            ;; Slave path: record the request, post it, and let
            ;; `on-response-handler` settle the promise.
            <request-master
            (fn [method args]
              (let [request-id (next-request-id)
                    client-channel (ensure-client-channel client-id service-name)]
                (p/create
                 (fn [resolve-fn reject-fn]
                   (vswap! *requests-in-flight assoc request-id {:method method
                                                                 :args args
                                                                 :resolve-fn resolve-fn
                                                                 :reject-fn reject-fn})
                   (.postMessage client-channel (bean/->js
                                                 {:id request-id
                                                  :type "request"
                                                  :method method
                                                  :args args}))))))
            get-trap
            (fn [proxied method]
              (if (= "remoteInvoke" method)
                (fn [args]
                  (if @*master-client?
                    (<apply-target-f! proxied method args)
                    (<request-master method args)))
                (log/error :invalid-invoke-method method)))]
        (<check-master-slave-fn!)
        (add-watch *master-re-check-trigger :check-master
                   (fn [_ _ _ new-value]
                     (when (= new-value :re-check)
                       (p/do!
                        (p/delay 100)   ; why need delay here?
                        (<check-master-slave-fn!)))))
        {:proxy (js/Proxy. target #js {:get get-trap})
         :status status
         :client-id client-id})))
  (defn broadcast-to-clients!
    "Serializes `[type' data]` as a transit string and sends it out:
    - to `js/self` via postMessage, when `js/self` exists;
    - on the common channel, when one is open, as a message whose :type is
      `type'` converted to a string and whose :data is the transit string."
    [type' data]
    (let [transit-payload (ldb/write-transit-str [type' data])
          common-channel @*common-channel]
      (when (exists? js/self)
        (.postMessage js/self transit-payload))
      (when common-channel
        (.postMessage common-channel
                      #js {:type (common-util/keyword->string type')
                           :data transit-payload}))))