Reputation: 28608
When using clojure.core.async, is there a way to wait for the first item to be put on a channel, then wait a short amount of time, and then take all the items currently on the channel (including any that arrived during the wait) without blocking?
I.e., is there a way to implement get-available-items:
(defn delayer [ch ch2]
  (go (loop []
        (when-let [v (<! ch)]
          (<! (timeout 500))
          (let [vs (get-available-items ch)
                items (cons v vs)]
            (>! ch2 items))
          (recur)))))
Basically, something like BlockingQueue.drainTo in Java.
Upvotes: 5
Views: 965
Reputation: 8593
You can just alt on the same timeout channel until you run out of "waiting time", collecting any incoming values meanwhile.
This seems to work:
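(require '[clojure.core.async :as a :refer [<! >! go chan]])

(defn delayer [in out]
  (a/go-loop []
    (when-let [v (<! in)]
      ;; Start a batch with the first value and a single 500 ms timeout.
      (loop [batch [v] timeout-ch (a/timeout 500)]
        (let [[v ch] (a/alts! [in timeout-ch])]
          ;; Keep collecting while values arrive on `in`; a nil value means
          ;; `in` was closed, so flush the batch instead of spinning on nils.
          (if (and (= in ch) (some? v))
            (recur (conj batch v) timeout-ch)
            (>! out batch))))
      (recur))))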
Notice that we create the timeout channel just once and we reuse it. A simple test to prove that it works:
(def out (chan))
(def in (chan))
(delayer in out)

; print batches as soon as available
(a/go-loop []
  (println (str (java.util.Date.) " " (<! out)))
  (recur))

; put a value every 100 millis
(a/go-loop [i 100]
  (when-not (zero? i)
    (<! (a/timeout 100))
    (>! in i)
    (recur (dec i))))
Upvotes: 2
Reputation: 91554
There are plans to offer this feature on channels directly, though for now you can check whether a value is immediately available on a channel with:
(alts!! [my-chan] :default :nothing-immediately-in-chan)
By iterating that, you can drain a channel without blocking.
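As a rough sketch of that iteration, the get-available-items helper from the question could look something like the following. The name comes from the question and is not part of core.async, and the ::none placeholder is just an arbitrary default value chosen for this example. The loop stops either when nothing is immediately available or when the channel turns out to be closed.

```clojure
(require '[clojure.core.async :as a])

(defn get-available-items
  "Hypothetical helper (named after the question, not a core.async function).
  Returns a vector of every value that can be taken from ch right now."
  [ch]
  (loop [items []]
    ;; With :default, alts!! returns [default-value :default] immediately
    ;; when no value is ready, so this call never blocks.
    (let [[v port] (a/alts!! [ch] :default ::none)]
      (if (or (= port :default) ; nothing ready right now
              (nil? v))         ; ch is closed
        items
        (recur (conj items v))))))
```

This only picks up values that are already buffered or being offered by a pending put at the moment of each call. Later core.async releases also provide poll!, which takes a value if one is immediately available and returns nil otherwise; it may be a simpler building block for the same loop if your version has it.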
PS: extra thanks to tbaldridge and julianlevis on #clojure for helping with this one
Upvotes: 6