Reputation: 3103
I'm at Ch. 6 of Paul Butcher's Seven Concurrency Models in Seven Weeks, which focuses on core.async.
We have the following function:
(defn map-chan [f from]
  (let [to (chan)]
    (go-loop []
      (when-let [x (<! from)]
        (>! to (f x))
        (println "parking channel write.")
        (recur))
      (close! to))
    (println "map-chan done.")
    to))
I added the printlns myself to explore the exact order of computation, which is what I want to ask about here.
We can run it like this:
(def ch (to-chan (range 10))) ; [1]
(def mapped (map-chan (partial * 2) ch)) ; [2]
(<!! (async/into [] mapped)) ; [3]
;; [1] Create and return a channel from the elements of the seq, closing it when the seq is exhausted.
;; [2] map-chan returns immediately, with blocked go blocks inside of it.
;; [3] calling async/into finally triggers the parked channel writes, as seen below.
In the REPL:
channels.core=> (def ch (to-chan (range 10)))
#'channels.core/ch
channels.core=> (def mapped (map-chan (partial * 2) ch))
map-chan done.
#'channels.core/mapped
channels.core=> (<!! (async/into [] mapped))
parking channel write.
parking channel write.
parking channel write.
parking channel write.
parking channel write.
parking channel write.
parking channel write.
parking channel write.
parking channel write.
parking channel write.
[0 2 4 6 8 10 12 14 16 18]
channels.core=>
Question
We have a synchronous (i.e. unbuffered) channel here with both a writer and a reader ready to go. Why is my "parking channel write." message above not printed until async/into is called?
(It's not the channel read with <!! that triggers it, it's async/into itself, which is easy to check.) I'm not complaining about this, just seeking to understand why the trace is the way it is.
Are channels actually somehow lazy as well? He hasn't mentioned this in the book yet.
Note that this code depends on org.clojure/core.async "0.1.267.0-0d7780-alpha", if that makes any difference.
Also, in the book he has used a buffered channel of length 10. Yet, I also tried it with an unbuffered (sync) channel and the result seems the same.
Reputation: 29958
Your output channel to is unbuffered (buffer size zero), so a put on it cannot complete until a corresponding take is requested. Look at a modified version of your code:
(ns tst.demo.core
  (:use tupelo.core tupelo.test)
  (:require
    [clojure.core.async :as async]))

(defn map-chan [f from]
  (let [to (async/chan)]
    (async/go
      (loop []
        (when-let [x (async/<! from)]
          (println "put - pre")
          (async/>! to (f x))
          (println "put - post")
          (recur)))
      (async/close! to))
    (println "map-chan returns output buffer")
    to))

(dotest
  (println :1)
  (spyx
    (def ch (async/to-chan (range 10)))) ; [1]
  (Thread/sleep 2000) (println :2)
  (spyx
    (def mapped (map-chan (partial * 2) ch))) ; [2]
  (Thread/sleep 2000) (println :3)
  (spyx
    (async/<!! (async/into [] mapped)))) ; [3]
with results:
-------------------------------
Clojure 1.10.1 Java 13
-------------------------------
lein test tst.demo.core
:1
(def ch (async/to-chan (range 10))) => #'tst.demo.core/ch
:2
map-chan returns output buffer
(def mapped (map-chan (partial * 2) ch)) => #'tst.demo.core/mapped
put - pre
:3
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
(async/<!! (async/into [] mapped)) => [0 2 4 6 8 10 12 14 16 18]
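So, the go loop does start running immediately (note the first "put - pre" printed before :3), but the first put parks until the async/into at step [3] starts taking from the channel. In your original version the println comes after the >!, which is why nothing was printed until async/into was called. Channels are not lazy; an unbuffered channel simply needs a taker before a put can complete.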
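The rendezvous behaviour can also be checked in isolation. The following is only a minimal sketch: it uses async/offer!, which attempts a put without ever parking or blocking, and which I believe is only available in core.async releases newer than the alpha mentioned in the question. The names lonely, rendezvous and received are made up for illustration.

```clojure
(require '[clojure.core.async :as async])

;; No taker is waiting on this unbuffered channel, so the put cannot
;; complete immediately and offer! returns nil.
(def lonely (async/chan))
(def lonely-result (async/offer! lonely :x))

;; Register a taker first, then offer: the put now has a partner and
;; completes at once, so offer! returns true.
(def rendezvous (async/chan))
(def received (promise))
(async/take! rendezvous (fn [v] (deliver received v)))
(def rendezvous-result (async/offer! rendezvous :x))

;; The take callback runs on a dispatch thread, so wait briefly for it.
(def received-value (deref received 1000 :timed-out))
```

Inside a go block, >! behaves like the first case except that it parks and waits for a taker instead of giving up, which is what the trace above shows.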
If we use a buffered output channel of length 20, we see the go loop running before step [3] occurs:
...
(let [to (async/chan 20)]
...
with result:
:1
(def ch (async/to-chan (range 10))) => #'tst.demo.core/ch
:2
map-chan returns output buffer
(def mapped (map-chan (partial * 2) ch)) => #'tst.demo.core/mapped
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
put - pre
put - post
:3
(async/<!! (async/into [] mapped)) => [0 2 4 6 8 10 12 14 16 18]
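The effect of the buffer can be sketched the same way. This is an illustrative example rather than anything from the book: it assumes a core.async release that provides async/offer! and async/poll! (the non-parking put and take), and the names buffered, results and drained are hypothetical.

```clojure
(require '[clojure.core.async :as async])

;; A fixed buffer of size 2 accepts two values with no taker present.
(def buffered (async/chan 2))

;; The first two offers land in the buffer; the third finds it full
;; and is rejected.
(def results (mapv #(async/offer! buffered %) [:a :b :c]))

;; Drain the buffer: two values come back in order, then nothing is left.
(def drained [(async/poll! buffered)
              (async/poll! buffered)
              (async/poll! buffered)])
```

With a buffer of 20 and only 10 values, every put in map-chan finds free space, so the loop finishes before anything takes from the channel.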