zcaudate

Reputation: 14258

how to build a chunked lazy-seq that blocks?

I'd like to use chunked cons or some other way to create a lazy-seq that blocks. Given a source:

(defn -source- [] (repeatedly (fn [] (future (Thread/sleep 100) [1 2]))))

(take 2 (-source-))
;; => (<future> <future>)

I'd like to have a function called injest where:

(take 3 (injest (-source-)))
=> [;; sleep 100
    1 2 
    ;; sleep 100
    1]

(take 6 (injest (-source-)))
=> [;; sleep 100
    1 2 
    ;; sleep 100
    1 2 
    ;; sleep 100
    1 2]

;; ... etc ...

how would I go about writing this function?

Upvotes: 2

Views: 173

Answers (3)

amalloy

Reputation: 91857

This source will naturally block as you consume it, so you don't have to do anything terribly fancy. It's almost enough to simply (mapcat deref):

(doseq [x (take 16 (mapcat deref (-source- )))]
  (println {:value x :time (System/currentTimeMillis)}))
{:value 1, :time 1597725323091}
{:value 2, :time 1597725323092}
{:value 1, :time 1597725323092}
{:value 2, :time 1597725323093}
{:value 1, :time 1597725323093}
{:value 2, :time 1597725323093}
{:value 1, :time 1597725323194}
{:value 2, :time 1597725323195}
{:value 1, :time 1597725323299}
{:value 2, :time 1597725323300}
{:value 1, :time 1597725323406}
{:value 2, :time 1597725323406}
{:value 1, :time 1597725323510}
{:value 2, :time 1597725323511}

Notice how the first few items come in all at once, and after that each pair is staggered by about the delay you'd expect? This is due to the well-known(?) fact that apply (and therefore mapcat, which is implemented with apply concat) is more eager than strictly necessary: for performance reasons it realizes the first few arguments up front. If it is important for you to get the right delay even on the first few items, you can simply implement your own version of apply concat that doesn't optimize for short input lists.

(defn ingest [xs]
  (when-let [coll (seq (map (comp seq deref) xs))]
    ((fn step [curr remaining]
       (lazy-seq
         (cond curr (cons (first curr) (step (next curr) remaining))
               remaining (step (first remaining) (next remaining)))))
      (first coll) (next coll))))
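
To make that eagerness concrete, here is one way to count how many futures get forced just to produce the first value. This is only a sketch: counted-source is a hypothetical instrumented variant of -source- that bumps an atom each time a future is created, and the exact count is an implementation detail that may differ between Clojure versions.

(defn counted-source []
  (let [n (atom 0)]
    [n (repeatedly (fn []
                     (swap! n inc)
                     (future (Thread/sleep 100) [1 2])))]))

(let [[n src] (counted-source)]
  (first (mapcat deref src))
  @n)
;; => typically 4: apply concat walks a few arguments up front,
;;    so several futures are created and derefed before the first
;;    value comes back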

A. Webb in the comments suggests an equivalent but much simpler implementation:

(defn ingest [coll]
  (for [batch coll,
        item @batch]
    item))
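
With either version of ingest, re-running the timestamp loop from above should show every pair, including the very first one, arriving roughly 100 ms after the previous one:

(doseq [x (take 6 (ingest (-source-)))]
  (println {:value x :time (System/currentTimeMillis)}))
;; each pair of values should now be separated from the previous
;; pair by roughly 100 ms, with no burst at the start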

Upvotes: 2

Rulle

Reputation: 4901

You can solve it by iterating a state machine. I don't think this suffers from the apply-related eagerness pointed out in the other answers, but I am not sure whether there are other issues with this approach:

(defn step-state [[current-element-to-unpack input-seq]]
  ;; state: [items left in the current batch, futures not yet derefed]
  (cond
    (empty? input-seq) nil
    (empty? current-element-to-unpack) [(deref (first input-seq)) (rest input-seq)]
    :else [(rest current-element-to-unpack) input-seq]))

(defn injest [input-seq]
  (->> [[] input-seq]
       (iterate step-state)
       (take-while some?)
       (map first)
       (filter seq)
       (map first)))
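
For intuition, the first few states produced by iterate look roughly like this (a sketch, where f1, f2, ... stand for the not-yet-derefed futures coming out of -source-):

;; [[]     (f1 f2 f3 ...)]   initial state, filtered out (empty batch)
;; [[1 2]  (f2 f3 ...)]      f1 derefed (~100 ms), contributes 1
;; [(2)    (f2 f3 ...)]      contributes 2
;; [()     (f2 f3 ...)]      batch exhausted, filtered out
;; [[1 2]  (f3 ...)]         f2 derefed (~100 ms), contributes 1
;; ...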

Upvotes: 1

Denis Fuenzalida

Reputation: 3346

I think you're fine just deref'ing the elements of the lazy seq and forcing the consumption of only the entries you need, like this:

(defn -source- [] (repeatedly (fn [] (future (Thread/sleep 100) [1 2]))))

(defn injest [src]
  (map deref src))

;; (time (dorun (take 3 (injest (-source-)))))
;; => "Elapsed time: 303.432003 msecs"

;; (time (dorun (take 6 (injest (-source-)))))
;; => "Elapsed time: 603.319103 msecs"

On the other hand, depending on the number of items, it might be better to avoid creating lots of futures and instead use a lazy-seq that simply blocks whenever the next element isn't ready yet.
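
A minimal sketch of that idea, assuming the blocking work lives in a hypothetical produce-batch function rather than in futures:

(defn produce-batch []
  ;; stands in for whatever blocking call yields the next batch
  (Thread/sleep 100)
  [1 2])

(defn blocking-source []
  (lazy-seq (concat (produce-batch) (blocking-source))))

;; (time (dorun (take 3 (blocking-source))))
;; => "Elapsed time: ~200 msecs" -- two batches realized, no futures created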

Upvotes: 1
