David Sheldrick
David Sheldrick

Reputation: 71

Clojure: data resulting from parallel computation seems to be significantly slower to access

I've got a function which concurrently counts the frequencies of certain features in a text file and collates the data. The output of the function is thousands of frequency distributions stored in a persistent map. As a simple example:

{"dogs" {"great dane" 2, "poodle" 4}, "cats" {"siamese" 1 "tom" 3}}

and the code that would produce this:

(defn do-the-thing-1 [lines species_list]
  ;; we know the full list of species beforehand so to avoid thread contention
  ;; for a single resource, make an atom for each species
  (let [resultdump      (reduce #(assoc %1 %2 (atom {})) {} species_list)
        line-processor  (fn [line]
                          (fn [] ; return a function that will do the work when invoked
                            (doseq [[species breed] (extract-pairs line)]
                              (swap! ; increase the count for this species-breed pair
                                (resultdump species)
                                update-in [breed] #(+ 1 (or % 0))))))
        pool            (Executors/newFixedThreadPool 4)]
    ;; queue up the tasks
    (doseq [future (.invokeAll pool (map line-processor lines))]
      (.get future))
    (.shutdown pool)
    (deref-vals result)))

(defn deref-vals [species_map]
  (into {} (for [[species fdist] species_map] [species @fdist]))

This works fine. The problem is that I need to convert them to probability distributions before I can use them. e.g.

{"dogs" {"great dane" 1/3, "poodle" 2/3}, "cats" {"siamese" 1/4, "tom" 3/4}}

Here's the function that does that:

(defn freq->prob 
  "Converts a frequency distribution into a probability distribution"
  [fdist]
  (let [sum (apply + (vals fdist))]
    (persistent!
      (reduce
        (fn [dist [key val]] (assoc! dist key (/ val sum)))
        (transient fdist)
        (seq fdist)))))

Doing this conversion on the fly as the distributions are being consumed by the next step in the processing pipeline gives reasonable speeds, but also a fair amount of redundant conversions as some distributions are used more than once. When I modify my function to perform the conversions in parallel before returning the result, the speed at which latter stages of processing occur drops dramatically.

Here's the modified function:

(defn do-the-thing-2 [lines species_list]
  ;; we know the full list of species beforehand so to avoid thread contention
  ;; for a single resource, make an atom for each species
  (let [resultdump      (reduce #(assoc %1 %2 (atom {})) {} species_list)
        line-processor  (fn [line]
                          (fn [] ; return a function that will do the work when invoked
                            (doseq [[species breed] (extract-pairs line)]
                              (swap! ; increase the count for this species-breed pair
                                (resultdump species)
                                update-in [breed] #(+ 1 (or % 0))))))
        pool            (Executors/newFixedThreadPool 4)]
    ;; queue up the tasks
    (doseq [future (.invokeAll pool (map line-processor lines))]
      (.get future))

    ;; this is the only bit that has been added
    (doseq [future (.invokeAll pool (map
                                      (fn [fdist_atom]
                                        #(reset! fdist_atom (freq->prob @fdist_atom)))
                                      (vals resultdump)))]
      (.get future))

    (.shutdown pool)
    (deref-vals result)))

So yeah, this makes everything afterward about 10x slower than it is when simply calling freq->prob upon every access to the resulting map, though the returned data is identical. Can anyone suggest reasons as to why that may be or what I could do about it?

EDIT: I'm now suspecting it has something to do with Clojure's fractions. If I modify the freq->prob function to create floats or doubles in stead of fractions, performance is improved when pre-computing the probability distributions as opposed to generating them on-the-fly. Could it be that fractions created in an atom run slower than fractions created outside an atom? I just ran some simple tests that indicate this is not the case, so there's definitely something weird going on here.

Upvotes: 3

Views: 225

Answers (2)

wvdlaan
wvdlaan

Reputation: 61

Regarding your question on converting the prob distribution.

If you rewrite 'freq-prob' like this:

(defn cnv-freq [m]
 (let [t (apply + (vals m))]
  (into {} (map (fn [[k v]] [k (/ v t)]) m))))
(defn freq-prob [m]
 (into {} (pmap (fn [[k v]] [k (cnv-freq v)]) m)))

You can enable/disable parallel execution by changing 'pmap' to 'map'.

Upvotes: 0

sw1nn
sw1nn

Reputation: 7328

I'm not 100% sure I've followed your logic, but your map function here:

(map
    (fn [fdist_atom]
        #(reset! fdist_atom (freq->prob @fdist_atom)))
    (vals resultdump))

doesn't look right. If you are updating an atom based on its old value, swap! is more appropriate than reset! for a function applied to the dereferenced value of the atom. This would seem better:

(map
    (fn [fdist_atom] (swap! fdist_atom freq->prob))
    (vals resultdump))

Upvotes: 1

Related Questions