|
|
@@ -1086,6 +1086,47 @@
|
|
|
(do (swap! buf conj v)
|
|
|
(recur buf t))))))
|
|
|
|
|
|
+(defn ratelimit
|
|
|
+ "return a channel CH,
|
|
|
+ ratelimit flush items in in-ch every max-duration(ms),
|
|
|
+ opts:
|
|
|
+ - :filter-fn filter item before putting items into returned CH, (filter-fn item)
|
|
|
+ - :flush-fn exec flush-fn when time to flush, (flush-fn item-coll)
|
|
|
+ - :stop-ch stop go-loop when stop-ch closed
|
|
|
+ - :distinct-coll? distinct coll when put into CH
|
|
|
+ - :chan-buffer buffer of return CH, default use (async/chan 1000)"
|
|
|
+ [in-ch max-duration & {:keys [filter-fn flush-fn stop-ch distinct-coll? chan-buffer]}]
|
|
|
+ (let [ch (if chan-buffer (async/chan chan-buffer) (async/chan 1000))
|
|
|
+ stop-ch* (or stop-ch (async/chan))]
|
|
|
+ (async/go-loop [timeout-ch (async/timeout max-duration) coll []]
|
|
|
+ (let [{:keys [timeout e stop]}
|
|
|
+ (async/alt! timeout-ch {:timeout true}
|
|
|
+ in-ch ([e] {:e e})
|
|
|
+ stop-ch* {:stop true})]
|
|
|
+ (cond
|
|
|
+ timeout
|
|
|
+ (do (async/onto-chan! ch coll false)
|
|
|
+ (flush-fn coll)
|
|
|
+ (recur (async/timeout max-duration) []))
|
|
|
+
|
|
|
+ (some? e)
|
|
|
+ (if (filter-fn e)
|
|
|
+ (recur timeout-ch (cond-> (conj coll e) distinct-coll? distinct))
|
|
|
+ (recur timeout-ch coll))
|
|
|
+
|
|
|
+ (or stop
|
|
|
+ ;; got nil from in-ch, means in-ch is closed
|
|
|
+ ;; so we stop the whole go-loop
|
|
|
+ (nil? e))
|
|
|
+ (async/close! ch))))
|
|
|
+ ch))
|
|
|
+
|
|
|
+(defn drain-chan
|
|
|
+ "drop all stuffs in CH, and return all of them"
|
|
|
+ [ch]
|
|
|
+ (->> (repeatedly #(async/poll! ch))
|
|
|
+ (take-while identity)))
|
|
|
+
|
|
|
#?(:cljs
|
|
|
(defn trace!
|
|
|
[]
|