| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366 |
- (ns frontend.worker.shared-service
- "This allows multiple workers to share some resources (e.g. db access)"
- (:require [cljs-bean.core :as bean]
- [goog.object :as gobj]
- [lambdaisland.glogi :as log]
- [logseq.common.util :as common-util]
- [logseq.db :as ldb]
- [promesa.core :as p]))
- ;; Idea and code copied from https://github.com/Matt-TOTW/shared-service/blob/master/src/sharedService.ts
- ;; Related thread: https://github.com/rhashimoto/wa-sqlite/discussions/81
- (log/set-level 'frontend.worker.shared-service :debug)
- (defonce *master-client? (atom false))
- (defonce *master-re-check-trigger (atom nil))
- ;;; common-channel - Communication related to master-client election.
- ;;; client-channel - For API request-response data communication.
- ;;; master-slave-channels - Registered slave channels for master, all the slave
- ;;; channels need to be closed to not receive further
- ;;; messages when the master has been changed to slave.
- (defonce *common-channel (atom nil))
- (defonce *client-channel (atom nil))
- (defonce *master-slave-channels (atom #{}))
- ;;; record channel-listener here, to able to remove old listener before we addEventListener new one
- (defonce *common-channel-listener (atom nil))
- (defonce *client-channel-listener (atom nil))
- (defonce *current-request-id (volatile! 0))
- (defonce *requests-in-flight (volatile! (sorted-map))) ;sort by request-id
- ;;; The unique identity of the context where `js/navigator.locks.request` is called
- (defonce *client-id (atom nil))
- (defonce *master-client-lock (atom nil))
- (defn- next-request-id
- []
- (vswap! *current-request-id inc))
- (defn- release-master-client-lock!
- []
- (when-let [d @*master-client-lock]
- (p/resolve! d)
- nil))
- (defn- get-broadcast-channel-name [client-id service-name]
- (str client-id "-" service-name))
- (defn- random-id
- []
- (str (random-uuid)))
- (defn- do-not-wait
- [promise]
- promise
- nil)
- (defn- <get-client-id
- []
- (let [id (random-id)]
- (p/let [client-id (js/navigator.locks.request id #js {:mode "exclusive"}
- (fn [_]
- (p/let [^js locks (js/navigator.locks.query)]
- (->> (.-held locks)
- (some #(when (= (.-name %) id) %))
- .-clientId))))]
- (assert (some? client-id))
- (do-not-wait
- (js/navigator.locks.request client-id #js {:mode "exclusive"}
- ;; never release it
- (fn [_] (p/deferred))))
- (log/debug :client-id client-id)
- client-id)))
- (defn- <ensure-client-id
- []
- (or @*client-id
- (p/let [client-id (<get-client-id)]
- (reset! *client-id client-id))))
- (defn- ensure-common-channel
- [service-name]
- (or @*common-channel
- (reset! *common-channel (js/BroadcastChannel. (str "shared-service-common-channel-" service-name)))))
- (defn- ensure-client-channel
- [slave-client-id service-name]
- (or @*client-channel
- (reset! *client-channel (js/BroadcastChannel. (get-broadcast-channel-name slave-client-id service-name)))))
- (defn- listen-common-channel
- [common-channel listener-fn]
- (when-let [old-listener @*common-channel-listener]
- (.removeEventListener common-channel "message" old-listener))
- (reset! *common-channel-listener listener-fn)
- (.addEventListener common-channel "message" listener-fn))
- (defn- listen-client-channel
- [client-channel listener-fn]
- (when-let [old-listener @*client-channel-listener]
- (.removeEventListener client-channel "message" old-listener))
- (reset! *client-channel-listener listener-fn)
- (.addEventListener client-channel "message" listener-fn))
- (defn- <apply-target-f!
- [target method args]
- (let [f (gobj/get target method)]
- (assert (some? f) {:method method})
- (apply f args)))
- (defn- <check-master-or-slave-client!
- "Check if the current client is the master (otherwise, it is a slave)"
- [service-name <on-become-master <on-become-slave]
- (p/let [client-id (<ensure-client-id)]
- (do-not-wait
- (js/navigator.locks.request
- service-name #js {:mode "exclusive", :ifAvailable true}
- (fn [lock]
- (p/let [^js locks (js/navigator.locks.query)
- locked? (some #(when (and (= (.-name %) service-name)
- (= (.-clientId %) client-id))
- true)
- (.-held locks))]
- (cond
- (and locked? lock) ;become master
- (p/do!
- (reset! *master-client? true)
- (<on-become-master)
- (reset! *master-client-lock (p/deferred))
- ;; Keep lock until context destroyed
- @*master-client-lock)
- (and locked? (nil? lock)) ;already locked by this client, do nothing
- (assert (true? @*master-client?))
- (not locked?) ;become slave
- (p/do!
- (reset! *master-client? false)
- (<on-become-slave)))))))))
- (defn- clear-old-service!
- []
- (release-master-client-lock!)
- (reset! *master-client? false)
- (let [channels (into @*master-slave-channels [@*common-channel @*client-channel])]
- (doseq [^js channel channels]
- (when channel
- (.close channel))))
- (reset! *common-channel nil)
- (reset! *client-channel nil)
- (reset! *master-slave-channels #{})
- (reset! *common-channel-listener nil)
- (reset! *client-channel-listener nil)
- (vreset! *requests-in-flight (sorted-map))
- (remove-watch *master-re-check-trigger :check-master))
- (defn- on-response-handler
- [event]
- (let [{:keys [id type error result]} (bean/->clj (.-data event))]
- (when (identical? "response" type)
- (when-let [{:keys [resolve-fn reject-fn]} (get @*requests-in-flight id)]
- (vswap! *requests-in-flight dissoc id)
- (if error
- (do (log/error :error-process-request error)
- (reject-fn error))
- (resolve-fn result))))))
- (defn- create-on-request-handler
- [client-channel target]
- (fn [event]
- (let [{:keys [type method args id]} (bean/->clj (.-data event))]
- (when (identical? "request" type)
- (p/let [[result error]
- (-> (p/then (<apply-target-f! target method args)
- (fn [res] [res nil]))
- (p/catch
- (fn [e] [nil (if (instance? js/Error e)
- (bean/->clj e)
- e)])))]
- (.postMessage client-channel (bean/->js
- {:id id
- :type "response"
- :result result
- :error error
- :method-key (first args)})))))))
- (defn- <slave-registered-handler
- [service-name slave-client-id event *register-finish-promise?]
- (let [slave-client-id* (:slave-client-id event)]
- (when (= slave-client-id slave-client-id*)
- (p/let [^js locks (js/navigator.locks.query)
- already-watching?
- (some
- (fn [l] (and (= service-name (.-name l))
- (= slave-client-id (.-clientId l))))
- (.-pending locks))]
- (when-not already-watching? ;dont watch multiple times
- (do-not-wait
- (js/navigator.locks.request service-name #js {:mode "exclusive"}
- (fn [_lock]
- ;; The master has gone, elect the new master
- (log/debug "master has gone" nil)
- (reset! *master-re-check-trigger :re-check)))))
- (p/resolve! @*register-finish-promise?)))))
- (defn- <re-requests-in-flight-on-slave!
- [client-channel]
- (when (seq @*requests-in-flight)
- (log/debug "Requests were in flight when master changed. Requeuing..." (count @*requests-in-flight))
- (->>
- @*requests-in-flight
- (p/run!
- (fn [[id {:keys [method args _resolve-fn _reject-fn]}]]
- (.postMessage client-channel (bean/->js {:id id
- :type "request"
- :method method
- :args args})))))))
- (defn- <re-requests-in-flight-on-master!
- [target]
- (when (seq @*requests-in-flight)
- (log/debug "Requests were in flight when tab became master. Requeuing..." (count @*requests-in-flight))
- (->>
- @*requests-in-flight
- (p/run!
- (fn [[id {:keys [method args resolve-fn reject-fn]}]]
- (->
- (p/let [result (<apply-target-f! target method args)]
- (resolve-fn result))
- (p/catch (fn [e]
- (log/error "Error processing request" e)
- (reject-fn e)))
- (p/finally (fn []
- (vswap! *requests-in-flight dissoc id)))))))))
- (defn- <on-become-slave
- [slave-client-id service-name common-channel broadcast-data-types status-ready-promise]
- (let [client-channel (ensure-client-channel slave-client-id service-name)
- *register-finish-promise? (atom nil)
- <register #(do (.postMessage common-channel #js {:type "slave-register"
- :slave-client-id slave-client-id})
- (reset! *register-finish-promise? (p/deferred))
- @*register-finish-promise?)]
- (listen-client-channel client-channel on-response-handler)
- (listen-common-channel
- common-channel
- (fn [event]
- (let [{:keys [type data] :as event*} (bean/->clj (.-data event))]
- (if (contains? broadcast-data-types type)
- (.postMessage js/self data)
- (case type
- "master-changed"
- (p/do!
- (log/debug "master-client change detected. Re-registering..." nil)
- (<register)
- (<re-requests-in-flight-on-slave! client-channel))
- "slave-registered"
- (<slave-registered-handler service-name slave-client-id event* *register-finish-promise?)
- "slave-register"
- (log/debug :ignored-event event*)
- (log/error :unknown-event event*))))))
- (->
- (p/do!
- (<register)
- (p/resolve! status-ready-promise))
- (p/catch (fn [e]
- (log/error :on-become-slave e)
- (p/rejected e))))))
- (defn- <on-become-master
- [master-client-id service-name common-channel target on-become-master-handler status-ready-deferred-p]
- (log/debug :become-master master-client-id :service service-name)
- (listen-common-channel
- common-channel
- (fn [event]
- (let [{:keys [slave-client-id type]} (bean/->clj (.-data event))]
- (when (= type "slave-register")
- (let [client-channel (js/BroadcastChannel. (get-broadcast-channel-name slave-client-id service-name))]
- (swap! *master-slave-channels conj client-channel)
- (do-not-wait
- (js/navigator.locks.request slave-client-id #js {:mode "exclusive"}
- (fn [_]
- (log/debug :slave-has-gone slave-client-id)
- (.close client-channel))))
- (listen-client-channel client-channel (create-on-request-handler client-channel target))
- (.postMessage common-channel (bean/->js {:type "slave-registered"
- :slave-client-id slave-client-id
- :master-client-id master-client-id
- :serviceName service-name})))))))
- (.postMessage common-channel #js {:type "master-changed"
- :master-client-id master-client-id
- :serviceName service-name})
- (->
- (p/do!
- (on-become-master-handler service-name)
- (<re-requests-in-flight-on-master! target))
- (p/finally
- (fn []
- (p/resolve! status-ready-deferred-p)))))
- (defn <create-service
- "broadcast-data-types - For data matching these types,
- forward the data broadcast from the master client directly to the UI thread."
- [service-name target on-become-master-handler broadcast-data-types {:keys [import?]}]
- (clear-old-service!)
- (when import? (reset! *master-client? true))
- (p/let [broadcast-data-types (set broadcast-data-types)
- status {:ready (p/deferred)}
- common-channel (ensure-common-channel service-name)
- client-id (<ensure-client-id)
- <check-master-slave-fn!
- (fn []
- (<check-master-or-slave-client!
- service-name
- #(<on-become-master
- client-id service-name common-channel target
- on-become-master-handler (:ready status))
- #(<on-become-slave
- client-id service-name common-channel broadcast-data-types (:ready status))))]
- (<check-master-slave-fn!)
- (add-watch *master-re-check-trigger :check-master
- (fn [_ _ _ new-value]
- (when (= new-value :re-check)
- (p/do!
- (p/delay 100) ; why need delay here?
- (<check-master-slave-fn!)))))
- {:proxy (js/Proxy. target
- #js {:get (fn [target method]
- (if (= "remoteInvoke" method)
- (fn [args]
- (cond
- @*master-client?
- (<apply-target-f! target method args)
- :else
- (let [request-id (next-request-id)
- client-channel (ensure-client-channel client-id service-name)]
- (p/create
- (fn [resolve-fn reject-fn]
- (vswap! *requests-in-flight assoc request-id {:method method
- :args args
- :resolve-fn resolve-fn
- :reject-fn reject-fn})
- (.postMessage client-channel (bean/->js
- {:id request-id
- :type "request"
- :method method
- :args args})))))))
- (log/error :invalid-invoke-method method)))})
- :status status
- :client-id client-id}))
- (defn broadcast-to-clients!
- [type' data]
- (let [transit-payload (ldb/write-transit-str [type' data])]
- (when (exists? js/self) (.postMessage js/self transit-payload))
- (when-let [common-channel @*common-channel]
- (let [str-type' (common-util/keyword->string type')]
- (.postMessage common-channel #js {:type str-type'
- :data transit-payload})))))
|