Reputation: 413
I use a Java library that makes synchronous (blocking) GET and POST requests. I used to wrap such requests in futures, which solved the "waiting problem" for me (I mean waiting for the response):
(defn unchangeable-lib-request [n]
  (Thread/sleep 1000)
  n)

(defn process [n]
  (let [res (atom [])]
    (dotimes [i n]
      (future (swap! res conj (unchangeable-lib-request i))))
    (loop []
      (if (> n (count @res))
        (recur)
        @res))))

(time (process 9))
;; "Elapsed time: 1000.639079 msecs"
;; => [8 7 5 6 4 3 2 1 0]
But I need to make hundreds of requests, and this creates performance problems. I found out about core.async and go blocks, but using go blocks with this library does not solve the "waiting problem":
(defn unchangeable-lib-request [n]
  (Thread/sleep 1000)
  n)

(defn process [n]
  (let [c (async/chan 10)]
    (dotimes [i n]
      (async/go
        (async/>! c (unchangeable-lib-request i))))
    (loop [result []]
      (if (> n (count result))
        (recur (conj result (async/<!! c)))
        result))))

(time (process 9))
;; "Elapsed time: 2001.770183 msecs"
;; => [0 4 1 6 7 2 5 3 8]
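Go blocks can handle only 8 requests simultaneously, because they run on a fixed-size thread pool (8 threads by default) and each blocking call ties up one of those threads. Is it possible to write some async wrapper that parks the go block and makes it possible to run hundreds of requests asynchronously without them blocking each other?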
(defn process [n]
  (let [c (async/chan 10)]
    (dotimes [i n]
      (async/go
        (async/>! c (magic-async-parking-wrapper
                      (unchangeable-lib-request i)))))
    (loop [result []]
      (if (> n (count result))
        (recur (conj result (async/<!! c)))
        result))))

(time (process 9))
;; "Elapsed time: 1003.2563 msecs"
I know about async/thread, but it seems to be the same as (future ...).
Is it possible?
Upvotes: 3
Views: 1507
Reputation: 9266
This is where you use clojure.core.async/pipeline-blocking:
(require '[clojure.core.async :as a :refer [chan pipeline-blocking]])

(let [output-chan (chan 100)
      input-chan  (chan 1000)]
  (pipeline-blocking 4 ; parallelism knob
                     output-chan
                     (map unchangeable-lib-request)
                     input-chan)
  ;; Consume results from output-chan, put operations on input-chan
  [output-chan input-chan])

This spawns n (in this case 4) threads that are kept busy executing unchangeable-lib-request.
Use the buffer size of output-chan to fine-tune how many requests you want to happen in advance.
Use the buffer size of input-chan to fine-tune how many requests you want scheduled before backpressure applies (that is, before puts on input-chan start to block).
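To make the snippet above concrete, here is a minimal end-to-end sketch of feeding the pipeline and collecting its results. The function name process-with-pipeline is hypothetical, the stand-in request sleeps for only 100 ms to keep the example quick, and the channel buffer sizes are an assumption chosen so that all n inputs fit without blocking.

```clojure
(require '[clojure.core.async :as a])

;; Stand-in for the blocking library call from the question
;; (sleep shortened to 100 ms for this example).
(defn unchangeable-lib-request [n]
  (Thread/sleep 100)
  n)

;; Hypothetical helper: runs n blocking requests with at most
;; `parallelism` of them in flight at once and returns a vector of results.
(defn process-with-pipeline [parallelism n]
  (let [input-chan  (a/chan n)
        output-chan (a/chan n)]
    (a/pipeline-blocking parallelism
                         output-chan
                         (map unchangeable-lib-request)
                         input-chan)
    ;; The input buffer holds n items, so these puts do not block.
    (doseq [i (range n)]
      (a/>!! input-chan i))
    ;; Closing the input lets the pipeline finish and close output-chan.
    (a/close! input-chan)
    ;; a/into yields the collected vector once output-chan is closed.
    (a/<!! (a/into [] output-chan))))

(process-with-pipeline 4 9)
;; => [0 1 2 3 4 5 6 7 8]
```

Unlike the future-based version in the question, the results come back in input order, since pipeline-blocking returns outputs in order relative to the inputs. The total time is roughly the number of requests divided by the parallelism, times the duration of one request, so the parallelism value is the main thing to tune.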
Upvotes: 1
Reputation: 15442
I'd suggest put!, something like: (future (put! chan (worker-function)))
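A minimal sketch of this suggestion applied to the question's code might look like the following. The name process-with-put is hypothetical, the request is shortened to 100 ms, and the channel is given a buffer of n (an assumption) so that every put! is accepted immediately.

```clojure
(require '[clojure.core.async :as a])

;; Stand-in for the blocking library call from the question
;; (sleep shortened to 100 ms for this example).
(defn unchangeable-lib-request [n]
  (Thread/sleep 100)
  n)

;; Hypothetical helper: each request runs in its own future, and the
;; result is handed to the channel with the non-blocking put!.
(defn process-with-put [n]
  (let [c (a/chan n)]
    (dotimes [i n]
      (future (a/put! c (unchangeable-lib-request i))))
    ;; a/take closes its output after n items, which lets a/into finish.
    (a/<!! (a/into [] (a/take n c)))))

(sort (process-with-put 9))
;; => (0 1 2 3 4 5 6 7 8)
```

The results arrive in completion order, hence the sort. Note that this still uses one thread per in-flight request, just like the original future-based version; only the result collection moves to a channel. The same holds for a/thread, which also runs its body on a separate thread but returns a channel, so a go block can park on it with (a/<! (a/thread ...)) instead of blocking a pool thread.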
Upvotes: 2