user3139545

Reputation: 7374

Stateful transducers in core.async

I'm trying to understand how to make stateful transducers in core.async. For example, how would I make a transducer that counts the number of elements that have come through a channel? I want each input to be transformed into a count that depends on the number of objects that have come before it.

From what I have read, the way to go is to use volatile! to hold the state inside the transducer, but I'm still not sure how to put everything together.

Upvotes: 2

Views: 502

Answers (1)

Leon Grapenthin

Reputation: 9266

You need a stateful transducer: a function that returns a reducing function closed over a volatile! that tracks the count.

(defn count-xf [rf]
  (let [ctr (volatile! 0)]
    (fn
      ([] (rf))
      ([result] (rf result))
      ([result _]                         ; we ignore the input as
       (rf result (vswap! ctr inc))))))   ; we just pass on the count

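This can be shortened with the core function completing. Pass rf as its second argument so the completion step is still forwarded to rf; the one-argument form completes with identity, which would skip the flush of any downstream stateful transducer (partition-all, for example):

(defn count-xf [rf]
  (let [ctr (volatile! 0)]
    (completing
     (fn [result _]
       (rf result (vswap! ctr inc)))
     rf)))                               ; completion arity delegates to rf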

For example, use it like this:

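(require '[clojure.core.async :as async :refer [chan onto-chan <!!]])

(let [ch (chan 1 count-xf)]
  ;; onto-chan closes ch once all items are written
  ;; (newer core.async releases prefer onto-chan!)
  (onto-chan ch (repeat 10 true))
  (<!! (async/into [] ch)))

;-> [1 2 3 4 5 6 7 8 9 10]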
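
Because the counter is created each time the transducer is applied to a reducing function, the same count-xf also works outside core.async, and every run starts from zero again. A minimal sketch using only clojure.core (the definition is repeated here so the snippet stands alone):

```clojure
(defn count-xf [rf]
  (let [ctr (volatile! 0)]            ; fresh counter per application of the transducer
    (fn
      ([] (rf))
      ([result] (rf result))
      ([result _] (rf result (vswap! ctr inc))))))

;; State is not shared between runs: each call to into starts at 1.
(into [] count-xf [:a :b :c])
;; => [1 2 3]
(into [] count-xf [:x :y])
;; => [1 2]

;; It composes like any other transducer; here only the odd numbers are counted.
(into [] (comp (filter odd?) count-xf) (range 10))
;; => [1 2 3 4 5]
```

The volatile! is safe here because a transducer's step function is expected to be run by one thread at a time, which is also how a channel applies it.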

Alternatively, you could just use the map-indexed transducer, but that would probably teach you less about how transducers work. It also adds a little per-step overhead for this particular use case, since it calls an extra function on every element.

(def count-xf (map-indexed (fn [i _] (inc i))))

Note that the transducer returned by map-indexed is implemented in much the same way as the version above.
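
To make the comparison concrete, here is a simplified sketch of how a map-indexed-style transducer can be written. The name my-map-indexed is hypothetical, and the code is modeled on the idea rather than copied from clojure.core, so treat the details as an assumption:

```clojure
(defn my-map-indexed
  "Sketch of an indexed-map transducer: calls (f index input) for each input."
  [f]
  (fn [rf]
    (let [i (volatile! -1)]            ; starts at -1 so the first index is 0
      (fn
        ([] (rf))
        ([result] (rf result))
        ([result input]
         (rf result (f (vswap! i inc) input)))))))

(into [] (my-map-indexed (fn [i _] (inc i))) (repeat 3 true))
;; => [1 2 3]
```

The only differences from count-xf are the starting value of the counter and the extra call to f, which is where the additional per-step cost comes from.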

Further reference: http://clojure.org/reference/transducers

Upvotes: 6
