Reputation: 2845
I want to create a batch function in Clojure: given a delay-ms, it will batch calls to function f within that period and send them all in one go.
Here's a naive implementation:
(import '(java.util Timer TimerTask))

(defn delay-f
  [f delay-ms]
  (let [timer (Timer.)
        task (atom nil)
        latest-batch (atom [])]
    (fn [& args]
      (swap! latest-batch conj args)
      (when-not @task
        (let [new-task (proxy [TimerTask] []
                         (run []
                           (f @latest-batch)
                           (reset! task nil)
                           (reset! latest-batch [])
                           (.purge timer)))]
          (reset! task new-task)
          (.schedule timer new-task delay-ms))))))
I'm pretty sure that, given my use of atoms here, there's a race condition.
What would be the idiomatic solution here?
Upvotes: 1
Views: 356
Reputation: 1
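Your code reliably breaks whenever the function is called while the TimerTask is still executing: the new arguments are appended to latest-batch, no new task is scheduled because task is still set, and the reset! that runs after f then discards them. First, you want to reset the task and latest-batch atoms before running the function, not after. That still leaves a race condition, albeit a less likely one, because reading latest-batch and resetting it are two separate steps. We can use a ConcurrentLinkedQueue instead: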
(defn delay-f
  [f delay-ms]
  (let [timer (java.util.Timer.)
        task (atom nil)
        queue (java.util.concurrent.ConcurrentLinkedQueue.)]
    (fn [& args]
      (.offer queue args)
      (when-not @task
        (let [new-task (proxy [java.util.TimerTask] []
                         (run []
                           (reset! task nil)
                           (f (loop [r []]
                                (if-let [e (.poll queue)]
                                  (recur (conj r e))
                                  r)))
                           (.purge timer)))]
          (reset! task new-task)
          (.schedule timer new-task delay-ms))))))
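Two edge cases remain in the version above, as far as I can tell: two threads can both see a nil task and both schedule a flush, and a zero-argument call passes nil to .offer, which ConcurrentLinkedQueue rejects. A minimal sketch of one way to tighten this, assuming a boolean flag flipped with compare-and-set! in place of the task atom (the name scheduled? and the daemon Timer are my own choices, not part of the original):

```clojure
(import '(java.util Timer TimerTask)
        '(java.util.concurrent ConcurrentLinkedQueue))

(defn delay-f
  [f delay-ms]
  (let [timer      (Timer. true)          ; daemon thread (assumption): won't block JVM exit
        scheduled? (atom false)           ; hypothetical flag replacing the task atom
        queue      (ConcurrentLinkedQueue.)]
    (fn [& args]
      ;; `args` is nil for a zero-argument call and the queue rejects nil,
      ;; so store a (possibly empty) vector instead.
      (.offer queue (vec args))
      ;; Only the caller that flips false -> true schedules a flush.
      (when (compare-and-set! scheduled? false true)
        (.schedule timer
                   (proxy [TimerTask] []
                     (run []
                       ;; Reopen scheduling before draining, so a call that
                       ;; arrives mid-flush schedules the next flush.
                       (reset! scheduled? false)
                       (let [batch (loop [r []]
                                     (if-some [e (.poll queue)]
                                       (recur (conj r e))
                                       r))]
                         ;; A late flush can find the queue already drained.
                         (when (seq batch)
                           (f batch)))))
                   (long delay-ms))))))
```

With this sketch, (def batched (delay-f println 100)) followed by (batched 1) and (batched 2 3) within 100 ms should call println once with [[1] [2 3]]. If f throws, the Timer thread terminates, so a real implementation would probably wrap the call in a try.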
Upvotes: 0
Reputation: 29958
I think the best way to approach this problem would be to use the timer library overtone/at-at instead of reinventing the wheel. In particular, the function overtone.at-at/every provides the scheduling you want.
Define an atom to hold accumulated tasks (a.k.a. "thunks") to execute, and a function to append new tasks to the queue.
Define an "execute" function, passed to every, that clears the queue and executes each task found there in sequence.
The Clojure atom will prevent race conditions here, provided the "execute" function takes the queued tasks and clears the atom in a single atomic operation. Updates to an atom are applied one at a time: if the "append" and "execute" functions try to modify the atom contents simultaneously, one update wins and the other is retried against the new value, so no task is lost or run twice.
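The two steps above can be sketched roughly as follows. To keep the example free of external dependencies, it swaps in the JDK's ScheduledExecutorService for the periodic call that overtone.at-at/every would otherwise provide. The names pending, append-task!, execute-pending! and start-flusher! are hypothetical, and reset-vals! assumes Clojure 1.9 or later.

```clojure
(import '(java.util.concurrent Executors TimeUnit))

;; Pending thunks (zero-argument functions) waiting for the next flush.
(def pending (atom []))

(defn append-task!
  "Adds a thunk to the pending queue."
  [thunk]
  (swap! pending conj thunk))

(defn execute-pending!
  "Atomically takes every pending thunk, leaving the queue empty,
  then runs the thunks in the order they were appended."
  []
  (let [[tasks _] (reset-vals! pending [])]   ; returns [old-value new-value]
    (doseq [task tasks]
      (task))))

(defn start-flusher!
  "Runs execute-pending! every period-ms milliseconds on a single
  background thread. Returns the executor so the caller can shut it down.
  Stand-in for at-at's `every` (assumption, see the note above)."
  [period-ms]
  (doto (Executors/newSingleThreadScheduledExecutor)
    (.scheduleAtFixedRate execute-pending!
                          (long period-ms)
                          (long period-ms)
                          TimeUnit/MILLISECONDS)))
```

Because reset-vals! reads the old contents and installs the empty vector in one step, a thunk appended during a flush either belongs to the batch being run or stays queued for the next one. Note that scheduleAtFixedRate suppresses later runs if one execution throws, so thunks that may fail should catch their own exceptions.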
For alternative libs, please see the Clojure-Toolbox under the topic Scheduling.
Upvotes: 1