rascio

Reputation: 9279

Clojure swap! atom executed in parallel

I'm playing with Clojure, writing a script that reads a sequence of URIs from a file and reports the HTTP status code for each of them.

I've implemented this using clojure.core.async/pipeline-async to execute the HTTP call for each URI (using http-kit's async API).

I want to monitor the execution of the script, so I have an atom for the state:

(let [processing (atom [(System/currentTimeMillis) 0])]

and a function to track the progress:

(defn track-progress [total progress]
  (swap! progress 
         (fn [[time count]]
            (let [incremented-count (inc count)
                  now (System/currentTimeMillis)]
              (if (= 0 (mod incremented-count (max 1 (int (/ total 20)))))
                (do
                  (println (str "Progress " incremented-count "/" total " | " (- now time) "ms"))
                  [now incremented-count])
                [time incremented-count])))))

Using it after the HTTP call:

(a/pipeline-async
      parallelism
      output-chan
      (fn [[an-id uri] result]
        (http/head uri {:throw-exceptions false
                        :timeout timeout}
                   (fn [{:keys [status error]}]
                     (track-progress total processing)
                     (a/go 
                        (if (nil? error)
                            (do (a/>! result [an-id (keyword (str status))])
                                (a/close! result))
                            (do (a/>! result [an-id :error])
                                (a/close! result)))))))
      input-chan)

The processing atom is created in a let expression that wraps the pipeline-async part.
Everything seems to work fine, apart from the log. I found out that sometimes the logging is very weird, producing output like this:


Progress 500/10000 | 11519ms
Progress 500/10000 | 11519msProgress 500/10000 | 11519ms

Progress 1000/10000 | 11446ms
Progress 1000/10000 | 11446ms
Progress 1500/10000 | 9503ms
Progress 2000/10000 | 7802ms
Progress 2500/10000 | 12822ms
Progress 2500/10000 | 12822msProgress 2500/10000 | 12822ms
Progress 2500/10000 | 12822ms

Progress 3000/10000 | 10623ms
Progress 3500/10000 | 9018ms
Progress 4000/10000 | 9618ms
Progress 4500/10000 | 13544ms
Progress 5000/10000 | 10541ms
Progress 5500/10000 | 10817ms
Progress 6000/10000 | 8921ms
Progress 6500/10000 | 9078ms
Progress 6500/10000 | 9078ms
Progress 7000/10000 | 9270ms
Progress 7500/10000 | 11826msProgress 7500/10000 | 11826msProgress 7500/10000 | 11826ms

The output is shown exactly as it was written to the shell: it seems that sometimes the same println is executed multiple times, or that the fn passed to swap! is executed in parallel (not just concurrently) on the atom. (If I remove the str and pass the pieces to println directly, the lines that repeat the same progress are completely interleaved, like ProgressProgress 7500/10000 | 11826ms7500/100007500 | 11826msProgress/10000 | 11826ms.)

Is something wrong with my code?
Or am I getting atoms wrong, in assuming that they don't allow the function that changes their state to run in parallel?

Upvotes: 2

Views: 535

Answers (2)

Alan Thompson

Reputation: 29958

What you want is to serialize the output stream from a group of concurrently executing threads. You could use an agent to serialize access to a piece of mutable state, but here you have a degenerate case without state, only with side-effects. For this case, the locking function is all you need.
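If you did want the agent route, a minimal sketch might look like this (illustrative names; sends to a single agent are applied one at a time, so the printing side effect is serialized):

;; All printing goes through one agent; its actions run one at a time.
(def printer (agent nil))

(defn do-println-agent
  [& args]
  (send-off printer
            (fn [_] (apply println args))))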

An example of the locking approach:

(ns tst.demo.core
  (:use demo.core tupelo.core tupelo.test)
  (:require [clojure.string :as str])) ; str/join is used in the char-at-a-time version below

(defn do-println
  [& args]
  (apply println args))

(def lock-obj (Object.))
(defn do-println-locking
  [& args]
  (locking lock-obj
    (apply println args)))

(def sleep-millis 500)
(defn wait-and-print
  [print-fn id]
  (Thread/sleep sleep-millis)
  (print-fn (format "wait-and-print %s is complete" id)))

(defn start-threads
  [print-fn count]
  (println "-----------------------------------------------------------------------------")
  (let [futures (forv [i (range count)]
                  (future (wait-and-print print-fn i)))]
    (doseq [future futures]
      ; block until future is complete
      (deref future))))

(dotest
  (start-threads do-println 10)
  (start-threads do-println-locking 10))

Typical result:

--------------------------------------
   Clojure 1.10.2-alpha1    Java 15
--------------------------------------

Testing tst.demo.core
-----------------------------------------------------------------------------
wait-and-print 4 is completewait-and-print 3 is completewait-and-print 2 is complete
wait-and-print 8 is completewait-and-print 9 is complete
wait-and-print 6 is completewait-and-print 1 is complete

wait-and-print 7 is complete
wait-and-print 0 is complete

wait-and-print 5 is complete


-----------------------------------------------------------------------------
wait-and-print 5 is complete
wait-and-print 8 is complete
wait-and-print 7 is complete
wait-and-print 9 is complete
wait-and-print 6 is complete
wait-and-print 3 is complete
wait-and-print 0 is complete
wait-and-print 4 is complete
wait-and-print 2 is complete
wait-and-print 1 is complete

So you can see that the output without the serialization from locking is jumbled, while each println in the second case is allowed to complete one at a time (even though the order is still random).

If println printed one char at a time instead of one string at a time, the results in the unsynchronized case would be even more jumbled. Modify the output functions to print each character separately:

(defn do-println
  [& args]
  (doseq [ch (str/join args)]
    (print ch))
  (newline))

(def lock-obj (Object.))
(defn do-println-locking
  [& args]
  (locking lock-obj
    (apply do-println args)))

with typical result:

--------------------------------------
   Clojure 1.10.2-alpha1    Java 15
--------------------------------------

Testing tst.demo.core
-----------------------------------------------------------------------------
wwwwwaaawwiiiattti--taaa--nnaiddnaa--dwpp-irrptaiir-niiantnttn  -dw2ta-  ani96ipds trn- i-pcndrota-impn nrpd4itl- n eipt5tr s e7i 
 incisots   mc0cpo olmmieppstll ee
etctteo
e-
 amnidps-l pectroeai
intt- a1n di-sip rcsio nmctmpo plm3lew etaiei
spt t-lceeatone
d
m-pplreitnet
 8 is complete
-----------------------------------------------------------------------------
wait-and-print 3 is complete
wait-and-print 9 is complete
wait-and-print 8 is complete
wait-and-print 4 is complete
wait-and-print 6 is complete
wait-and-print 7 is complete
wait-and-print 0 is complete
wait-and-print 1 is complete
wait-and-print 5 is complete
wait-and-print 2 is complete

But we see that locking serializes the function calls, so the active call must complete before the next one can begin.

Upvotes: 0

andy_fingerhut

Reputation: 1516

A Clojure atom is designed specifically so that in a multi-threaded program, there can be multiple threads executing swap! on a single atom, and if your program does this, those update functions f given to swap! can run simultaneously. The only part of swap! that is synchronized is a 'compare and swap' operation that effectively does:

  • lock the atom's state
  • check whether its current value is identical? to the reference it contained before f began executing, and if it is, replace it with the new object returned by f
  • unlock the atom's state

The function f may take a long time to calculate a new value from the current one, but the critical section above is merely a pointer comparison, and if equal, a pointer assignment.
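A minimal sketch that makes the retries visible: count how often the update function runs versus how many updates actually commit; under contention the first number ends up larger.

(def calls (atom 0))   ; how many times the update fn was invoked
(def counter (atom 0)) ; the value being updated

(defn bump! []
  (swap! counter (fn [n]
                   (swap! calls inc) ; side effect inside f: runs again on every retry
                   (inc n))))

(defn demo-retries []
  (let [workers (doall (repeatedly 8 #(future (dotimes [_ 10000] (bump!)))))]
    (run! deref workers)
    (println "f invocations:" @calls "committed updates:" @counter)))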

That is why the doc string for swap! says "Note that f may be called multiple times, and thus should be free of side effects."
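Applied to the question's track-progress, one way to follow that advice is to keep the function given to swap! pure and print afterwards, based on what the swap returned. A minimal sketch, assuming Clojure 1.9+ for swap-vals! (which returns both the old and the new value):

(defn track-progress [total progress]
  (let [step (max 1 (quot total 20))
        [[old-time _] [new-time new-count]]
        (swap-vals! progress
                    (fn [[time cnt]]
                      (let [n (inc cnt)]
                        (if (zero? (mod n step))
                          [(System/currentTimeMillis) n] ; reset the timestamp at each step
                          [time n]))))]
    ;; the side effect happens once, after the update has committed
    (when (zero? (mod new-count step))
      (println (str "Progress " new-count "/" total " | " (- new-time old-time) "ms")))))

Distinct progress lines printed from different threads can still interleave in the terminal; serializing the println itself (for example with locking, as in the other answer) addresses that part.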

Upvotes: 4
