Reputation: 21
In an effort to get some good concurrent programming practice, I'm trying to implement the producer/consumer pattern using Clojure's core.async library. Everything works, but I want to be able to stop both the producer and the consumer at some point.
My current code looks something like this...
(require '[clojure.core.async :as a])

(def c (a/chan 5))
(def alive (atom true))

(def producer
  (a/go-loop []
    (Thread/sleep 1000)
    (when @alive
      (a/>! c 1)
      (recur))))

(def consumer
  (a/go-loop []
    (Thread/sleep 3000)
    (when @alive
      (println (a/<! c))
      (recur))))

(do
  (reset! alive false)
  (a/<!! producer)
  (a/<!! consumer))
Unfortunately, the 'do' block occasionally blocks indefinitely. I want to be able to stop both go-loops from continuing and then block until both loops have exited. The Thread/sleep calls are there to simulate performing some unit of work.
I suspect that stopping the producer causes the consumer to park, hence the hang, but I'm not sure of an alternative approach. Any ideas?
Reputation: 29984
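Close the channel to signal shutdown. Once a channel is closed, puts return false, and takes return any values still buffered and then nil. Please see ClojureDocs for details. Example: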
(require '[clojure.core.async :refer [chan >!! <!! close!]])

(let [c (chan 2)]
  (>!! c 1)
  (>!! c 2)
  (close! c)
  (println (<!! c)) ; 1
  (println (<!! c)) ; 2
  ;; since we closed the channel, this returns false (we can no longer add values)
  (>!! c 1))
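The consumer side relies on the complementary behaviour: a take from a closed channel still delivers whatever is buffered, then returns nil. A minimal sketch (the name `drained` is just for illustration):

```clojure
(require '[clojure.core.async :as a])

;; Sketch: a closed channel still hands out buffered values, then yields nil.
(def drained
  (let [c (a/chan 2)]
    (a/>!! c :x)
    (a/close! c)
    [(a/<!! c)     ; the buffered value is still delivered
     (a/<!! c)]))  ; closed and empty, so this take returns nil

(println drained)
```

Since nil can never be put on a channel, a nil take is an unambiguous "closed and drained" signal.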
For your problem, something like this should work:
(let [c        (a/chan 5)
      producer (a/go-loop [cnt 0]
                 (Thread/sleep 1000)
                 (let [put-result (a/>! c cnt)]
                   (println "put: " cnt put-result)
                   (when put-result
                     (recur (inc cnt)))))
      consumer (a/go-loop []
                 (Thread/sleep 3000)
                 (let [result (a/<! c)]
                   (when result
                     (println "take: " result)
                     (recur))))]
  (Thread/sleep 5000)
  (println "closing chan...")
  (a/close! c))
which prints:
put: 0 true
put: 1 true
take: 0
put: 2 true
put: 3 true
closing chan...
put: 4 false
take: 1
take: 2
take: 3
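If you also want to block until both loops have exited, as in your original 'do' block, you can take from the channels that the go-loops return after closing `c`. Here is a rough sketch of that idea, not a definitive implementation. `run-pipeline`, `run-ms` and the `taken` atom are hypothetical names introduced for illustration, and I have swapped `Thread/sleep` for `(a/<! (a/timeout ...))` inside the go blocks so the simulated work parks instead of tying up a thread from the go pool:

```clojure
(require '[clojure.core.async :as a])

(defn run-pipeline
  "Hypothetical helper: runs a producer and a consumer for run-ms milliseconds,
  closes the channel, waits for both go-loops to exit, and returns the values
  the consumer took."
  [run-ms]
  (let [c        (a/chan 5)
        taken    (atom [])
        producer (a/go-loop [cnt 0]
                   (a/<! (a/timeout 100))     ; simulated work; parks, does not block
                   (when (a/>! c cnt)         ; false once c is closed
                     (recur (inc cnt))))
        consumer (a/go-loop []
                   (a/<! (a/timeout 300))     ; simulated work
                   (when-some [v (a/<! c)]    ; nil once c is closed and drained
                     (swap! taken conj v)
                     (recur)))]
    (Thread/sleep run-ms)
    (a/close! c)
    (a/<!! producer)   ; returns once the producer loop has exited
    (a/<!! consumer)   ; returns once the consumer has drained the buffer
    @taken))

(println (run-pipeline 1000))
```

Note that a producer already parked on a full buffer when `close!` is called stays parked until the consumer makes room, so the consumer has to keep draining after the close. Stopping the consumer with a flag, as in the question, can leave the producer parked forever.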