Reputation: 7374
I'm trying to understand how to make stateful transducers in core.async. For example, how would I make a transducer that counts the number of elements that have come through a channel? That is, I want each input to be transformed into a count that depends on the number of objects that came before it.
From what I have read, the way to go is to use volatile! to hold the state inside the transducer, but I'm still not sure how to put everything together.
Upvotes: 2
Views: 502
Reputation: 9266
You need a stateful transducer that returns a reducing function closed over a volatile! tracking the count:
(defn count-xf [rf]
  (let [ctr (volatile! 0)]
    (fn
      ([] (rf))
      ([result] (rf result))
      ([result _]                       ; we ignore the input and
       (rf result (vswap! ctr inc)))))) ; just pass on the count
This can be simplified using the core function completing:
(defn count-xf [rf]
  (let [ctr (volatile! 0)]
    (completing
      (fn [result _]
        (rf result (vswap! ctr inc)))
      rf))) ; rf as the completion fn, so downstream steps still get to finish
For example, use it like this:
(require '[clojure.core.async :as async :refer [chan onto-chan <!!]])

(let [ch (chan 1 count-xf)]
  (onto-chan ch (repeat 10 true))
  (<!! (async/into [] ch)))
;=> [1 2 3 4 5 6 7 8 9 10]
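Since a transducer does not depend on the channel, it may help to try the same idea on plain collections first. The following is only a sketch: it repeats the three-arity count-xf from above so that it stands alone, and the names first-run, second-run and composed are made up for illustration. It shows that every transduction gets its own counter, because the volatile! is created each time count-xf is applied to a reducing function.

```clojure
;; Same stateful transducer as above, repeated so this snippet is self-contained.
(defn count-xf [rf]
  (let [ctr (volatile! 0)]            ; fresh counter for each transduction
    (fn
      ([] (rf))
      ([result] (rf result))
      ([result _input]
       (rf result (vswap! ctr inc))))))

;; Hypothetical names, for illustration only.
(def first-run  (into [] count-xf [:a :b :c]))   ;=> [1 2 3]
(def second-run (into [] count-xf [:x :y]))      ;=> [1 2], state is not shared

;; It composes like any other transducer: only the odd numbers are counted.
(def composed (into [] (comp (filter odd?) count-xf) (range 10)))
;=> [1 2 3 4 5]
```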
Alternatively, you could just use the map-indexed transducer, but that would likely help you less in understanding how transducers work. It also adds a little per-step overhead for this particular use case.
(def count-xf (map-indexed (fn [i _] (inc i))))
Observe that its implementation diverges little from the implementation above.
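A quick way to convince yourself that the two approaches agree is to run both over the same input. This is a minimal sketch; count-xf is repeated here so the snippet is self-contained, and count-xf-indexed and inputs are illustrative names rather than anything from core.async or clojure.core.

```clojure
;; Hand-written stateful transducer, as defined earlier in this answer.
(defn count-xf [rf]
  (let [ctr (volatile! 0)]
    (fn
      ([] (rf))
      ([result] (rf result))
      ([result _input]
       (rf result (vswap! ctr inc))))))

;; map-indexed variant; the index is 0-based, hence the inc.
(def count-xf-indexed (map-indexed (fn [i _] (inc i))))

(def inputs (repeat 5 true))

(= (into [] count-xf inputs)
   (into [] count-xf-indexed inputs))
;=> true
```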
Further reference: http://clojure.org/reference/transducers
Upvotes: 6