Reputation: 21
(defn DoubleFrequency []
  (def s (slurp "Example.txt"))
  (def m (reduce #(assoc %1 %2 (inc (%1 %2 0)))
                 {}
                 (re-seq #".." s)))
  (def c (count m))
  (doseq [[k x] m]
    (println k ":" (/ x c))))
I'm trying to apply concurrency to my program, and I want to use pmap, but I'm not sure how to work it into my current code here. The functionality is correct on a single core, but ideally I want to replace reduce with pmap in some way and get the same results.
Upvotes: 0
Views: 251
Reputation: 17859
First of all, the function you're trying to write already exists in clojure.core. It is called frequencies:
user> (frequencies [1 2 1 3 1 4 4])
;;=> {1 3, 2 1, 3 1, 4 2}
It is, indeed, single-threaded, so let's try to make it parallel.
Your initial approach with reduce is a step in the right direction. It is not parallel either, but it can be turned into a parallel version using the concurrency facilities in Clojure's standard library, namely reducers.
First, let's rewrite your reducing function a bit so that it does the same thing in a more idiomatic way (this is optional, but good for readability):
#(assoc %1 %2 (inc (%1 %2 0)))
=> #(update %1 %2 (fnil inc 0))
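As a quick check of the rewrite above, here is a minimal sketch comparing the two reducing functions. The names count-old and count-new are hypothetical, introduced only for this illustration. It also shows one practical difference: the update/fnil form tolerates a nil accumulator, whereas the original form would throw because it calls the accumulator as a function.

```clojure
;; Hypothetical names for the two forms of the reducing function.
(def count-old #(assoc %1 %2 (inc (%1 %2 0))))
(def count-new #(update %1 %2 (fnil inc 0)))

;; Both produce the same counts when seeded with an empty map.
(= (reduce count-old {} [1 2 1 3])
   (reduce count-new {} [1 2 1 3]))
;;=> true

;; The update/fnil form also works when the accumulator is nil.
(count-new nil :a)
;;=> {:a 1}
```

This matters when the seed value is produced by calling (merge-with +) with no maps, which returns nil.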
Then we can move on to a parallel reduce with fold:
(require '[clojure.core.reducers :as r])
(defn pfreq [data]
  (r/fold
    (partial merge-with +)
    (fn [acc k] (update acc k (fnil inc 0)))
    data))
The idea is that fold splits your collection into chunks (if it is long enough; the default chunk size is 512 elements), reduces each chunk in parallel, and then combines the chunks' results with merge-with:
user> (pfreq [1 2 1 3 1 4 1 5 2])
;;=> {1 4, 2 2, 3 1, 4 1, 5 1}
Notice also that the collection must be 'foldable'. By default, persistent vectors and maps are foldable, but the result of re-seq is not, so you should first convert it into a vector: (vec (re-seq #".." s)). Otherwise you won't get any parallelization, and fold falls back to a plain reduce.
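Putting the pieces together for the pair-counting case in the question, a minimal sketch might look like the following. The helper name pair-frequencies is hypothetical, and pfreq is repeated from above only so that the snippet runs on its own.

```clojure
(require '[clojure.core.reducers :as r])

;; Same fold as pfreq above, repeated so this snippet is self-contained.
(defn pfreq [data]
  (r/fold
    (partial merge-with +)
    (fn [acc k] (update acc k (fnil inc 0)))
    data))

;; Hypothetical helper: counts non-overlapping two-character pairs in s.
;; vec turns the lazy re-seq result into a foldable vector.
(defn pair-frequencies [s]
  (pfreq (vec (re-seq #".." s))))

(pair-frequencies "abababcd")
;;=> {"ab" 3, "cd" 1}
```

For the file from the question this would be called as (pair-frequencies (slurp "Example.txt")). With an input this small everything fits in one chunk, so a benefit from parallelism should only be expected on much larger inputs.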
You can also approach this with pmap, using the same strategy: split -> map -> combine:
(defn pfreq2 [chunk-size data]
  (->> data
       (partition-all chunk-size)
       (pmap frequencies)
       (apply merge-with +)))
However, this is not as flexible or powerful as a reducers pipeline.
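To illustrate what a reducers pipeline could look like, here is a hedged sketch that filters and normalizes the pairs before counting them. The function name pfreq-letters and the letters-only filter are assumptions made up for this example. They are not part of the original question.

```clojure
(require '[clojure.core.reducers :as r]
         '[clojure.string :as str])

;; Hypothetical pipeline: keep only pairs made of two ASCII letters,
;; lower-case them, then count them with a parallel fold.
(defn pfreq-letters [pairs]
  (->> pairs
       (r/filter #(re-matches #"[a-zA-Z]{2}" %))
       (r/map str/lower-case)
       (r/fold (partial merge-with +)
               (fn [acc k] (update acc k (fnil inc 0))))))

(pfreq-letters ["Ab" "ab" "a1" "cd"])
;;=> {"ab" 2, "cd" 1}
```

r/filter and r/map do not build intermediate collections. They wrap the reducing function, so the extra steps run inside the same fold. As before, the input should be a vector for the fold to run in parallel.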
Upvotes: 7