user3186332
user3186332

Reputation: 370

Processing Infinite (Lazy) Sequences

I have inherited some code that polls for updates, appending these updates to a lazy sequence and processed. After upgrading from clojure 1.7.0-alpha5 to any later version,the code appears broken due to the chunking of the lazy sequence. I've written an example to show the issue:

(defn infinite-updates
  []
  (letfn [(step [n]
            (lazy-seq
             ;; Poll for an update
             (Thread/sleep 3000)
             (cons n (step (rand-int 5)))))]
     (lazy-seq
      (step (rand-int 5)))))

;; Run with:
(doseq [t  (sequence (map inc) (infinite-updates))] (println t))

The project was on clojure 1.7.0-alpha5 and works as such: Every 3 seconds t is printed.

Once I upgrade past that revision, it chunks the results, so I get 32 t's printed after approximately 1.5 minutes.

I have tried using the following:

(defn unchunk [s]
  (when (seq s)
  (lazy-seq
    (cons (first s)
        (unchunk (next s))))))

To unchunk the data with no luck.

How can I process these updates as they are available OR is there a more idiomatic way to write infinite-updates such that I process all updates as they come in without relying on a lazy-seq?

Upvotes: 1

Views: 379

Answers (2)

Alan Thompson
Alan Thompson

Reputation: 29958

Your problem is tailor-made for clojure.core.async. This code shows the outlines:

(ns tst.clj.core
  (:use clj.core
        clojure.test)
  (:require
     [clojure.core.async :as async]
  ))

; create a buffer of arbitrary size 99 (any size, even zero, would work)
(let [buffer (async/chan 99) ]
  (async/go       ; start sending loop in another thread
    (while true
      (Thread/sleep 3000)
      (let [val (rand-int 5) ]
        (println "putting:" val)
        (async/>!! buffer val))))
  (while true       ; start receiving loop in this thread
    (println "received:"
      (async/<!! buffer))))

with output:

*clojure-version* => {:major 1, :minor 8, :incremental 0, :qualifier nil}
java.version =>  1.8.0_111
putting: 4
received: 4
putting: 1
received: 1
putting: 0
received: 0
putting: 0
received: 0
putting: 0
received: 0
putting: 1
received: 1
putting: 1
received: 1
putting: 0
received: 0
putting: 0
received: 0
putting: 0
received: 0
putting: 3
received: 3
putting: 0
received: 0
putting: 1
received: 1
putting: 4
received: 4
putting: 3
received: 3
putting: 2
received: 2
putting: 4
received: 4

every 3 seconds. Please see also:

http://www.braveclojure.com/core-async/

http://clojure.com/blog/2013/06/28/clojure-core-async-channels.html

http://clojure.github.io/core.async/

https://github.com/clojure/core.async/blob/master/examples/walkthrough.clj

https://www.infoq.com/presentations/clojure-core-async

https://youtu.be/enwIIGzhahw

Upvotes: 3

Shlomi
Shlomi

Reputation: 4748

Instead of using the transducer, use the "regular" map (your manually constructed list is not chunked):

(doseq [t (map inc (infinite-updates))] (println t))

Upvotes: 2

Related Questions