Evgeniy Berezovsky

Reputation: 19228

Wait for atom change with timeout

Is there a way to do something like (Thread/sleep millis my-atom) that would wake up early if my-atom is changed before millis have elapsed?

Or do I have to use clojure.core.async for that, with channels instead of watches?

Upvotes: 0

Views: 941

Answers (3)

Taylor Wood

Reputation: 16194

You could do this with a promise, using deref with a timeout:

(def p (promise))
(future ;; some other thread starts working with the promise
  (Thread/sleep 500)
  (deliver p :finished-early))
(deref p 1000 :timed-out) ;; => :finished-early

If the sleep took longer than 1000 ms, deref would return :timed-out instead.

Update: I see your question is now more specifically about atoms. In that case, you could still use a promise by adding a watch on the atom and delivering on the promise if the value changes:

(def p (promise))
(def a (atom {}))
(add-watch a :watch-changed
           (fn [_ _ old new]
             (when-not (= old new) (deliver p :changed))))
(future
  (Thread/sleep 500) ;; must be shorter than the 1000 ms deref timeout below
  (swap! a assoc :foo 1))
(deref p 1000 :timed-out) ;; => :changed

Or in reusable function form, where r can be any IRef type:

(defn await-change [r timeout-ms]
  (let [p (promise)]
    (try
      (add-watch r :await-change ;; keyword must be unique per ref!
                 (fn [_ _ old new]
                   (when-not (= old new) (deliver p :changed))))
      (deref p timeout-ms :timed-out)
      (finally
        (remove-watch r :await-change)))))

(def a (atom {}))
(future
  (Thread/sleep 500)
  (swap! a assoc :foo 1))
(await-change a 1000) ;; => :changed
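
The fixed :await-change key above means two threads waiting on the same ref at once would overwrite each other's watch. A minimal sketch of one way around that, assuming a fresh key per call is acceptable (the name await-change-unique is made up for illustration and is not part of the original answer):

```clojure
(defn await-change-unique
  "Like await-change, but registers the watch under a key generated per call,
  so concurrent waiters on the same ref do not replace each other's watch.
  Hypothetical variant for illustration."
  [r timeout-ms]
  (let [p (promise)
        k (gensym "await-change-")] ;; watch keys can be any value; a gensym is unique
    (add-watch r k
               (fn [_ _ old new]
                 (when-not (= old new) (deliver p :changed))))
    (try
      (deref p timeout-ms :timed-out)
      (finally
        (remove-watch r k)))))
```

It is called the same way as await-change, for example (await-change-unique a 1000).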

Upvotes: 6

Alan Thompson

Reputation: 29958

While the solution using a promise is nice, a promise can be delivered only once and is thereafter immutable. So, it is not a drop-in replacement for an atom in your problem.

A better solution might be to stick with an atom and use the add-watch function to register a watch function that is called whenever the value of the atom changes:

(def a (atom {}))

(add-watch a :watcher
  (fn [key atom old-state new-state]
    (prn "-- Atom Changed --")
    (prn "key" key)
    (prn "atom" atom)
    (prn "old-state" old-state)
    (prn "new-state" new-state)))

(reset! a {:foo "bar"})

with results:

"-- Atom Changed --"
"key"        :watcher
"atom"       #<Atom@4b020acf: {:foo "bar"}>
"old-state"  {}
"new-state"  {:foo "bar"} 

{:foo "bar"}

Your code would need to be re-architected a bit: instead of calling deref with a timeout, you would register your code as a callback function. You could use a future or core.async to take action if the atom is not changed before the desired timeout.
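
As a rough sketch of that callback style using only a watch and a future (no core.async), something like the following might work. The helper name on-change-or-timeout and its argument order are assumptions made for illustration. An internal flag ensures that only one of the two callbacks ever runs:

```clojure
(defn on-change-or-timeout
  "Hypothetical helper: calls (on-change old new) on the first change of ref r,
  or (on-timeout) if no change happens within timeout-ms. Exactly one of the
  two callbacks runs. Returns the future that handles the timeout."
  [r timeout-ms on-change on-timeout]
  (let [k      (gensym "on-change-or-timeout-") ;; unique watch key per call
        done?  (atom false)
        ;; returns true only for the first caller, so change and timeout cannot both fire
        claim! #(compare-and-set! done? false true)]
    (add-watch r k
               (fn [_ _ old new]
                 (when (and (not= old new) (claim!))
                   (remove-watch r k)
                   (on-change old new))))
    (future
      (Thread/sleep (long timeout-ms))
      (when (claim!)
        (remove-watch r k)
        (on-timeout)))))
```

Note that on-change runs on whichever thread changed the ref, while on-timeout runs on the future's thread, so both callbacks should be quick or hand the work off elsewhere.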

Upvotes: 0

Evgeniy Berezovsky

Reputation: 19228

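Here's a generic solution that repeatedly sleeps for a short interval and calls break-f between the sleeps to check whether it should wake up and return.

With an atom, it can be used like (interruptible-sleep 100 #(not @my-atom)), which wakes up as soon as my-atom holds a falsey value. To wake up on any change instead, capture the current value first and have break-f compare it against the atom's latest value.

Polling isn't that nice of a concept, but it sometimes suffices: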

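(defn interruptible-sleep
  "Sleeps for up to millis, calling break-f every polling-millis (default 100).
  Returns the negated remaining time: negative if break-f ended the sleep
  early, zero or positive if the timeout elapsed. Returns nil if millis <= 0."
  ([millis break-f] (interruptible-sleep millis break-f 100))
  ([millis break-f polling-millis]
   (when (> millis 0)
     (let [end (+ (System/currentTimeMillis) millis)]
       (loop []
         (let [now (System/currentTimeMillis)
               remaining-millis (- end now)]
           (if (and (> remaining-millis 0) (not (break-f)))
             (do
               ;; with less than 5 ms left, skip the sleep and just re-check
               (when (>= remaining-millis 5)
                 (Thread/sleep (min polling-millis remaining-millis)))
               (recur))
             (- remaining-millis))))))))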

Upvotes: 0
