|
|
@@ -1125,11 +1125,11 @@
|
|
|
will poll it when its return value is channel,
|
|
|
- :flush-fn exec flush-fn when time to flush, (flush-fn item-coll)
|
|
|
- :stop-ch stop go-loop when stop-ch closed
|
|
|
- - :distinct-coll? distinct coll when put into CH
|
|
|
+ - :distinct-key-fn distinct coll when put into CH
|
|
|
- :chan-buffer buffer of return CH, default use (async/chan 1000)
|
|
|
- :flush-now-ch flush the content in the queue immediately
|
|
|
- :refresh-timeout-ch refresh (timeout max-duration)"
|
|
|
- [in-ch max-duration & {:keys [filter-fn flush-fn stop-ch distinct-coll? chan-buffer flush-now-ch refresh-timeout-ch]}]
|
|
|
+ [in-ch max-duration & {:keys [filter-fn flush-fn stop-ch distinct-key-fn chan-buffer flush-now-ch refresh-timeout-ch]}]
|
|
|
(let [ch (if chan-buffer (async/chan chan-buffer) (async/chan 1000))
|
|
|
stop-ch* (or stop-ch (async/chan))
|
|
|
flush-now-ch* (or flush-now-ch (async/chan))
|
|
|
@@ -1157,8 +1157,8 @@
|
|
|
(async/<! filter-v)
|
|
|
filter-v)]
|
|
|
(if filter-v*
|
|
|
- (recur timeout-ch (cond-> (conj coll e)
|
|
|
- distinct-coll? distinct
|
|
|
+ (recur timeout-ch (cond->> (conj coll e)
|
|
|
+ distinct-key-fn (distinct-by distinct-key-fn)
|
|
|
true vec))
|
|
|
(recur timeout-ch coll)))
|
|
|
|