Просмотр исходного кода

use cloudflare d1 instead of do for graph metadata

Tienson Qin 1 месяц назад
Родитель
Сommit
f0efeb31bc

+ 16 - 0
deps/db-sync/src/logseq/db_sync/common.cljs

@@ -56,6 +56,22 @@
   [sql sql-str & args]
   (.apply (.-exec sql) sql (to-array (cons sql-str args))))
 
+(defn <d1-run
+  [^js db sql-str & args]
+  (p/let [^js stmt (.prepare db sql-str)
+          stmt (if (seq args)
+                 (.apply (.-bind stmt) stmt (to-array args))
+                 stmt)]
+    (.run stmt)))
+
+(defn <d1-all
+  [^js db sql-str & args]
+  (p/let [^js stmt (.prepare db sql-str)
+          stmt (if (seq args)
+                 (.apply (.-bind stmt) stmt (to-array args))
+                 stmt)]
+    (.all stmt)))
+
 (defn read-json [request]
   (p/let [body (.text request)]
     (when (seq body)

+ 133 - 124
deps/db-sync/src/logseq/db_sync/worker.cljs

@@ -10,7 +10,6 @@
             [logseq.db-sync.malli-schema :as db-sync-schema]
             [logseq.db-sync.protocol :as protocol]
             [logseq.db-sync.storage :as storage]
-            [logseq.db.common.normalize :as db-normalize]
             [promesa.core :as p]
             [shadow.cljs.modern :refer (defclass)]))
 
@@ -525,27 +524,31 @@
   (webSocketError [_this _ws error]
                   (log/error :db-sync/ws-error {:error error})))
 
-(defn- index-init! [sql]
-  (common/sql-exec sql
+(defn- index-db [^js self]
+  (let [db (.-d1 self)]
+    (when-not db
+      (log/error :db-sync/index-db-missing {:binding "DB"}))
+    db))
+
+(defn- <index-init! [db]
+  (p/do!
+   (common/<d1-run db
                    (str "create table if not exists graphs ("
                         "graph_id TEXT primary key,"
                         "graph_name TEXT,"
-                        "owner_sub TEXT,"
+                        "user_id TEXT,"
                         "schema_version TEXT,"
                         "created_at INTEGER,"
                         "updated_at INTEGER"
-                        ");"))
-  (try
-    (common/sql-exec sql "alter table graphs add column owner_sub TEXT")
-    (catch :default _ nil)))
-
-(defn- index-list [sql owner-sub]
-  (if (string? owner-sub)
-    (let [rows (common/get-sql-rows
-                (common/sql-exec sql
-                                 (str "select graph_id, graph_name, schema_version, created_at, updated_at "
-                                      "from graphs where owner_sub = ? order by updated_at desc")
-                                 owner-sub))]
+                        ");"))))
+
+(defn- <index-list [db user-id]
+  (if (string? user-id)
+    (p/let [result (common/<d1-all db
+                                   (str "select graph_id, graph_name, schema_version, created_at, updated_at "
+                                        "from graphs where user_id = ? order by updated_at desc")
+                                   user-id)
+            rows (common/get-sql-rows result)]
       (mapv (fn [row]
               {:graph_id (aget row "graph_id")
                :graph_name (aget row "graph_name")
@@ -555,35 +558,36 @@
             rows))
     []))
 
-(defn- index-upsert! [sql graph-id graph-name owner-sub schema-version]
-  (let [now (common/now-ms)]
-    (common/sql-exec sql
-                     (str "insert into graphs (graph_id, graph_name, owner_sub, schema_version, created_at, updated_at) "
-                          "values (?, ?, ?, ?, ?, ?) "
-                          "on conflict(graph_id) do update set "
-                          "graph_name = excluded.graph_name, "
-                          "owner_sub = excluded.owner_sub, "
-                          "schema_version = excluded.schema_version, "
-                          "updated_at = excluded.updated_at")
-                     graph-id
-                     graph-name
-                     owner-sub
-                     schema-version
-                     now
-                     now)))
-
-(defn- index-delete! [sql graph-id]
-  (common/sql-exec sql "delete from graphs where graph_id = ?" graph-id))
-
-(defn- index-owns-graph? [sql graph-id owner-sub]
-  (when (and (string? graph-id) (string? owner-sub))
-    (let [rows (common/get-sql-rows
-                (common/sql-exec sql
-                                 (str "select graph_id from graphs "
-                                      "where graph_id = ? and owner_sub = ?")
+(defn- <index-upsert! [db graph-id graph-name user-id schema-version]
+  (p/let [now (common/now-ms)
+          result (common/<d1-run db
+                                 (str "insert into graphs (graph_id, graph_name, user_id, schema_version, created_at, updated_at) "
+                                      "values (?, ?, ?, ?, ?, ?) "
+                                      "on conflict(graph_id) do update set "
+                                      "graph_name = excluded.graph_name, "
+                                      "user_id = excluded.user_id, "
+                                      "schema_version = excluded.schema_version, "
+                                      "updated_at = excluded.updated_at")
                                  graph-id
-                                 owner-sub))]
-      (seq rows))))
+                                 graph-name
+                                 user-id
+                                 schema-version
+                                 now
+                                 now)]
+    result))
+
+(defn- <index-delete! [db graph-id]
+  (common/<d1-run db "delete from graphs where graph_id = ?" graph-id))
+
+(defn- <user-has-access-to-graph? [db graph-id user-id]
+  (when (and (string? graph-id) (string? user-id))
+    (p/let [result (common/<d1-all db
+                                   (str "select graph_id from graphs "
+                                        "where graph_id = ? and user_id = ?")
+                                   graph-id
+                                   user-id)
+            rows (common/get-sql-rows result)]
+      (boolean (seq rows)))))
 
 (defn- graph-path-parts [path]
   (->> (string/split path #"/")
@@ -591,92 +595,97 @@
        (vec)))
 
 (defn- handle-index-fetch [^js self request]
-  (let [sql (.-sql self)
+  (let [db (index-db self)
         env (.-env self)
         url (js/URL. (.-url request))
         path (.-pathname url)
         method (.-method request)
         parts (graph-path-parts path)]
     (try
-      (if (contains? #{"OPTIONS" "HEAD"} method)
+      (cond
+        (contains? #{"OPTIONS" "HEAD"} method)
         (common/options-response)
-        (do
-          (index-init! sql)
-          (p/let [claims (auth-claims request env)]
-            (cond
-              (nil? claims)
-              (unauthorized)
-
-              (and (= method "GET") (= ["graphs"] parts))
-              (let [owner-sub (aget claims "sub")]
-                (if (string? owner-sub)
-                  (json-response :graphs/list {:graphs (index-list sql owner-sub)})
-                  (unauthorized)))
-
-              (and (= method "POST") (= ["graphs"] parts))
-              (.then (common/read-json request)
-                     (fn [result]
-                       (if (nil? result)
-                         (bad-request "missing body")
-                         (let [body (js->clj result :keywordize-keys true)
-                               body (coerce-http-request :graphs/create body)
-                               graph-id (str (random-uuid))
-                               owner-sub (aget claims "sub")]
-                           (cond
-                             (not (string? owner-sub))
-                             (unauthorized)
-
-                             (nil? body)
-                             (bad-request "invalid body")
-
-                             :else
-                             (let [{:keys [graph_name schema_version]} body]
-                               (index-upsert! sql graph-id graph_name owner-sub schema_version)
-                               (json-response :graphs/create {:graph_id graph-id})))))))
-
-              (and (= method "GET")
-                   (= 3 (count parts))
-                   (= "graphs" (first parts))
-                   (= "access" (nth parts 2)))
-              (let [graph-id (nth parts 1)
-                    owner-sub (aget claims "sub")]
-                (cond
-                  (not (string? owner-sub))
-                  (unauthorized)
-
-                  (index-owns-graph? sql graph-id owner-sub)
-                  (json-response :graphs/access {:ok true})
-
-                  :else
-                  (forbidden)))
-
-              (and (= method "DELETE")
-                   (= 2 (count parts))
-                   (= "graphs" (first parts)))
-              (let [graph-id (nth parts 1)
-                    owner-sub (aget claims "sub")]
-                (cond
-                  (not (seq graph-id))
-                  (bad-request "missing graph id")
-
-                  (not (string? owner-sub))
-                  (unauthorized)
-
-                  (not (index-owns-graph? sql graph-id owner-sub))
-                  (forbidden)
-
-                  :else
-                  (do
-                    (index-delete! sql graph-id)
-                    (let [^js namespace (.-LOGSEQ_SYNC_DO (.-env self))
-                          do-id (.idFromName namespace graph-id)
-                          stub (.get namespace do-id)
-                          reset-url (str (.-origin url) "/admin/reset")]
-                      (.fetch stub (js/Request. reset-url #js {:method "DELETE"})))
-                    (json-response :graphs/delete {:graph_id graph-id :deleted true}))))
-
-              :else
-              (not-found)))))
+
+        (nil? db)
+        (error-response "server error" 500)
+
+        :else
+        (p/let [_ (<index-init! db)
+                claims (auth-claims request env)]
+          (cond
+            (nil? claims)
+            (unauthorized)
+
+            (and (= method "GET") (= ["graphs"] parts))
+            (let [user-id (aget claims "sub")]
+              (if (string? user-id)
+                (p/let [graphs (<index-list db user-id)]
+                  (json-response :graphs/list {:graphs graphs}))
+                (unauthorized)))
+
+            (and (= method "POST") (= ["graphs"] parts))
+            (.then (common/read-json request)
+                   (fn [result]
+                     (if (nil? result)
+                       (bad-request "missing body")
+                       (let [body (js->clj result :keywordize-keys true)
+                             body (coerce-http-request :graphs/create body)
+                             graph-id (str (random-uuid))
+                             user-id (aget claims "sub")]
+                         (cond
+                           (not (string? user-id))
+                           (unauthorized)
+
+                           (nil? body)
+                           (bad-request "invalid body")
+
+                           :else
+                           (p/let [{:keys [graph_name schema_version]} body
+                                   _ (<index-upsert! db graph-id graph_name user-id schema_version)]
+                             (json-response :graphs/create {:graph_id graph-id})))))))
+
+            (and (= method "GET")
+                 (= 3 (count parts))
+                 (= "graphs" (first parts))
+                 (= "access" (nth parts 2)))
+            (let [graph-id (nth parts 1)
+                  user-id (aget claims "sub")]
+              (cond
+                (not (string? user-id))
+                (unauthorized)
+
+                :else
+                (p/let [owns? (<user-has-access-to-graph? db graph-id user-id)]
+                  (if owns?
+                    (json-response :graphs/access {:ok true})
+                    (forbidden)))))
+
+            (and (= method "DELETE")
+                 (= 2 (count parts))
+                 (= "graphs" (first parts)))
+            (let [graph-id (nth parts 1)
+                  user-id (aget claims "sub")]
+              (cond
+                (not (seq graph-id))
+                (bad-request "missing graph id")
+
+                (not (string? user-id))
+                (unauthorized)
+
+                :else
+                (p/let [owns? (<user-has-access-to-graph? db graph-id user-id)]
+                  (if (not owns?)
+                    (forbidden)
+                    (p/let [_ (<index-delete! db graph-id)]
+                      (let [^js namespace (.-LOGSEQ_SYNC_DO (.-env self))
+                            do-id (.idFromName namespace graph-id)
+                            stub (.get namespace do-id)
+                            reset-url (str (.-origin url) "/admin/reset")]
+                        (.fetch stub (js/Request. reset-url #js {:method "DELETE"})))
+                      (json-response :graphs/delete {:graph_id graph-id :deleted true}))))))
+
+            :else
+            (not-found))))
       (catch :default error
         (log/error :db-sync/index-error error)
         (error-response "server error" 500)))))
@@ -688,7 +697,7 @@
                (super state env)
                (set! (.-state this) state)
                (set! (.-env this) env)
-               (set! (.-sql this) (.-sql ^js (.-storage state))))
+               (set! (.-d1 this) (aget env "DB")))
 
   Object
   (fetch [this request]

+ 10 - 0
deps/db-sync/worker/wrangler.toml

@@ -18,6 +18,11 @@ class_name = "SyncIndexDO"
 binding = "LOGSEQ_SYNC_ASSETS"
 bucket_name = "logseq-sync-assets-dev"
 
+[[d1_databases]]
+binding = "DB"
+database_name = "logseq-sync-graph-meta-dev"
+database_id = "c020574a-5623-407b-be0c-cd192bab9545"
+
 [[migrations]]
 tag = "v1"
 new_sqlite_classes = [ "SyncDO", "SyncIndexDO" ]
@@ -50,3 +55,8 @@ new_sqlite_classes = [ "SyncDO", "SyncIndexDO" ]
 [[env.staging.r2_buckets]]
 binding = "LOGSEQ_SYNC_ASSETS"
 bucket_name = "logseq-sync-assets-dev"
+
+[[env.staging.d1_databases]]
+binding = "DB"
+database_name = "logseq-sync-graph-meta-staging"
+database_id = "REPLACE_WITH_D1_DATABASE_ID"