dsg
dsg

Reputation: 13004

Implementing Clojure conditional/branching transducer

I'm trying to make a conditional transducer in Clojure as follows:

(defn if-xf
  "Takes a predicate and two transducers.
   Returns a new transducer that routes the input to one of the transducers
   depending on the result of the predicate."
  [pred a b]
  (fn [rf]
    (let [arf (a rf)
          brf (b rf)]
      (fn
        ([] (rf))
        ([result]
           (rf result))
        ([result input]
           (if (pred input)
             (arf result input)
             (brf result input)))))))

It is pretty useful in that it lets you do stuff like this:

;; multiply odd numbers by 100, square the evens.  
(= [0 100 4 300 16 500 36 700 64 900]
    (sequence
          (if-xf odd? (map #(* % 100)) (map (fn [x] (* x x))))
          (range 10)))

However, this conditional transducer does not work very well with transducers that perform cleanup in their 1-arity branch:

;; negs are multiplied by 100, non-negs are partitioned by 2
;; BUT! where did 6 go?
;; expected: [-600 -500 -400 -300 -200 -100 [0 1] [2 3] [4 5] [6]]
;;
(= [-600 -500 -400 -300 -200 -100 [0 1] [2 3] [4 5]]
 (sequence
  (if-xf neg? (map #(* % 100)) (partition-all 2))
  (range -6 7)))

Is it possible to tweak the definition of if-xf to handle the case of transducers with cleanup?

I'm trying this, but with weird behavior:

(defn if-xf
  "Takes a predicate and two transducers.
   Returns a new transducer that routes the input to one of the transducers
   depending on the result of the predicate."
  [pred a b]
  (fn [rf]
    (let [arf (a rf)
          brf (b rf)]
      (fn
        ([] (rf))
        ([result]
           (arf result) ;; new!
           (brf result) ;; new!
           (rf result))
        ([result input]
           (if (pred input)
             (arf result input)
             (brf result input)))))))

Specifically, the flushing happens at the end:

;; the [0] at the end should appear just before the 100.
(= [[-6 -5] [-4 -3] [-2 -1] 100 200 300 400 500 600 [0]]
      (sequence
       (if-xf pos? (map #(* % 100)) (partition-all 2))
       (range -6 7)))

Is there a way to make this branching/conditional transducer without storing the entire input sequence in local state within this transducer (i.e. doing all the processing in the 1-arity branch upon cleanup)?

Upvotes: 8

Views: 388

Answers (2)

James Vanderhyde
James Vanderhyde

Reputation: 374

I think your question is ill-defined. What exactly do you want to happen when the transducers have state? For example, what do you expect this do:

(sequence
  (if-xf even? (partition-all 3) (partition-all 2))
  (range 14))

Furthermore, sometimes reducing functions have work to do at the beginning and the end and can't be restarted arbitrarily. For example, here is a reducer that computes the mean:

(defn mean
  ([] {:count 0, :sum 0})
  ([result] (double (/ (:sum result) (:count result))))
  ([result x]
   (update-in
     (update-in result [:count] inc)
     [:sum] (partial + x))))
(transduce identity mean [10 20 40 40]) ;27.5

Now let's take the average, where anything below 20 counts for 20, but everything else is decreased by 1:

(transduce
  (if-xf
    (fn [x] (< x 20))
    (map (constantly 20))
    (map dec))
  mean [10 20 40 40]) ;29.25

My answer is the following: I think your original solution is best. It works well using map, which is how you stated the usefulness of the conditional transducer in the first place.

Upvotes: 1

ClojureMostly
ClojureMostly

Reputation: 4713

The idea is to complete every time the transducer switches over. IMO this is the only way to do it without buffering:

(defn if-xf
  "Takes a predicate and two transducers.
   Returns a new transducer that routes the input to one of the transducers
   depending on the result of the predicate."
  [pred a b]
  (fn [rf]
    (let [arf (volatile! (a rf))
          brf (volatile! (b rf))
          a? (volatile! nil)]
      (fn
        ([] (rf))
        ([result]
         (let [crf (if @a? @arf @brf)]
           (-> result crf rf)))
        ([result input]
         (let [p? (pred input)
               [xrf crf] (if p? [@arf @brf] [@brf @arf])
               switched? (some-> @a? (not= p?))]
           (if switched?
             (-> result crf (xrf input))
             (xrf result input))
           (vreset! a? p?)))))))
(sequence (if-xf pos? (map #(* % 100)) (partition-all 2)) [0 1 0 1 0 0 0 1])
; => ([0] 100 [0] 100 [0 0] [0] 100)

Upvotes: 1

Related Questions