Ver Fonte

enhance(sync): reduce <get-local-files-meta calls

rcmerci há 2 anos atrás
pai
commit
67b6fde088

+ 1 - 1
resources/package.json

@@ -37,7 +37,7 @@
     "https-proxy-agent": "5.0.0",
     "@sentry/electron": "2.5.1",
     "posthog-js": "1.10.2",
-    "@logseq/rsapi": "0.0.54",
+    "@logseq/rsapi": "0.0.56",
     "electron-deeplink": "1.0.10",
     "abort-controller": "3.0.0"
   },

+ 182 - 119
src/main/frontend/fs/sync.cljs

@@ -543,7 +543,10 @@
    (diffs->partitioned-filetxns n)))
 
 
-(deftype FileMetadata [size etag path encrypted-path last-modified remote? txid ^:mutable normalized-path]
+(deftype FileMetadata [size etag path encrypted-path last-modified remote?
+                       txid             ;remote-only
+                       path-in-arg      ;local-only
+                       ^:mutable normalized-path]
   Object
   (get-normalized-path [_]
     (assert (string? path) path)
@@ -576,12 +579,14 @@
       :last-modified last-modified
       :remote? remote?
       :txid txid
+      :path-in-arg path-in-arg
       not-found))
 
 
   IPrintWithWriter
   (-pr-writer [_ w _opts]
-    (write-all w (str {:size size :etag etag :path path :remote? remote? :txid txid :last-modified last-modified}))))
+    (write-all w (str {:size size :etag etag :path path :path-in-arg path-in-arg
+                       :remote? remote? :txid txid :last-modified last-modified}))))
 
 
 
@@ -702,6 +707,8 @@
   (rsapi-ready? [this graph-uuid] "return true when rsapi ready")
   (<key-gen [this] "generate public+private keys")
   (<set-env [this graph-uuid prod? private-key public-key] "set environment")
+  ;; TODO: current <get-local-files-meta return #{<meta1>, <meta2>, ...},
+  ;;       modify it to return {<path-in-arg1> <meta1>, <path-in-arg2> <meta2>}
   (<get-local-files-meta [this graph-uuid base-path filepaths] "get local files' metadata")
   (<get-local-all-files-meta [this graph-uuid base-path] "get all local files' metadata")
   (<rename-local-file [this graph-uuid base-path from to])
@@ -740,32 +747,39 @@
 (defprotocol IToken
   (<get-token [this]))
 
-
-(defn <case-different-local-file-exist?
+(defn <case-different-local-files-exist?
   "e.g. filepath=\"pages/Foo.md\"
   found-filepath=\"pages/foo.md\"
   it happens on macos (case-insensitive fs)
 
-  return canonicalized filepath if exists"
-  [graph-uuid irsapi base-path filepath]
+  return {query-path -> disk-path} map, and disk-path != query-path"
+  [graph-uuid irsapi base-path filepaths]
   (go
-    (let [r (<! (<get-local-files-meta irsapi graph-uuid base-path [filepath]))]
-      (when (some-> r first :path (not= filepath))
-        (-> r first :path)))))
-
-(defn <local-file-not-exist?
-  [graph-uuid irsapi base-path filepath]
+    (let [r (<! (<get-local-files-meta irsapi graph-uuid base-path filepaths))
+          query-path->disk-path-map (mapv (juxt :path-in-arg :path) r)]
+      (into {}
+            (filter (fn [[query-path disk-path]] (not= query-path disk-path)))
+            query-path->disk-path-map))))
+
+(defn <local-files-not-exist?
+  "return {path not-exist?-boolean}"
+  [graph-uuid irsapi base-path filepaths]
   (go
-    (let [r (<! (<get-local-files-meta irsapi graph-uuid base-path [filepath]))]
-
-      (or
-       ;; not found at all
-       (empty? r)
-       ;; or,
-       ;; e.g. filepath="pages/Foo.md"
-       ;; found-filepath="pages/foo.md"
-       ;; it happens on macos (case-insensitive fs)
-       (not= filepath (:path (first r)))))))
+    (let [r (<! (<get-local-files-meta irsapi graph-uuid base-path filepaths))
+          path->meta-map (into {} (map (juxt :path-in-arg identity)) r)]
+      (into {}
+            (map
+             (fn [query-path]
+               (let [meta (get path->meta-map query-path)]
+                 [query-path (or
+                              ;; not found at all
+                              (nil? meta)
+                              ;; or, e.g.
+                              ;; query-filepath="pages/Foo.md"
+                              ;; found-filepath="pages/foo.md"
+                              ;; it happens on macos (case-insensitive fs)
+                              (not= query-path (:path meta)))])))
+            filepaths))))
 
 (defn- <retry-rsapi [f]
   (go-loop [n 3]
@@ -794,7 +808,7 @@
         (recur others
                (conj result
                      (->FileMetadata (get metadata "size") (get metadata "md5") normalized-path
-                                     encryptedFname (get metadata "mtime") false nil nil)))))))
+                                     encryptedFname (get metadata "mtime") false nil nil nil)))))))
 
 (deftype RSAPI [^:mutable graph-uuid' ^:mutable private-key' ^:mutable public-key']
   IToken
@@ -820,10 +834,23 @@
           r
           (<! (<build-local-file-metadatas this graph-uuid r))))))
   (<get-local-files-meta [this graph-uuid base-path filepaths]
+    (assert (seq filepaths) filepaths)
     (go
       (let [r (<! (<retry-rsapi #(p->c (ipc/ipc "get-local-files-meta" graph-uuid base-path filepaths))))]
         (assert (not (instance? ExceptionInfo r)) "get-local-files-meta shouldn't return exception")
-        (<! (<build-local-file-metadatas this graph-uuid r)))))
+
+        (loop [[[path-in-arg metadata] & others] (js->clj r)
+               result #{}]
+          (if-not (and path-in-arg metadata)
+            result                      ; finish
+            (let [normalized-path (path-normalize (get metadata "fname"))
+                  encryptedFname  (if (not= path normalized-path)
+                                    (first (<! (<encrypt-fnames this graph-uuid [normalized-path])))
+                                    (get metadata "encryptedFname"))]
+              (recur others
+                     (conj result
+                           (->FileMetadata (get metadata "size") (get metadata "md5") normalized-path
+                                           encryptedFname (get metadata "mtime") false nil path-in-arg nil)))))))))
   (<rename-local-file [_ graph-uuid base-path from to]
     (<retry-rsapi #(p->c (ipc/ipc "rename-local-file" graph-uuid base-path
                                   (path-normalize from)
@@ -1226,6 +1253,7 @@
                                   (:last-modified %)
                                   true
                                   (:txid %)
+                                  nil
                                   nil)
                  (filter-files-with-unnormalized-path file-meta-list* encrypted-path->path-map))))))))))
 
@@ -1252,6 +1280,7 @@
                                             (:LastModified %)
                                             true
                                             (:Txid %)
+                                            nil
                                             nil)))
                      r))))))))
 
@@ -1469,11 +1498,11 @@
           (assert (<= local-txid remote-txid)
                   [@graphs-txid local-txid remote-txid])))))
 
-(defn- get-local-files-checksum
+(defn- <get-local-files-checksum
   [graph-uuid base-path relative-paths]
   (go
     (into {}
-          (map (juxt #(.-path ^FileMetadata %) #(.-etag ^FileMetadata %)))
+          (map (fn [^FileMetadata meta] [(:path-in-arg meta) (:etag meta)]))
           (<! (<get-local-files-meta rsapi graph-uuid base-path relative-paths)))))
 
 (declare sync-state--add-current-local->remote-files
@@ -1541,19 +1570,21 @@
                                          [% (db/get-file repo (config/get-file-path repo (relative-path %)))]))
                                      (remove nil?))]
 
-        (doseq [relative-p (map relative-path filetxns)]
-          (when-some [relative-p*
-                      (<! (<case-different-local-file-exist? graph-uuid rsapi base-path relative-p))]
-            (let [recent-remote->local-file-item {:remote->local-type :delete
-                                                  :checksum nil
-                                                  :path relative-p*}]
-              (println :debug "found case-different-same-local-file" relative-p relative-p*)
-              (swap! *sync-state sync-state--add-recent-remote->local-files
-                     [recent-remote->local-file-item])
-              (<! (<delete-local-files rsapi graph-uuid base-path [relative-p*]))
+        (let [case-different-query-path->disk-path
+              (<! (<case-different-local-files-exist? graph-uuid rsapi base-path (map relative-path filetxns)))]
+          (when (seq case-different-query-path->disk-path)
+            (let [disk-paths (vals case-different-query-path->disk-path)
+                  recent-remote->local-file-items
+                  (mapv (fn [disk-path]
+                          {:remote->local-type :delete
+                           :checksum nil
+                           :path disk-path})
+                        disk-paths)]
+              (println :debug "found case-different-same-local-file" case-different-query-path->disk-path)
+              (swap! *sync-state sync-state--add-recent-remote->local-files recent-remote->local-file-items)
+              (<! (<delete-local-files rsapi graph-uuid base-path disk-paths))
               (go (<! (timeout 5000))
-                (swap! *sync-state sync-state--remove-recent-remote->local-files
-                       [recent-remote->local-file-item])))))
+                  (swap! *sync-state sync-state--remove-recent-remote->local-files recent-remote->local-file-items)))))
 
         (let [update-local-files-ch (<update-local-files rsapi graph-uuid base-path (map relative-path filetxns))
               r (<! (<with-pause update-local-files-ch *paused))]
@@ -1570,7 +1601,7 @@
       (.-deleted? (first filetxns))
       (let [filetxn (first filetxns)]
         (assert (= 1 (count filetxns)))
-        (if (<! (<local-file-not-exist? graph-uuid rsapi base-path (relative-path filetxn)))
+        (if (second (vals (<! (<local-files-not-exist? graph-uuid rsapi base-path [(relative-path filetxn)]))))
           ;; not exist, ignore
           true
           (let [r (<! (<delete-local-files rsapi graph-uuid base-path [(relative-path filetxn)]))]
@@ -1697,17 +1728,23 @@
     (write-all w (str {:type type :base-path dir :path path :size (:size stat) :checksum checksum}))))
 
 
-(defn- <file-change-event=>recent-remote->local-file-item
-  [graph-uuid ^FileChangeEvent e]
+(defn- <file-change-events=>recent-remote->local-file-items
+  [graph-uuid es]
   (go
-    (let [tp (case (.-type e)
-               ("add" "change") :update
-               "unlink" :delete)
-          path (relative-path e)]
-      {:remote->local-type tp
-       :checksum (if (= tp :delete) nil
-                                    (val (first (<! (get-local-files-checksum graph-uuid (.-dir e) [path])))))
-       :path path})))
+    (let [base-path (.-dir (first es))
+          paths (mapv relative-path (filterv (fn [e] (not= "unlink" (.-type e))) es))
+          path->checksum-map (when (seq paths) (<! (<get-local-files-checksum graph-uuid base-path paths)))]
+      (into {}
+            (for [^FileChangeEvent e es]
+              (let [tp (case (.-type e)
+                         ("add" "change") :update
+                         "unlink" :delete)
+                    path (relative-path e)]
+                [{:remote->local-type tp
+                  :checksum (if (= tp :delete)
+                              nil
+                              (get path->checksum-map path))
+                  :path path} e]))))))
 
 (defn- distinct-file-change-events-xf
   "transducer.
@@ -2122,19 +2159,20 @@
   (update sync-state :current-local->remote-files into paths))
 
 (defn sync-state--add-queued-local->remote-files
-  [sync-state event]
+  [sync-state events]
   {:post [(s/valid? ::sync-state %)]}
   (update sync-state :queued-local->remote-files
-          (fn [o event]
-            (->> (concat o [event])
-                 (util/distinct-by-last-wins (fn [e] (.-path e))))) event))
+          (fn [o events]
+            (->> (concat o events)
+                 (util/distinct-by-last-wins (fn [e] (.-path e))))) events))
 
 (defn sync-state--remove-queued-local->remote-files
-  [sync-state event]
+  [sync-state events]
   {:post [(s/valid? ::sync-state %)]}
-  (update sync-state :queued-local->remote-files
-          (fn [o event]
-            (remove #{event} o)) event))
+  (let [events (set events)]
+    (update sync-state :queued-local->remote-files
+            (fn [o events]
+              (remove events o)) events)))
 
 (defn sync-state-reset-queued-local->remote-files
   [sync-state]
@@ -2331,6 +2369,7 @@
                                    filtered-files)
                          latest-txid)))))))))))
 
+;;; TODO:  <file-changed? -> <files-changed?, support batch-get
 (defn- <file-changed?
   "return true when file changed compared with remote"
   [graph-uuid file-path-without-base-path base-path]
@@ -2347,20 +2386,31 @@
   - for 'unlink' event
     - when related file exists on local dir, ignore this event
   - for 'add' | 'change' event
-    - when related file's content is same as remote file, ignore it"
-  [^FileChangeEvent e basepath graph-uuid]
+    - when related file's content is same as remote file, ignore it
+
+  return `FileChangeEvent` coll"
+  [es basepath graph-uuid]
   (go
-    (let [r-path (relative-path e)]
-      (case (.-type e)
-        "unlink"
-        ;; keep this e when it's not found
-        (<! (<local-file-not-exist? graph-uuid rsapi basepath r-path))
-
-        ("add" "change")
-        ;; 1. local file exists
-        ;; 2. compare with remote file, and changed
-        (and (not (<! (<local-file-not-exist? graph-uuid rsapi basepath r-path)))
-             (<! (<file-changed? graph-uuid r-path basepath)))))))
+    (when (seq es)
+      (let [path->e-map (into {} (map (juxt relative-path identity)) es)
+            {add-change-path->e false unlink-path->e true}
+            (group-by (fn [[_ e]] (= "unlink" (:type e))) path->e-map)
+            path->not-exist?-map
+            (<! (<local-files-not-exist? graph-uuid rsapi basepath (keys path->e-map)))
+            add-change-path->e* (filterv (fn [[path _]]
+                                           (false? (get path->not-exist?-map path)))
+                                         add-change-path->e)
+            add-change-path->remote-diff?-map
+            (loop [[[path e] & others] add-change-path->e*
+                   result {}]
+              (if-not (and path e)
+                result                  ;finish
+                (recur others (conj result [path (<! (<file-changed? graph-uuid path basepath))]))))
+            keep-unlink-es
+            (mapv second (filterv (fn [[path _]] (true? (get path->not-exist?-map path))) unlink-path->e))
+            keep-add-change-paths
+            (mapv first (filterv (fn [[_ v]] (true? v)) add-change-path->remote-diff?-map))]
+        (remove nil? (concat keep-unlink-es (mapv #(get path->e-map %) keep-add-change-paths)))))))
 
 (defn- <filter-checksum-not-consistent
   "filter out FileChangeEvents checksum changed,
@@ -2456,26 +2506,33 @@
          local-files-meta-map))))
 
 (defrecord ^:large-vars/cleanup-todo
-  Local->RemoteSyncer [user-uuid graph-uuid base-path repo *sync-state remoteapi
-                       ^:mutable rate *txid ^:mutable remote->local-syncer stop-chan *stopped *paused
+ Local->RemoteSyncer [user-uuid graph-uuid base-path repo *sync-state remoteapi
+                      ^:mutable rate *txid ^:mutable remote->local-syncer stop-chan *stopped *paused
                        ;; control chans
-                       private-immediately-local->remote-chan private-recent-edited-chan]
+                      private-immediately-local->remote-chan private-recent-edited-chan]
   Object
   (filter-file-change-events-fn [_]
-    (fn [^FileChangeEvent e]
-      (go (and (instance? FileChangeEvent e)
-               (if-let [mtime (:mtime (.-stat e))]
-                 ;; if mtime is not nil, it should be after (- now 1min)
-                 ;; ignore events too early
-                 (> (* 1000 mtime) (tc/to-long (t/minus (t/now) (t/minutes 1))))
-                 true)
-               (or (string/starts-with? (.-dir e) base-path)
-                   (string/starts-with? (str "file://" (.-dir e)) base-path)) ; valid path prefix
-               (not (ignored? e))     ;not ignored
-               ;; download files will also trigger file-change-events, ignore them
-               (not (contains? (:recent-remote->local-files @*sync-state)
-                               (<! (<file-change-event=>recent-remote->local-file-item
-                                    graph-uuid e))))))))
+    (fn [es]
+      (go
+        (let [now-1min (tc/to-long (t/minus (t/now) (t/minutes 1)))
+              es* (filterv (fn [e]
+                             (and (instance? FileChangeEvent e)
+                                  (if-let [mtime (:mtime (.-stat e))]
+                                 ;; if mtime is not nil, it should be after (- now 1min)
+                                 ;; ignore events too early
+                                    (> (* 1000 mtime) now-1min)
+                                    true)
+                                  (or (string/starts-with? (.-dir e) base-path)
+                                      (string/starts-with? (str "file://" (.-dir e)) base-path))
+                                  (not (ignored? e))))
+                           es)
+              ;; download files will also trigger file-change-events, ignore them
+              recent-remote=>local-file-item->e-map
+              (<! (<file-change-events=>recent-remote->local-file-items graph-uuid es*))
+              filtered-recent-remote=>local-file-item
+              (set/difference (set (keys recent-remote=>local-file-item->e-map))
+                              (:recent-remote->local-files @*sync-state))]
+          (mapv #(get recent-remote=>local-file-item->e-map %) filtered-recent-remote=>local-file-item)))))
 
   (set-remote->local-syncer! [_ s] (set! remote->local-syncer s))
 
@@ -2494,17 +2551,19 @@
     (let [<fast-filter-e-fn (.filter-file-change-events-fn this)]
       (util/<ratelimit
        from-chan rate
+       :filter-fn-duration-window 1000
        :filter-fn
-       (fn [e]
+       (fn [es]
          (go
-           (and (rsapi-ready? rsapi graph-uuid)
-                 (<! (<fast-filter-e-fn e))
-                (do
-                  (swap! *sync-state sync-state--add-queued-local->remote-files e)
-                  (let [v (<! (<filter-local-changes-pred e base-path graph-uuid))]
-                    (when-not v
-                      (swap! *sync-state sync-state--remove-queued-local->remote-files e))
-                    v)))))
+           (if-not (rsapi-ready? rsapi graph-uuid)
+             []
+             (let [es* (<! (<fast-filter-e-fn es))]
+               (swap! *sync-state sync-state--add-queued-local->remote-files es*)
+
+               (let [es** (<! (<filter-local-changes-pred es* base-path graph-uuid))
+                     es-to-remove (set/difference (set es*) (set es**))]
+                 (swap! *sync-state sync-state--remove-queued-local->remote-files es-to-remove)
+                 es**)))))
        :flush-fn #(swap! *sync-state sync-state-reset-queued-local->remote-files)
        :stop-ch stop-chan
        :distinct-coll? true
@@ -2617,21 +2676,21 @@
                                             <!
                                             (sort-by (sort-file-metadata-fn :recent-days-range recent-10-days-range) >))
                 change-events
-                                       (sequence
-                                        (comp
+                (sequence
+                 (comp
                                          ;; convert to FileChangeEvent
-                                         (map #(->FileChangeEvent "change" base-path (.get-normalized-path ^FileMetadata %)
-                                                                  {:size (:size %)} (:etag %)))
-                                         (remove ignored?))
-                                        diff-local-files)
+                  (map #(->FileChangeEvent "change" base-path (.get-normalized-path ^FileMetadata %)
+                                           {:size (:size %)} (:etag %)))
+                  (remove ignored?))
+                 diff-local-files)
                 distinct-change-events (-> (distinct-file-change-events change-events)
                                            filter-upload-files-with-reserved-chars)
                 _                      (swap! *sync-state #(sync-state-reset-full-local->remote-files % distinct-change-events))
                 change-events-partitions
-                                       (sequence
+                (sequence
                                         ;; partition FileChangeEvents
-                                        (partition-file-change-events upload-batch-size)
-                                        distinct-change-events)]
+                 (partition-file-change-events upload-batch-size)
+                 distinct-change-events)]
             (println "[full-sync(local->remote)]"
                      (count (flatten change-events-partitions)) "files need to sync and"
                      (count delete-local-files) "local files need to delete")
@@ -2641,20 +2700,24 @@
                                       :full-sync? true
                                       :epoch      (tc/to-epoch (t/now))}})
             ;; 1. delete local files
-            (loop [[f & fs] delete-local-files]
-              (when f
-                (let [relative-p (relative-path f)]
-                  (when-not (<! (<local-file-not-exist? graph-uuid rsapi base-path relative-p))
-                    (let [fake-recent-remote->local-file-item {:remote->local-type :delete
-                                                               :checksum           nil
-                                                               :path               relative-p}]
-                      (swap! *sync-state sync-state--add-recent-remote->local-files
-                             [fake-recent-remote->local-file-item])
-                      (<! (<delete-local-files rsapi graph-uuid base-path [(relative-path f)]))
-                      (go (<! (timeout 5000))
-                        (swap! *sync-state sync-state--remove-recent-remote->local-files
-                               [fake-recent-remote->local-file-item])))))
-                (recur fs)))
+            (when (seq delete-local-files)
+              (let [path->not-exist?-map
+                    (<! (<local-files-not-exist? graph-uuid rsapi base-path (mapv relative-path delete-local-files)))
+                    local-filepaths-to-delete
+                    (->> path->not-exist?-map
+                         (filterv (fn [[_ not-exist?]] (false? not-exist?)))
+                         (mapv first))
+                    fake-recent-remote->local-file-items
+                    (mapv (fn [path]
+                            {:remote->local-type :delete
+                             :checksum           nil
+                             :path               path})
+                          local-filepaths-to-delete)]
+                (when (seq local-filepaths-to-delete)
+                  (swap! *sync-state sync-state--add-recent-remote->local-files fake-recent-remote->local-file-items)
+                  (<! (<delete-local-files rsapi graph-uuid base-path local-filepaths-to-delete))
+                  (go (<! (timeout 5000))
+                      (swap! *sync-state sync-state--remove-recent-remote->local-files fake-recent-remote->local-file-items)))))
 
             ;; 2. upload local files
             (loop [es-partitions change-events-partitions]

+ 5 - 4
src/main/frontend/modules/outliner/file.cljs

@@ -96,10 +96,11 @@
   []
   (util/<ratelimit (state/get-file-write-chan) batch-write-interval
                  :filter-fn
-                 (fn [[repo _ time]]
-                   (swap! *writes-finished? assoc repo {:time time
-                                                        :value false})
-                   true)
+                 (fn [elems]
+                   (doseq [[repo _ time] elems]
+                     (swap! *writes-finished? assoc repo
+                            {:time time :value false}))
+                   elems)
                  :flush-fn
                  (fn [col]
                    (let [start-time (tc/to-long (t/now))

+ 33 - 19
src/main/frontend/util.cljc

@@ -1105,46 +1105,60 @@
      "return a channel CH,
   ratelimit flush items in in-ch every max-duration(ms),
   opts:
-  - :filter-fn filter item before putting items into returned CH, (filter-fn item)
-               will poll it when its return value is channel,
+  - :filter-fn filter items before putting items into returned CH,
+               will poll it when (filter-fn items) return async channel,
+               (filter-fn items) return items
   - :flush-fn exec flush-fn when time to flush, (flush-fn item-coll)
   - :stop-ch stop go-loop when stop-ch closed
   - :distinct-coll? distinct coll when put into CH
   - :chan-buffer buffer of return CH, default use (async/chan 1000)
   - :flush-now-ch flush the content in the queue immediately
-  - :refresh-timeout-ch refresh (timeout max-duration)"
-     [in-ch max-duration & {:keys [filter-fn flush-fn stop-ch distinct-coll? chan-buffer flush-now-ch refresh-timeout-ch]}]
+  - :refresh-timeout-ch reset (timeout max-duration)
+  - :filter-fn-duration-window wait the duration before next (filter-fn items), default 500ms"
+     [in-ch max-duration & {:keys [filter-fn flush-fn stop-ch distinct-coll?
+                                   chan-buffer flush-now-ch refresh-timeout-ch
+                                   filter-fn-duration-window]
+                            :or {filter-fn-duration-window 500}}]
      (let [ch (if chan-buffer (async/chan chan-buffer) (async/chan 1000))
            stop-ch* (or stop-ch (async/chan))
            flush-now-ch* (or flush-now-ch (async/chan))
            refresh-timeout-ch* (or refresh-timeout-ch (async/chan))]
-       (async/go-loop [timeout-ch (async/timeout max-duration) coll []]
-         (let [{:keys [refresh-timeout timeout e stop flush-now]}
+       (async/go-loop [timeout-ch (async/timeout max-duration)
+                       filter-timeout-ch (async/timeout filter-fn-duration-window)
+                       coll-for-filter []
+                       coll-for-flush []]
+         (let [{:keys [refresh-timeout timeout filter-timeout e stop flush-now]}
                (async/alt! refresh-timeout-ch* {:refresh-timeout true}
                            timeout-ch {:timeout true}
+                           filter-timeout-ch {:filter-timeout true}
                            in-ch ([e] {:e e})
                            stop-ch* {:stop true}
                            flush-now-ch* {:flush-now true})]
            (cond
              refresh-timeout
-             (recur (async/timeout max-duration) coll)
+             (recur (async/timeout max-duration) filter-timeout-ch coll-for-filter coll-for-flush)
 
              (or flush-now timeout)
-             (do (async/onto-chan! ch coll false)
-                 (flush-fn coll)
+             (do (async/onto-chan! ch coll-for-flush false)
+                 (flush-fn coll-for-flush)
                  (drain-chan flush-now-ch*)
-                 (recur (async/timeout max-duration) []))
+                 (recur (async/timeout max-duration) filter-timeout-ch coll-for-filter []))
 
              (some? e)
-             (let [filter-v (filter-fn e)
-                   filter-v* (if (instance? ManyToManyChannel filter-v)
-                               (async/<! filter-v)
-                               filter-v)]
-               (if filter-v*
-                 (recur timeout-ch (cond-> (conj coll e)
-                                     distinct-coll? distinct
-                                     true vec))
-                 (recur timeout-ch coll)))
+             (recur timeout-ch filter-timeout-ch (conj coll-for-filter e) coll-for-flush)
+
+             (and filter-timeout (empty? coll-for-filter))
+             (recur timeout-ch (async/timeout filter-fn-duration-window) coll-for-filter coll-for-flush)
+
+             (and filter-timeout (seq coll-for-filter))
+             (let [filter-vs (filter-fn coll-for-filter)
+                   filter-vs* (if (instance? ManyToManyChannel filter-vs)
+                                (async/<! filter-vs)
+                                filter-vs)]
+               (recur timeout-ch (async/timeout filter-fn-duration-window) []
+                      (cond-> (apply conj coll-for-flush filter-vs*)
+                        distinct-coll? distinct
+                        true vec)))
 
              (or stop
                  ;; got nil from in-ch, means in-ch is closed