estolua
estolua

Reputation: 676

How to model Rx's `withLatestFrom` with core.async channels?

For example given a channel with operations and another channel with data, how to write a go block that will apply the operation on whatever was the last value on the data channel?

(go-loop []
  (let [op (<! op-ch)
        data (<! data-ch)]
    (put! result-ch (op data))))

Obviously that doesn't work because it would require both channels to have the same frequency.

(see http://rxmarbles.com/#withLatestFrom)

Upvotes: 3

Views: 120

Answers (2)

juan.facorro
juan.facorro

Reputation: 9930

Using alts! you could accomplish what you want.

The with-latest-from shown below implements the same behavior found in the withLatestFrom from RxJS (I think :P).

(require '[clojure.core.async :as async])

(def op-ch (async/chan))
(def data-ch (async/chan))

(defn with-latest-from [chs f]
  (let [result-ch (async/chan)
        latest    (vec (repeat (count chs) nil))
        index     (into {} (map vector chs (range)))]
    (async/go-loop [latest latest]
      (let [[value ch] (async/alts! chs)
            latest     (assoc latest (index ch) value)]
        (when-not (some nil? latest)
          (async/put! result-ch (apply f latest)))
        (when value (recur latest))))
    result-ch))

(def result-ch (with-latest-from [op-ch data-ch] str))

(async/go-loop []
  (prn (async/<! result-ch))
  (recur))

(async/put! op-ch :+)
;= true
(async/put! data-ch 1)
;= true
; ":+1"
(async/put! data-ch 2)
;= true
; ":+2"
(async/put! op-ch :-)
;= true
; ":-2"

Upvotes: 1

claj
claj

Reputation: 5412

There's an :priority true option for the alts!.

An expression which always returns the latest seen value in some channel would look something like this:

(def in-chan (chan))

(def mem (chan))

(go (let [[ch value] (alts! [in-chan mem] :priority true)]
    (take! mem)      ;; clear mem (take! is non-blocking)
    (>! mem value)   ;; put the new (or old) value in the mem
    value            ;; return a chan with the value in

It's untested, it's probably not efficient (a volatile variable is probably better). The go-block returns a channel with only the value, but the idea could be expanded to some "memoized" channel.

Upvotes: 1

Related Questions