Reputation: 370
I have inherited some code that polls for updates, appending these updates to a lazy sequence and processed. After upgrading from clojure 1.7.0-alpha5 to any later version,the code appears broken due to the chunking
of the lazy sequence. I've written an example to show the issue:
(defn infinite-updates
[]
(letfn [(step [n]
(lazy-seq
;; Poll for an update
(Thread/sleep 3000)
(cons n (step (rand-int 5)))))]
(lazy-seq
(step (rand-int 5)))))
;; Run with:
(doseq [t (sequence (map inc) (infinite-updates))] (println t))
The project was on clojure 1.7.0-alpha5 and works as such: Every 3 seconds t
is printed.
Once I upgrade past that revision, it chunks the results, so I get 32 t
's printed after approximately 1.5 minutes.
I have tried using the following:
(defn unchunk [s]
(when (seq s)
(lazy-seq
(cons (first s)
(unchunk (next s))))))
To unchunk
the data with no luck.
How can I process these updates as they are available OR is there a more idiomatic way to write infinite-updates
such that I process all updates as they come in without relying on a lazy-seq?
Upvotes: 1
Views: 379
Reputation: 29958
Your problem is tailor-made for clojure.core.async
. This code shows the outlines:
(ns tst.clj.core
(:use clj.core
clojure.test)
(:require
[clojure.core.async :as async]
))
; create a buffer of arbitrary size 99 (any size, even zero, would work)
(let [buffer (async/chan 99) ]
(async/go ; start sending loop in another thread
(while true
(Thread/sleep 3000)
(let [val (rand-int 5) ]
(println "putting:" val)
(async/>!! buffer val))))
(while true ; start receiving loop in this thread
(println "received:"
(async/<!! buffer))))
with output:
*clojure-version* => {:major 1, :minor 8, :incremental 0, :qualifier nil}
java.version => 1.8.0_111
putting: 4
received: 4
putting: 1
received: 1
putting: 0
received: 0
putting: 0
received: 0
putting: 0
received: 0
putting: 1
received: 1
putting: 1
received: 1
putting: 0
received: 0
putting: 0
received: 0
putting: 0
received: 0
putting: 3
received: 3
putting: 0
received: 0
putting: 1
received: 1
putting: 4
received: 4
putting: 3
received: 3
putting: 2
received: 2
putting: 4
received: 4
every 3 seconds. Please see also:
http://www.braveclojure.com/core-async/
http://clojure.com/blog/2013/06/28/clojure-core-async-channels.html
http://clojure.github.io/core.async/
https://github.com/clojure/core.async/blob/master/examples/walkthrough.clj
https://www.infoq.com/presentations/clojure-core-async
Upvotes: 3
Reputation: 4748
Instead of using the transducer, use the "regular" map (your manually constructed list is not chunked):
(doseq [t (map inc (infinite-updates))] (println t))
Upvotes: 2