Reputation: 46882
I haven't used multithreading in Clojure at all so am unsure where to start.
I have a doseq whose body can run in parallel. What I'd like is for there always to be 3 threads running (leaving 1 core free) that evaluate the body in parallel until the range is exhausted. There's no shared state, nothing complicated - the equivalent of Python's multiprocessing would be just fine.
So something like:
(dopar 3 [i (range 100)]
  ; repeated 100 times in 3 parallel threads...
  ...)
Where should I start looking? Is there a command for this? A standard package? A good reference?
So far I have found pmap, and could use that (how do I restrict to 3 at a time? looks like it uses 32 at a time - no, source says 2 + number of processors), but it seems like this is a basic primitive that should already exist somewhere.
clarification: I really would like to control the number of threads. I have processes that are long-running and use a fair amount of memory, so creating a large number and hoping things work out OK isn't a good approach (example which uses a significant chunk of available memory).
update: Starting to write a macro that does this, and I need a semaphore (or a mutex, or an atom I can wait on). Do semaphores exist in Clojure? Or should I use a ThreadPoolExecutor? It seems odd to have to pull so much in from Java - I thought parallel programming in Clojure was supposed to be easy... Maybe I am thinking about this completely the wrong way? Hmmm. Agents?
Upvotes: 18
Views: 8207
Reputation: 547
Use pipelines and channels (core.async). If your operations are IO-bound, that is the preferable option, because pmap's parallelism is tied to the number of CPUs.
Another good option is to use an agent along with send-off, which runs on a cached thread pool (Executors/newCachedThreadPool) underneath.
Upvotes: 0
Reputation: 736
There's actually a library now for doing exactly this. From their GitHub:
The claypoole library provides threadpool-based parallel versions of Clojure functions such as pmap, future, and for.
It provides both ordered and unordered versions of each.
Upvotes: 7
Reputation: 5017
I had a similar problem with the following requirements:
The core pmap function only satisfies the last two assumptions.
Here is an implementation which does satisfy those assumptions, using a standard Java thread pool ExecutorService together with a CompletionService and some partitioning of the input stream:
(require '[clojure.tools.logging :as log])
(import [java.util.concurrent ExecutorService ExecutorCompletionService
         CompletionService Future])

(defn take-seq
  [^CompletionService pool]
  (lazy-seq
    (let [^Future result (.take pool)]
      (cons (.get result)
            (take-seq pool)))))

(defn qmap
  [^ExecutorService pool chunk-size f coll]
  (let [worker (ExecutorCompletionService. pool)]
    (mapcat
      (fn [chunk]
        (let [actual-size (atom 0)]
          (log/debug "Submitting payload for processing")
          (doseq [item chunk]
            (.submit worker #(f item))
            (swap! actual-size inc))
          (log/debug "Outputting completed results for" @actual-size "trades")
          (take @actual-size (take-seq worker))))
      (partition-all chunk-size coll))))
As can be seen, qmap does not instantiate the thread pool itself, only the ExecutorCompletionService. This makes it possible, for example, to pass in a fixed-size ThreadPoolExecutor. Also, since qmap returns a lazy sequence, it cannot and must not manage the thread pool resource itself. Finally, chunk-size limits how many elements of the input sequence are realized and submitted as tasks at once.
The below code demonstrates the proper usage:
(import [java.util.concurrent Executors])

(let [thread-pool (Executors/newFixedThreadPool 3)]
  (try
    (doseq [result (qmap thread-pool
                         ;; submit no more than 500 tasks at once
                         500
                         long-running-resource-intensive-fn
                         unboundedly-large-lazy-input-coll)]
      (println result))
    (finally
      ;; (.shutdown) only prohibits submitting new tasks,
      ;; (.shutdownNow) will even cancel already submitted tasks.
      (.shutdownNow thread-pool))))
Here is the documentation for some of the Java concurrency classes used:
Upvotes: 4
Reputation: 1808
Not sure if it is idiomatic, as I'm still quite a beginner with Clojure, but the following solution works for me and it also looks pretty concise:
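(let [number-of-threads 3
      await-timeout 1000]
  ;; partition-all keeps the final, possibly shorter, batch
  ;; (plain partition would silently drop it)
  (doall
    (mapcat (fn [p-items]
              (let [agents (mapv agent p-items)]
                (doseq [a agents] (send-off a process))
                ;; wait up to await-timeout ms for the whole batch
                (apply await-for await-timeout agents)
                ;; mapv is eager, so the results are actually collected
                (mapv deref agents)))
            (partition-all number-of-threads items))))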
Upvotes: 2
Reputation: 106351
pmap will actually work fine in most circumstances - it uses a thread pool with a sensible number of threads for your machine. I wouldn't bother trying to create your own mechanisms to control the number of threads unless you have real benchmark evidence that the defaults are causing a problem.
Having said that, if you really want to limit to a maximum of three threads, an easy approach is to just use pmap on 3 subsets of the range:
(defn split-equally
  "Split a collection into a vector of (as close as possible) equally sized parts"
  ;; the docstring must come before the argument vector to be attached to the var
  [num coll]
  (loop [num num
         parts []
         coll coll
         c (count coll)]
    (if (<= num 0)
      parts
      ;; t is the remaining count divided by the remaining parts, rounded up
      (let [t (quot (+ c num -1) num)]
        (recur (dec num) (conj parts (take t coll)) (drop t coll) (- c t))))))

(defmacro dopar [thread-count [sym coll] & body]
  `(doall (pmap
            (fn [vals#]
              (doseq [~sym vals#]
                ~@body))
            (split-equally ~thread-count ~coll))))
Note the use of doall, which is needed to force evaluation of the pmap (which is lazy).
Upvotes: 5
Reputation: 91907
Why don't you just use pmap? You still can't control the threadpool, but it's a lot less work than writing a custom macro that uses agents (why not futures?).
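For anyone who does want the hard cap on threads, here is a minimal sketch of the futures route using a fixed-size java.util.concurrent pool instead of agents. The names run-in-pool and this version of dopar are made up for illustration, and the sketch assumes coll is finite, since every task is created before any is run.

```clojure
(import '[java.util.concurrent Executors ExecutorService Future])

(defn run-in-pool
  "Hypothetical helper: calls (f x) for every x in coll on a fixed pool
  of n threads and returns the results as a vector in input order."
  [n f coll]
  (let [^ExecutorService pool (Executors/newFixedThreadPool n)]
    (try
      ;; Clojure functions implement Callable, so zero-argument fns
      ;; can be handed to invokeAll directly.
      (let [tasks (mapv (fn [x] (fn [] (f x))) coll)]
        ;; invokeAll blocks until every task has completed and returns
        ;; one Future per task, in the same order as the tasks.
        (mapv (fn [^Future fut] (.get fut)) (.invokeAll pool tasks)))
      (finally
        ;; always release the worker threads, even if a task threw
        (.shutdown pool)))))

(defmacro dopar
  "Hypothetical macro: like doseq for a single binding, but runs the
  body on n threads and returns the body's values."
  [n [sym coll] & body]
  `(run-in-pool ~n (fn [~sym] ~@body) ~coll))
```

Usage would then look like (dopar 3 [i (range 100)] (some-long-running-fn i)), where some-long-running-fn stands in for your own code. If the body throws, .get rethrows it wrapped in an ExecutionException.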
Upvotes: 4
Reputation: 46882
OK, I think what I want is to have an agent for each loop, with the data sent to the agent using send. The agents triggered using send are run from a thread pool, so the number is limited in some way (it doesn't give the fine-grained control of exactly three threads, but it'll have to do for now).
[Dave Ray explains in comments: to control pool size I'd need to write my own]
(defmacro dopar [seq-expr & body]
  (assert (= 2 (count seq-expr)) "single pair of forms in sequence expression")
  (let [[k v] seq-expr]
    `(apply await
       (for [k# ~v]
         (let [a# (agent k#)]
           (send a# (fn [~k] ~@body))
           a#)))))
which can be used like:
(deftest test-dump
  (dopar [n (range 7 11)]
    (time (do-dump-single "/tmp/single" "a" n 10000000))))
Yay! Works! I rock! (OK, Clojure rocks a little bit too). Related blog post.
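On the pool-size point: since Clojure 1.5 there is send-via, which dispatches an agent action on an executor you supply, so a fixed pool of exactly three threads is possible without leaving agents behind. A rough sketch follows, assuming Clojure 1.5 or later; run-agents-on-pool is a made-up name, not a core function.

```clojure
(import '[java.util.concurrent Executors ExecutorService])

(defn run-agents-on-pool
  "Hypothetical helper: wraps each item of coll in an agent, applies f
  to it on a fixed pool of n threads, and returns the final agent
  states as a vector in input order."
  [n f coll]
  (let [^ExecutorService pool (Executors/newFixedThreadPool n)]
    (try
      (let [agents (mapv agent coll)]
        ;; send-via is like send, but runs the action on the given executor
        (doseq [a agents] (send-via pool a f))
        ;; block until every agent has processed its action
        (apply await agents)
        (mapv deref agents))
      (finally
        (.shutdown pool)))))
```

For example, (run-agents-on-pool 3 inc [0 1 2]) should give back [1 2 3]. In a standalone script, call (shutdown-agents) at the end, because await goes through the built-in agent pool, whose threads otherwise keep the JVM alive.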
Upvotes: 7