Browse Source

integrate worker sync

Tienson Qin 2 months ago
parent
commit
f1d15f5c64

+ 2 - 1
deps/worker-sync/shadow-cljs.edn

@@ -6,7 +6,8 @@
  {:worker-sync {:target :esm
                 :output-dir "worker/dist/worker"
                 :modules {:main {:exports {default logseq.worker-sync.worker/worker
-                                           SyncDO logseq.worker-sync.worker/SyncDO}}}
+                                           SyncDO logseq.worker-sync.worker/SyncDO
+                                           SyncIndexDO logseq.worker-sync.worker/SyncIndexDO}}}
                 :js-options {:js-provider :import}
                 :closure-defines {shadow.cljs.devtools.client.env/enabled false}
                 :devtools {:enabled false}}

+ 100 - 0
deps/worker-sync/src/logseq/worker_sync/worker.cljs

@@ -17,6 +17,13 @@
       (= path "/health")
       (common/json-response {:ok true})
 
+      (or (= path "/graphs")
+          (string/starts-with? path "/graphs/"))
+      (let [^js namespace (.-LOGSEQ_SYNC_INDEX_DO env)
+            do-id (.idFromName namespace "index")
+            stub (.get namespace do-id)]
+        (.fetch stub request))
+
       (string/starts-with? path "/sync/")
       (let [prefix (count "/sync/")
             rest-path (subs path prefix)
@@ -155,6 +162,14 @@
       (and (= method "GET") (= path "/snapshot"))
       (common/json-response (snapshot-response self))
 
+      (and (= method "DELETE") (= path "/admin/reset"))
+      (do
+        (common/sql-exec (.-sql self) "drop table if exists kvs")
+        (common/sql-exec (.-sql self) "drop table if exists tx_log")
+        (common/sql-exec (.-sql self) "drop table if exists sync_meta")
+        (storage/init-schema! (.-sql self))
+        (common/json-response {:ok true}))
+
       (and (= method "POST") (= path "/tx"))
       (.then (common/read-json request)
              (fn [result]
@@ -184,3 +199,88 @@
          (if (common/upgrade-request? request)
            (handle-ws this request)
            (handle-http this request))))
+
+(defn- index-init! [sql]
+  (common/sql-exec sql
+                   (str "create table if not exists graphs ("
+                        "graph_id TEXT primary key,"
+                        "graph_name TEXT,"
+                        "schema_version TEXT,"
+                        "created_at INTEGER,"
+                        "updated_at INTEGER"
+                        ");")))
+
+(defn- index-list [sql]
+  (common/get-sql-rows
+   (common/sql-exec sql
+                    (str "select graph_id, graph_name, schema_version, created_at, updated_at "
+                         "from graphs order by updated_at desc"))))
+
+(defn- index-upsert! [sql graph-id graph-name schema-version]
+  (let [now (common/now-ms)]
+    (common/sql-exec sql
+                     (str "insert into graphs (graph_id, graph_name, schema_version, created_at, updated_at) "
+                          "values (?, ?, ?, ?, ?) "
+                          "on conflict(graph_id) do update set "
+                          "graph_name = excluded.graph_name, "
+                          "schema_version = excluded.schema_version, "
+                          "updated_at = excluded.updated_at")
+                     graph-id
+                     graph-name
+                     schema-version
+                     now
+                     now)))
+
+(defn- index-delete! [sql graph-id]
+  (common/sql-exec sql "delete from graphs where graph_id = ?" graph-id))
+
+(defn- handle-index-fetch [^js self request]
+  (let [sql (.-sql self)
+        url (js/URL. (.-url request))
+        path (.-pathname url)
+        method (.-method request)]
+    (index-init! sql)
+    (cond
+      (and (= method "GET") (= path "/graphs"))
+      (common/json-response {:graphs (index-list sql)})
+
+      (and (= method "POST") (= path "/graphs"))
+      (.then (common/read-json request)
+             (fn [result]
+               (let [graph-id (aget result "graph_id")
+                     graph-name (aget result "graph_name")
+                     schema-version (aget result "schema_version")]
+                 (if (and (string? graph-id) (string? graph-name))
+                   (do
+                     (index-upsert! sql graph-id graph-name schema-version)
+                     (common/json-response {:graph_id graph-id}))
+                   (common/bad-request "missing graph_id or graph_name")))))
+
+      (and (= method "DELETE") (string/starts-with? path "/graphs/"))
+      (let [graph-id (subs path (count "/graphs/"))]
+        (if (seq graph-id)
+          (do
+            (index-delete! sql graph-id)
+            (let [^js namespace (.-LOGSEQ_SYNC_DO (.-env self))
+                  do-id (.idFromName namespace graph-id)
+                  stub (.get namespace do-id)
+                  reset-url (str (.-origin url) "/admin/reset")]
+              (.fetch stub (js/Request. reset-url #js {:method "DELETE"})))
+            (common/json-response {:graph_id graph-id :deleted true}))
+          (common/bad-request "missing graph id")))
+
+      :else
+      (common/not-found))))
+
+(defclass SyncIndexDO
+  (extends DurableObject)
+
+  (constructor [this ^js state env]
+               (super state env)
+               (set! (.-state this) state)
+               (set! (.-env this) env)
+               (set! (.-sql this) (.-sql ^js (.-storage state))))
+
+  Object
+  (fetch [this request]
+         (handle-index-fetch this request)))

+ 7 - 0
deps/worker-sync/worker/README.md

@@ -6,11 +6,18 @@ Durable Object using SQLite storage and the Logseq datascript fork.
 ### Bindings
 
 - `LOGSEQ_SYNC_DO`: Durable Object namespace
+- `LOGSEQ_SYNC_INDEX_DO`: Durable Object namespace for graph registry
 
 ### Routes
 
 - `GET /health`
   - Returns a JSON health response
+- `GET /graphs`
+  - Returns the list of registered graphs
+- `POST /graphs`
+  - Registers or updates a graph
+- `DELETE /graphs/:graph-id`
+  - Deletes a graph and resets its DO state
 - `GET /sync/:graph-id`
   - Proxies to the Durable Object for the given graph
 

+ 5 - 1
deps/worker-sync/worker/wrangler.toml

@@ -10,6 +10,10 @@ enabled = true
 name = "LOGSEQ_SYNC_DO"
 class_name = "SyncDO"
 
+[[durable_objects.bindings]]
+name = "LOGSEQ_SYNC_INDEX_DO"
+class_name = "SyncIndexDO"
+
 [[migrations]]
 tag = "v1"
-new_sqlite_classes = ["SyncDO"]
+new_sqlite_classes = ["SyncDO", "SyncIndexDO"]

+ 1 - 1
src/main/frontend/components/repo.cljs

@@ -4,8 +4,8 @@
             [frontend.config :as config]
             [frontend.context.i18n :refer [t]]
             [frontend.db :as db]
-            [frontend.handler.db-based.rtc :as rtc-handler]
             [frontend.handler.db-based.rtc-flows :as rtc-flows]
+            [frontend.handler.db-based.sync :as rtc-handler]
             [frontend.handler.graph :as graph]
             [frontend.handler.notification :as notification]
             [frontend.handler.repo :as repo-handler]

+ 1 - 1
src/main/frontend/components/rtc/indicator.cljs

@@ -6,8 +6,8 @@
             [frontend.config :as config]
             [frontend.db :as db]
             [frontend.flows :as flows]
-            [frontend.handler.db-based.rtc :as rtc-handler]
             [frontend.handler.db-based.rtc-flows :as rtc-flows]
+            [frontend.handler.db-based.sync :as rtc-handler]
             [frontend.state :as state]
             [frontend.ui :as ui]
             [frontend.util :as util]

+ 1 - 1
src/main/frontend/components/settings.cljs

@@ -13,7 +13,7 @@
             [frontend.db :as db]
             [frontend.dicts :as dicts]
             [frontend.handler.config :as config-handler]
-            [frontend.handler.db-based.rtc :as rtc-handler]
+            [frontend.handler.db-based.sync :as rtc-handler]
             [frontend.handler.db-based.vector-search-flows :as vector-search-flows]
             [frontend.handler.global-config :as global-config-handler]
             [frontend.handler.notification :as notification]

+ 9 - 0
src/main/frontend/config.cljs

@@ -54,6 +54,15 @@
 (if ENABLE-RTC-SYNC-PRODUCTION
   (def RTC-WS-URL "wss://ws.logseq.com/rtc-sync?token=%s")
   (def RTC-WS-URL "wss://ws-dev.logseq.com/rtc-sync?token=%s"))
+
+(goog-define ENABLE-WORKER-SYNC false)
+(defonce worker-sync-enabled? ENABLE-WORKER-SYNC)
+
+(goog-define WORKER-SYNC-WS-URL "wss://sync-dev.logseq.com/sync/%s")
+(defonce worker-sync-ws-url WORKER-SYNC-WS-URL)
+
+(goog-define WORKER-SYNC-HTTP-BASE "https://sync-dev.logseq.com")
+(defonce worker-sync-http-base WORKER-SYNC-HTTP-BASE)
 ;; Feature flags
 ;; =============
 

+ 1 - 1
src/main/frontend/handler/common/developer.cljs

@@ -4,7 +4,7 @@
             [datascript.impl.entity :as de]
             [frontend.db :as db]
             [frontend.format.mldoc :as mldoc]
-            [frontend.handler.db-based.rtc :as rtc-handler]
+            [frontend.handler.db-based.sync :as rtc-handler]
             [frontend.handler.notification :as notification]
             [frontend.persist-db :as persist-db]
             [frontend.state :as state]

+ 1 - 1
src/main/frontend/handler/db_based/rtc_background_tasks.cljs

@@ -4,8 +4,8 @@
             [frontend.common.missionary :as c.m]
             [frontend.config :as config]
             [frontend.db :as db]
-            [frontend.handler.db-based.rtc :as rtc-handler]
             [frontend.handler.db-based.rtc-flows :as rtc-flows]
+            [frontend.handler.db-based.sync :as rtc-handler]
             [frontend.handler.notification :as notification]
             [frontend.state :as state]
             [lambdaisland.glogi :as log]

+ 51 - 0
src/main/frontend/handler/db_based/sync.cljs

@@ -0,0 +1,51 @@
+(ns frontend.handler.db-based.sync
+  "Dispatch RTC calls between legacy RTC and worker-sync implementations."
+  (:require [frontend.config :as config]
+            [frontend.handler.db-based.rtc :as rtc-handler]
+            [frontend.handler.db-based.worker-sync :as worker-sync-handler]))
+
+(defn- worker-sync-enabled? []
+  config/worker-sync-enabled?)
+
+(defn <rtc-create-graph! [repo]
+  (if (worker-sync-enabled?)
+    (worker-sync-handler/<rtc-create-graph! repo)
+    (rtc-handler/<rtc-create-graph! repo)))
+
+(defn <rtc-delete-graph! [graph-uuid schema-version]
+  (if (worker-sync-enabled?)
+    (worker-sync-handler/<rtc-delete-graph! graph-uuid schema-version)
+    (rtc-handler/<rtc-delete-graph! graph-uuid schema-version)))
+
+(defn <rtc-download-graph! [graph-name graph-uuid graph-schema-version timeout-ms]
+  (rtc-handler/<rtc-download-graph! graph-name graph-uuid graph-schema-version timeout-ms))
+
+(defn <rtc-stop! []
+  (if (worker-sync-enabled?)
+    (worker-sync-handler/<rtc-stop!)
+    (rtc-handler/<rtc-stop!)))
+
+(defn <rtc-branch-graph! [repo]
+  (rtc-handler/<rtc-branch-graph! repo))
+
+(defn notification-download-higher-schema-graph! [graph-name graph-uuid schema-version]
+  (rtc-handler/notification-download-higher-schema-graph! graph-name graph-uuid schema-version))
+
+(defn <rtc-get-users-info []
+  (if (worker-sync-enabled?)
+    (worker-sync-handler/<rtc-get-users-info)
+    (rtc-handler/<rtc-get-users-info)))
+
+(defn <rtc-start!
+  [repo & {:keys [stop-before-start?] :or {stop-before-start? true}}]
+  (if (worker-sync-enabled?)
+    (worker-sync-handler/<rtc-start! repo :stop-before-start? stop-before-start?)
+    (rtc-handler/<rtc-start! repo :stop-before-start? stop-before-start?)))
+
+(defn <get-remote-graphs []
+  (if (worker-sync-enabled?)
+    (worker-sync-handler/<get-remote-graphs)
+    (rtc-handler/<get-remote-graphs)))
+
+(defn <rtc-invite-email [graph-uuid email]
+  (rtc-handler/<rtc-invite-email graph-uuid email))

+ 113 - 0
src/main/frontend/handler/db_based/worker_sync.cljs

@@ -0,0 +1,113 @@
+(ns frontend.handler.db-based.worker-sync
+  "Worker-sync handler based on Cloudflare Durable Objects."
+  (:require [clojure.string :as string]
+            [frontend.config :as config]
+            [frontend.db :as db]
+            [frontend.handler.repo :as repo-handler]
+            [frontend.state :as state]
+            [lambdaisland.glogi :as log]
+            [logseq.db :as ldb]
+            [promesa.core :as p]))
+
+(defn- ws->http-base [ws-url]
+  (when (string? ws-url)
+    (let [base (cond
+                 (string/starts-with? ws-url "wss://")
+                 (str "https://" (subs ws-url (count "wss://")))
+
+                 (string/starts-with? ws-url "ws://")
+                 (str "http://" (subs ws-url (count "ws://")))
+
+                 :else ws-url)
+          base (string/replace base #"/sync/%s$" "")]
+      base)))
+
+(defn- http-base []
+  (or config/worker-sync-http-base
+      (ws->http-base config/worker-sync-ws-url)))
+
+(defn- get-graph-id [repo]
+  (let [db (db/get-db repo)]
+    (or (ldb/get-graph-rtc-uuid db)
+        ;; FIXME: only for testing
+        (random-uuid))))
+
+(defn- fetch-json
+  [url opts]
+  (p/let [resp (js/fetch url (clj->js opts))
+          text (.text resp)
+          data (when (seq text) (js/JSON.parse text))]
+    (if (.-ok resp)
+      data
+      (throw (ex-info "worker-sync request failed"
+                      {:status (.-status resp)
+                       :url url
+                       :body data})))))
+
+(defn <rtc-start!
+  [repo & {:keys [_stop-before-start?] :as _opts}]
+  (log/info :worker-sync/start {:repo repo})
+  (state/<invoke-db-worker :thread-api/worker-sync-start repo))
+
+(defn <rtc-stop!
+  []
+  (log/info :worker-sync/stop true)
+  (state/<invoke-db-worker :thread-api/worker-sync-stop))
+
+(defn <rtc-get-users-info
+  []
+  (p/resolved nil))
+
+(defn <rtc-create-graph!
+  [repo]
+  (let [graph-id (get-graph-id repo)
+        schema-version (some-> (ldb/get-graph-schema-version (db/get-db)) :major str)
+        base (http-base)]
+    (if (and graph-id base)
+      (p/let [result (fetch-json (str base "/graphs")
+                                 {:method "POST"
+                                  :headers {"content-type" "application/json"}
+                                  :body (js/JSON.stringify
+                                         #js {:graph_id (str graph-id)
+                                              :graph_name repo
+                                              :schema_version schema-version})})]
+        (ldb/transact! repo [{:logseq.kv/graph-uuid graph-id}])
+        result)
+      (p/rejected (ex-info "worker-sync missing graph info"
+                           {:type :worker-sync/invalid-graph
+                            :graph-id graph-id
+                            :base base})))))
+
+(defn <rtc-delete-graph!
+  [graph-uuid _schema-version]
+  (let [base (http-base)]
+    (if (and graph-uuid base)
+      (fetch-json (str base "/graphs/" graph-uuid) {:method "DELETE"})
+      (p/rejected (ex-info "worker-sync missing graph id"
+                           {:type :worker-sync/invalid-graph
+                            :graph-uuid graph-uuid
+                            :base base})))))
+
+(defn <get-remote-graphs
+  []
+  (let [base (http-base)]
+    (if-not base
+      (p/resolved [])
+      (-> (p/let [_ (state/set-state! :rtc/loading-graphs? true)
+                  resp (fetch-json (str base "/graphs") {:method "GET"})
+                  graphs (js->clj (aget resp "graphs") :keywordize-keys true)
+                  result (mapv (fn [graph]
+                                 (merge
+                                  {:url (str config/db-version-prefix (:graph_name graph))
+                                   :GraphName (:graph_name graph)
+                                   :GraphSchemaVersion (:schema_version graph)
+                                   :GraphUUID (:graph_id graph)
+                                   :rtc-graph? true}
+                                  (dissoc graph :graph_id :graph_name :schema_version)))
+                               graphs)]
+            (state/set-state! :rtc/graphs result)
+            (repo-handler/refresh-repos!)
+            result)
+          (p/finally
+            (fn []
+              (state/set-state! :rtc/loading-graphs? false)))))))

+ 1 - 1
src/main/frontend/handler/events.cljs

@@ -20,8 +20,8 @@
             [frontend.handler.code :as code-handler]
             [frontend.handler.common.page :as page-common-handler]
             [frontend.handler.db-based.property :as db-property-handler]
-            [frontend.handler.db-based.rtc :as rtc-handler]
             [frontend.handler.db-based.rtc-flows :as rtc-flows]
+            [frontend.handler.db-based.sync :as rtc-handler]
             [frontend.handler.editor :as editor-handler]
             [frontend.handler.export :as export]
             [frontend.handler.graph :as graph-handler]

+ 1 - 1
src/main/frontend/handler/events/ui.cljs

@@ -18,7 +18,7 @@
             [frontend.config :as config]
             [frontend.db :as db]
             [frontend.extensions.fsrs :as fsrs]
-            [frontend.handler.db-based.rtc :as rtc-handler]
+            [frontend.handler.db-based.sync :as rtc-handler]
             [frontend.handler.editor :as editor-handler]
             [frontend.handler.events :as events]
             [frontend.handler.notification :as notification]

+ 3 - 0
src/main/frontend/persist_db/browser.cljs

@@ -145,6 +145,9 @@
        (worker-handler/handle-message! worker wrapped-worker)
        (reset! state/*db-worker wrapped-worker)
        (-> (p/let [_ (state/<invoke-db-worker :thread-api/init config/RTC-WS-URL)
+                   _ (state/<invoke-db-worker :thread-api/set-worker-sync-config
+                                              {:enabled? config/worker-sync-enabled?
+                                               :ws-url config/worker-sync-ws-url})
                    _ (sync-app-state!)
                    _ (log/info "init worker spent" (str (- (util/time-ms) t1) "ms"))
                    _ (sync-ui-state!)

+ 5 - 0
src/main/frontend/worker/db_listener.cljs

@@ -8,6 +8,7 @@
             [frontend.worker.search :as search]
             [frontend.worker.shared-service :as shared-service]
             [frontend.worker.state :as worker-state]
+            [frontend.worker.worker-sync :as worker-sync]
             [logseq.common.util :as common-util]
             [logseq.db :as ldb]
             [logseq.outliner.batch-tx :as batch-tx]
@@ -50,6 +51,10 @@
     (prn :tx-data tx-data)
     (prn :tx-meta tx-meta)))
 
+(defmethod listen-db-changes :worker-sync
+  [_ {:keys [repo]} {:keys [tx-data tx-meta]}]
+  (worker-sync/handle-local-tx! repo tx-data tx-meta))
+
 (defn- remove-old-embeddings-and-reset-new-updates!
   [conn tx-data tx-meta]
   (let [;; Remove old :logseq.property.embedding/hnsw-label-updated-at when importing a graph

+ 14 - 0
src/main/frontend/worker/db_worker.cljs

@@ -34,6 +34,7 @@
             [frontend.worker.shared-service :as shared-service]
             [frontend.worker.state :as worker-state]
             [frontend.worker.thread-atom]
+            [frontend.worker.worker-sync :as worker-sync]
             [goog.object :as gobj]
             [lambdaisland.glogi :as log]
             [lambdaisland.glogi.console :as glogi-console]
@@ -377,6 +378,19 @@
   (reset! worker-state/*rtc-ws-url rtc-ws-url)
   (init-sqlite-module!))
 
+(def-thread-api :thread-api/set-worker-sync-config
+  [config]
+  (reset! worker-state/*worker-sync-config config)
+  nil)
+
+(def-thread-api :thread-api/worker-sync-start
+  [repo]
+  (worker-sync/start! repo))
+
+(def-thread-api :thread-api/worker-sync-stop
+  []
+  (worker-sync/stop!))
+
 (def-thread-api :thread-api/set-infer-worker-proxy
   [infer-worker-proxy]
   (reset! worker-state/*infer-worker infer-worker-proxy)

+ 2 - 0
src/main/frontend/worker/state.cljs

@@ -46,6 +46,8 @@
                        :thread-atom/online-event (atom nil)}))
 
 (defonce *rtc-ws-url (atom nil))
+(defonce *worker-sync-config (atom {:enabled? false :ws-url nil}))
+(defonce *worker-sync-clients (atom {}))
 
 (defonce *sqlite (atom nil))
 ;; repo -> {:db conn :search conn :client-ops conn :debug-log conn}

+ 220 - 0
src/main/frontend/worker/worker_sync.cljs

@@ -0,0 +1,220 @@
+(ns frontend.worker.worker-sync
+  "Simple worker-sync client based on promesa + WebSocket."
+  (:require [cljs.reader :as reader]
+            [clojure.string :as string]
+            [datascript.core :as d]
+            [datascript.impl.entity :as de :refer [Entity]]
+            [frontend.worker.state :as worker-state]
+            [lambdaisland.glogi :as log]
+            [logseq.db :as ldb]
+            [logseq.db.sqlite.util :as sqlite-util]
+            [promesa.core :as p]))
+
+(defn- enabled?
+  []
+  (true? (:enabled? @worker-state/*worker-sync-config)))
+
+(defn- ws-base-url
+  []
+  (:ws-url @worker-state/*worker-sync-config))
+
+(defn- format-ws-url [base graph-id]
+  (cond
+    (string/includes? base "%s")
+    (string/replace base "%s" graph-id)
+
+    (string/ends-with? base "/")
+    (str base graph-id)
+
+    :else
+    (str base "/" graph-id)))
+
+(defn- get-graph-id [repo]
+  (when-let [conn (worker-state/get-datascript-conn repo)]
+    (let [db @conn
+          graph-uuid (ldb/get-graph-rtc-uuid db)
+          local-uuid (ldb/get-graph-local-uuid db)]
+      (or (some-> graph-uuid str)
+          (some-> local-uuid str)
+          (when (string? repo) repo)))))
+
+(defn- ready-state [ws]
+  (.-readyState ws))
+
+(defn- ws-open? [ws]
+  (= 1 (ready-state ws)))
+
+(defn- send! [ws message]
+  (when (ws-open? ws)
+    (.send ws (js/JSON.stringify (clj->js message)))))
+
+(defn- parse-message [raw]
+  (try
+    (js->clj (js/JSON.parse raw) :keywordize-keys true)
+    (catch :default _
+      nil)))
+
+(defn- update-server-t! [client t]
+  (when (number? t)
+    (reset! (:server-t client) t)))
+
+(defn- apply-remote-tx! [repo tx-data]
+  (when-let [conn (worker-state/get-datascript-conn repo)]
+    (try
+      (d/transact! conn tx-data {:worker-sync/remote? true})
+      (catch :default e
+        (log/error :worker-sync/apply-remote-tx-failed {:error e})))))
+
+(defn- reconcile-cycle! [repo attr server-values]
+  (when-let [conn (worker-state/get-datascript-conn repo)]
+    (let [db @conn
+          tx-data (reduce
+                   (fn [acc [entity-str value]]
+                     (let [entity (reader/read-string entity-str)
+                           eid (d/entid db entity)
+                           current-raw (when eid (get (d/entity db eid) attr))
+                           current (cond
+                                     (and (= attr :block/parent) (instance? Entity current-raw))
+                                     (when-let [parent-uuid (:block/uuid current-raw)]
+                                       [:block/uuid parent-uuid])
+                                     (and (= attr :logseq.property.class/extends) (instance? Entity current-raw))
+                                     (:db/ident current-raw)
+                                     :else current-raw)]
+                       (cond
+                         (nil? eid) acc
+                         (nil? value)
+                         (cond
+                           (and current (sequential? current))
+                           (conj acc [:db/retract eid attr current])
+
+                           (some? current)
+                           (conj acc [:db/retract eid attr current])
+
+                           :else acc)
+
+                         :else
+                         (conj acc [:db/add eid attr value]))))
+                   []
+                   server-values)]
+      (when (seq tx-data)
+        (d/transact! conn tx-data {:worker-sync/remote? true})))))
+
+(defn- handle-message! [repo client raw]
+  (when-let [message (parse-message raw)]
+    (case (:type message)
+      "hello" (update-server-t! client (:t message))
+      "tx/ok" (update-server-t! client (:t message))
+      "tx/reject" (do
+                    (when (= "stale" (:reason message))
+                      (update-server-t! client (:t message)))
+                    (when (= "cycle" (:reason message))
+                      (let [attr (keyword (:attr message))
+                            server-values (sqlite-util/read-transit-str (:server_values message))]
+                        (reconcile-cycle! repo attr server-values))))
+      "pull/ok" (do
+                  (update-server-t! client (:t message))
+                  (doseq [{:keys [tx]} (:txs message)]
+                    (when tx
+                      (apply-remote-tx! repo (sqlite-util/read-transit-str tx)))))
+      "snapshot/ok" (update-server-t! client (:t message))
+      nil)))
+
+(defn- ensure-client-state! [repo]
+  (or (get @worker-state/*worker-sync-clients repo)
+      (let [client {:repo repo
+                    :server-t (atom 0)
+                    :send-queue (atom (p/resolved nil))}]
+        (swap! worker-state/*worker-sync-clients assoc repo client)
+        client)))
+
+(defn- attach-ws-handlers! [repo client ws]
+  (set! (.-onmessage ws)
+        (fn [event]
+          (handle-message! repo client (.-data event))))
+  (set! (.-onclose ws)
+        (fn [_]
+          (log/info :worker-sync/ws-closed {:repo repo}))))
+
+(defn- start-pull-loop! [client ws]
+  (let [interval-id (js/setInterval
+                     (fn []
+                       (when (ws-open? ws)
+                         (send! ws {:type "pull" :since @(:server-t client)})))
+                     2000)]
+    (assoc client :pull-interval-id interval-id)))
+
+(defn- stop-client! [client]
+  (when-let [interval-id (:pull-interval-id client)]
+    (js/clearInterval interval-id))
+  (when-let [ws (:ws client)]
+    (try
+      (.close ws)
+      (catch :default _
+        nil))))
+
+(defn- connect! [repo client url]
+  (let [ws (js/WebSocket. url)
+        updated (assoc client :ws ws)]
+    (attach-ws-handlers! repo updated ws)
+    (set! (.-onopen ws)
+          (fn [_]
+            (send! ws {:type "hello" :client repo})))
+    (start-pull-loop! updated ws)))
+
+(defn start!
+  [repo]
+  (if-not (enabled?)
+    (p/resolved nil)
+    (let [base (ws-base-url)
+          graph-id (get-graph-id repo)]
+      (if (and (string? base) (seq base) (seq graph-id))
+        (let [client (ensure-client-state! repo)
+              url (format-ws-url base graph-id)
+              connected (connect! repo client url)]
+          (swap! worker-state/*worker-sync-clients assoc repo connected)
+          (p/resolved nil))
+        (do
+          (log/info :worker-sync/start-skipped {:repo repo :graph-id graph-id :base base})
+          (p/resolved nil))))))
+
+(defn stop!
+  ([] (doseq [[repo client] @worker-state/*worker-sync-clients]
+        (stop-client! client)
+        (swap! worker-state/*worker-sync-clients dissoc repo))
+      (p/resolved nil))
+  ([repo]
+   (when-let [client (get @worker-state/*worker-sync-clients repo)]
+     (stop-client! client)
+     (swap! worker-state/*worker-sync-clients dissoc repo))
+   (p/resolved nil)))
+
+(defn enqueue-local-tx!
+  [repo tx-data]
+  (when-let [client (get @worker-state/*worker-sync-clients repo)]
+    (let [send-queue (:send-queue client)
+          normalized (mapv (fn [item]
+                             (if (and (map? item) (contains? item :e) (contains? item :a))
+                               (if (:added item)
+                                 [:db/add (:e item) (:a item) (:v item)]
+                                 [:db/retract (:e item) (:a item) (:v item)])
+                               item))
+                           tx-data)
+          tx-str (sqlite-util/write-transit-str normalized)]
+      (swap! send-queue
+             (fn [prev]
+               (p/then prev
+                       (fn [_]
+                         (when-let [ws (:ws (get @worker-state/*worker-sync-clients repo))]
+                           (when (ws-open? ws)
+                             (send! ws {:type "tx"
+                                        :t_before @(:server-t client)
+                                        :tx tx-str}))))))))))
+
+(defn handle-local-tx!
+  [repo tx-data tx-meta]
+  (when (and (enabled?)
+             (seq tx-data)
+             (not (:worker-sync/remote? tx-meta))
+             (not (:rtc-download-graph? tx-meta))
+             (not (:from-disk? tx-meta)))
+    (enqueue-local-tx! repo tx-data)))