|
|
@@ -97,6 +97,24 @@
|
|
|
created-at
|
|
|
(outliner-op->sql outliner-op)))
|
|
|
|
|
|
+(defn- with-sql-transaction!
|
|
|
+ [sql transaction-sync-f f]
|
|
|
+ (if (fn? transaction-sync-f)
|
|
|
+ (transaction-sync-f f)
|
|
|
+ (do
|
|
|
+ ;; Node sqlite wrapper supports SQL transaction statements directly.
|
|
|
+ ;; Durable Objects sqlite must use state.storage.transactionSync().
|
|
|
+ (common/sql-exec sql "BEGIN IMMEDIATE")
|
|
|
+ (try
|
|
|
+ (let [result (f)]
|
|
|
+ (common/sql-exec sql "COMMIT")
|
|
|
+ result)
|
|
|
+ (catch :default error
|
|
|
+ (try
|
|
|
+ (common/sql-exec sql "ROLLBACK")
|
|
|
+ (catch :default _))
|
|
|
+ (throw error))))))
|
|
|
+
|
|
|
(defn fetch-tx-since [sql since-t]
|
|
|
(let [rows (common/get-sql-rows
|
|
|
(common/sql-exec sql
|
|
|
@@ -144,29 +162,36 @@
|
|
|
(restore-data-from-addr sql addr))))
|
|
|
|
|
|
(defn- append-tx-for-tx-report
|
|
|
- [sql {:keys [db-after db-before tx-data tx-meta] :as tx-report}]
|
|
|
- (let [new-t (next-t! sql)
|
|
|
- created-at (common/now-ms)
|
|
|
- prev-checksum (get-checksum sql)
|
|
|
- checksum (sync-checksum/update-checksum prev-checksum tx-report)
|
|
|
+ [sql transaction-sync-f {:keys [db-after db-before tx-data tx-meta] :as tx-report}]
|
|
|
+ (let [created-at (common/now-ms)
|
|
|
normalized-data (->> tx-data
|
|
|
(db-normalize/normalize-tx-data db-after db-before))
|
|
|
;; _ (prn :debug :tx-data tx-data)
|
|
|
;; _ (prn :debug :normalized-data normalized-data)
|
|
|
tx-str (common/write-transit normalized-data)]
|
|
|
- (set-checksum! sql checksum)
|
|
|
- (append-tx! sql new-t tx-str created-at (:outliner-op tx-meta))))
|
|
|
+ (with-sql-transaction!
|
|
|
+ sql
|
|
|
+ transaction-sync-f
|
|
|
+ (fn []
|
|
|
+ (let [new-t (next-t! sql)
|
|
|
+ prev-checksum (get-checksum sql)
|
|
|
+ checksum (sync-checksum/update-checksum prev-checksum tx-report)]
|
|
|
+ (set-checksum! sql checksum)
|
|
|
+ (append-tx! sql new-t tx-str created-at (:outliner-op tx-meta)))))))
|
|
|
|
|
|
(defn- listen-db-updates!
|
|
|
- [sql conn]
|
|
|
+ [sql conn transaction-sync-f]
|
|
|
(d/listen! conn ::listen-db-updates
|
|
|
(fn [tx-report]
|
|
|
- (append-tx-for-tx-report sql tx-report))))
|
|
|
-
|
|
|
-(defn open-conn [sql]
|
|
|
- (init-schema! sql)
|
|
|
- (let [storage (new-sqlite-storage sql)
|
|
|
- schema db-schema/schema
|
|
|
- conn (common-sqlite/get-storage-conn storage schema)]
|
|
|
- (listen-db-updates! sql conn)
|
|
|
- conn))
|
|
|
+ (append-tx-for-tx-report sql transaction-sync-f tx-report))))
|
|
|
+
|
|
|
+(defn open-conn
|
|
|
+ ([sql]
|
|
|
+ (open-conn sql nil))
|
|
|
+ ([sql {:keys [transaction-sync-f]}]
|
|
|
+ (init-schema! sql)
|
|
|
+ (let [storage (new-sqlite-storage sql)
|
|
|
+ schema db-schema/schema
|
|
|
+ conn (common-sqlite/get-storage-conn storage schema)]
|
|
|
+ (listen-db-updates! sql conn transaction-sync-f)
|
|
|
+ conn)))
|