metasoarous

Reputation: 2943

Agent/actor-like constructs in Clojure that operate on all messages received since the last update

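What's the best way in Clojure to implement something like an actor or agent (an asynchronously updated, uncoordinated reference) that accumulates the messages sent to it and, on each update, runs one function over all of the messages received since the last update?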

An agent doesn't seem quite right here. With agents, the function and the data must be sent together, which leaves no room for a function that operates on all of the data that has come in since the last update. The goal implicitly requires decoupling function from data.
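A small sketch of that coupling, using hypothetical names (`seen`, `per-message-agent`, `record-one`): every `send` carries its own function and argument, and the agent runs one action per send, in order, so no single call ever sees all of the messages that arrived while a previous action was busy.

```clojure
;; Hypothetical demo: each send pairs a function with one message,
;; and the agent applies those actions one at a time.
(def seen (atom []))                 ; what each action was given
(def per-message-agent (agent 0))    ; state: number of actions run so far

(defn record-one [state msg]
  (swap! seen conj [msg])            ; this action only ever sees its own msg
  (inc state))

(doseq [i (range 5)]
  (send per-message-agent record-one i))

(await per-message-agent)            ; block until the 5 queued actions have run
@seen              ;=> [[0] [1] [2] [3] [4]]
@per-message-agent ;=> 5
```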

The actor model seems better suited, since it does decouple function and data. However, every actor framework I'm aware of seems to assume that each message sent will be processed separately, and it's not clear how one would turn this on its head without adding extra machinery. I know Pulsar's actors accept a :lifecycle-handle function that can be used to make actors do "special tricks", but there isn't much documentation around it, so it's unclear whether that functionality would help.

I do have a solution to this problem using agents, core.async channels, and watch functions, but it's a bit messy, and I'm hoping there is a better one. I'll post it as an answer in case others find it helpful, but I'd like to see what others come up with.

Upvotes: 1

Views: 297

Answers (3)

metasoarous

Reputation: 2943

I came up with something closer to an actor, inspired by Tim Baldridge's cast on actors (Episode 16). I think this addresses the problem much more cleanly.

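(require '[clojure.core.async :refer [chan go <! alts! put!]])

;; Collects every value currently available on channel c, without parking.
;; It has to be a macro because alts! only works lexically inside a go block.
(defmacro take-all! [c]
  `(let [ch# ~c]
     (loop [acc# []]
       (let [[v# port#] (alts! [ch#] :default nil)]
         ;; port is :default when nothing is ready; v is nil once ch is closed
         (if (and (not= port# :default) (some? v#))
           (recur (conj acc# v#))
           acc#)))))


(defn eager-actor [f]
  (let [msgbox (chan 1024)]
    (go (loop [f f]
          ;; park on the first take so we only run when there are actually
          ;; messages; nil means msgbox has been closed, so stop
          (when-some [first-msg (<! msgbox)]
            (let [msgs (concat [first-msg] (take-all! msgbox))]
              ;; f handles the whole batch and returns the handler for the next one
              (recur (f msgs))))))
    msgbox))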


(let [a (eager-actor (fn f [ms]
                       (Thread/sleep 1000) ; simulate work
                       (println "doing something with" ms)
                       f))]
  (doseq [i (range 20)]
    (Thread/sleep 300)
    (put! a i)))
;; =>
;; doing something with (0)
;; doing something with (1 2 3)
;; doing something with (4 5 6)
;; doing something with (7 8 9 10)
;; doing something with (11 12 13)
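One caveat with the version above: f runs inside the go block, so the Thread/sleep standing in for work ties up one of core.async's small fixed pool of go-block threads. If the real work blocks, a variant along these lines runs the same loop on a dedicated thread via `async/thread`, using the blocking `<!!` and `alts!!` (which accepts `:default` just like `alts!`). `take-all!!` and `eager-thread-actor` are hypothetical names, and here the handler receives a vector rather than a seq.

```clojure
(require '[clojure.core.async :as async :refer [chan <!! alts!! put! close!]])

;; Take every value that is available right now, without blocking.
;; alts!! returns [nil :default] when nothing is ready, and [nil c] once c is closed.
(defn take-all!! [c]
  (loop [acc []]
    (let [[v port] (alts!! [c] :default nil)]
      (if (and (not= port :default) (some? v))
        (recur (conj acc v))
        acc))))

;; Hypothetical variant of eager-actor that runs f on a real thread, so f may block.
;; f receives a vector of messages and returns the handler for the next batch.
(defn eager-thread-actor [f]
  (let [msgbox (chan 1024)]
    (async/thread
      (loop [f f]
        ;; <!! blocks until at least one message arrives; nil means msgbox is closed
        (when-some [first-msg (<!! msgbox)]
          (recur (f (into [first-msg] (take-all!! msgbox)))))))
    msgbox))
```

Closing the returned channel with `close!` lets the loop finish any buffered messages and then exit.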

Upvotes: 0

noisesmith

Reputation: 20194

Agents are the inverse of what you want here: they are a value that gets sent updating functions. This is easiest with a queue and a thread; for convenience I am using future to construct the thread.

user> (def q (java.util.concurrent.LinkedBlockingDeque.))
#'user/q
user> (defn accumulate
        [summary input]
        (let [{vowels true consonants false}
              (group-by #(contains? (set "aeiouAEIOU") %) input)]
          (-> summary
              (update-in [:vowels] + (count vowels))
              (update-in [:consonants] + (count consonants)))))
#'user/accumulate
user> (def worker
        (future (loop [summary   {:vowels 0 :consonants 0}
                       in-string (.take q)]      ; .take blocks until input arrives
                  ;; false is used as a "no more input" sentinel
                  (if (not in-string)
                    summary
                    (recur (accumulate summary in-string)
                           (.take q))))))
#'user/worker
user> (.add q "hello")
true
user> (.add q "goodbye")
true
user> (.add q false)
true
user> @worker
{:vowels 5, :consonants 7}
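The worker above still handles one string per .take. To get the batching the question asks for with the same queue-and-thread approach, `BlockingQueue.drainTo` moves every element currently in the queue into a collection without blocking. A sketch under those assumptions, keeping false as the stop sentinel (`batch-worker` and `handle-batch` are hypothetical names):

```clojure
(import '(java.util ArrayList)
        '(java.util.concurrent LinkedBlockingDeque))

(defn batch-worker
  "Hypothetical helper: calls (handle-batch acc items) once per batch, where a
  batch is everything queued since the previous batch. Stops at the sentinel
  false; deref the returned future to get the final acc."
  [^LinkedBlockingDeque q handle-batch init]
  (future
    (loop [acc init]
      (let [buf (ArrayList.)]
        (.add buf (.take q))            ; block until there is work
        (.drainTo q buf)                ; then grab whatever else is already queued
        (let [batch (vec buf)
              items (vec (take-while #(not (false? %)) batch))
              acc   (if (seq items) (handle-batch acc items) acc)]
          (if (< (count items) (count batch))
            acc                         ; hit the sentinel: stop
            (recur acc)))))))
```

With accumulate from above, passing `#(reduce accumulate %1 %2)` as handle-batch folds each batch into the running summary.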

Upvotes: 1

metasoarous

Reputation: 2943

Here's the solution I came up with using agents, core.async channels, and watch functions. Again, it's a bit messy, but it does what I need it to for now. Here it is, in broad strokes:

(require '[clojure.core.async :as async :refer [>!! <!! >! <! chan go]])

; We'll call this thing a queued-agent
(defprotocol IQueuedAgent
  (enqueue [this message])
  (ping [this]))

(defrecord QueuedAgent [agent queue]
  IQueuedAgent
  (enqueue [_ message]
    (go (>! queue message)))
  (ping [_]
    (send agent identity)))


; Need a function for draining a core.async channel of every message currently in it.
; alts!! with :default returns [nil :default] as soon as nothing is ready, so this
; never blocks; it also stops if the channel has been closed (v is then nil).
(defn drain! [c]
  (loop [acc []]
    (let [[v port] (async/alts!! [c] :default nil)]
      (if (and (not= port :default) (some? v))
        (recur (conj acc v))
        acc))))

; Constructor function
(defn queued-agent [& {:keys [buffer update-fn init-fn error-handler-builder]
                       :or   {buffer 100}}]
  (let [q                (chan buffer)
        a                (agent (if init-fn (init-fn) {}))
        error-handler-fn (when error-handler-builder
                           (error-handler-builder q a))]
    ; Watch the agent: every time one of its actions finishes (a ping, or a
    ; previous update), drain the queue and, if anything was waiting, hand the
    ; whole batch to update-fn. send-off because update-fn may block.
    (add-watch
      a
      :update-conv
      (fn [k r o n]
        (let [queued (drain! q)]
          (when-not (empty? queued)
            (send-off a update-fn queued error-handler-fn)))))
    (QueuedAgent. a q)))

; Now we can use these like this

(def a (queued-agent
         :init-fn   (fn [] {:some "initial value"})
         ; update-fn gets the agent's current state, the batch of queued data and
         ; the error handler; whatever it returns becomes the agent's new state
         :update-fn (fn [state queued-data error-handler-fn]
                      (println "Receiving data" queued-data)
                      ; Simulate some work/load on data
                      (Thread/sleep 2000)
                      (println "Done with work; ready to queue more up!")
                      state)
         ; This is a little warty at the moment, but closing over the queue and agent lets you requeue work on
         ; failure so you can try again.
         :error-handler-builder
                    (fn [q a] (println "do something with errors"))))

(defn -main []
  (doseq [i (range 10)]
    (enqueue a (str "data" i))
    (Thread/sleep 500) ; simulate things happening
    ; This part stinks... have to manually let the queued agent know that we've queued some things up for it
    (ping a)))

As you'll notice, having to ping the queued-agent here every time new data is added is pretty warty. It definitely feels like things are being twisted out of typical usage.
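One way to get rid of the manual ping, sketched under the assumption that a plain `java.util.concurrent.ConcurrentLinkedQueue` may stand in for the core.async channel: have enqueueing itself `send-off` a "process whatever is pending" action. Agent actions run one at a time, so while one batch is being processed further sends simply queue up; the first of them to run drains everything that arrived in the meantime, and the rest find the queue empty and leave the state alone. No message is missed, because each one is added before its own action is sent. `make-batching-agent`, `enqueue!`, `drain-queue` and `process-pending` are hypothetical names.

```clojure
(import '(java.util.concurrent ConcurrentLinkedQueue))

;; Hypothetical ping-free alternative to queued-agent: a lock-free queue holds
;; incoming messages and the agent holds the state.
(defn make-batching-agent [init update-fn]
  {:state-agent (agent init)
   :queue       (ConcurrentLinkedQueue.)
   :update-fn   update-fn})             ; (update-fn state msgs) -> new state

(defn- drain-queue [^ConcurrentLinkedQueue q]
  (loop [acc []]
    (if-some [m (.poll q)]              ; .poll returns nil once the queue is empty
      (recur (conj acc m))
      acc)))

(defn- process-pending [state q update-fn]
  (let [msgs (drain-queue q)]
    (if (seq msgs)
      (update-fn state msgs)            ; one call for everything queued so far
      state)))                          ; an earlier action already took them

(defn enqueue! [{:keys [state-agent queue update-fn]} msg]
  (.add ^ConcurrentLinkedQueue queue msg)
  ;; send-off (not send) because update-fn may block
  (send-off state-agent process-pending queue update-fn))
```

With this, the loop in -main reduces to `(enqueue! qa (str "data" i))` with no ping afterwards, where qa is a map built by make-batching-agent.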

Upvotes: 1
