Christophe De Troyer

Reputation: 2922

Clojure wait for condition without spinning

I'm implementing a mechanism for a thread to have a queue that contains messages. The queue is built using LinkedBlockingQueue from java.util.concurrent. What I want to achieve is something like the following.

Thread with mailbox:
defn work:
    * do some stuff
    * Get the head of the queue (a message):
        - if it is "hello":
             <do some stuff>
             <recur work fn>
        - if it is "bye":
             <do some stuff>
        - if it is none of the above, add the message to the back of queue
          and restart from "Get the head of the queue"
    * <reaching this point implies terminating the thread>

My first idea was to wrap a loop around the "Get the head of the queue" step, use a cond to check the message, and put the message back on the queue in an :else branch if it matched none of the clauses. The downside is that calling recur in the body of any cond clause always recurs to the loop, whereas in the "hello" case I want recur to restart the function (i.e., work). So that was not an option. Another downside is that if such a message takes a long time to arrive, the thread spins indefinitely and eats resources.

The next idea I had (but have not yet implemented) is using a future. The scheme would be as follows.

* Get all the matches I have to match (i.e., "hello" and "bye")
* Start a future and pass it the list of messages:
    * While the queue does not contain any of the messages
      recur
    * when found, return the first element that matches.
* Wait for the future to deliver.
* if it is "hello":
    <do some stuff>
    <recur work fn>
  if it is "bye":
    <do some stuff>

When doing it this way I get almost what I want:

  1. Receiving either "hello" or "bye" blocks until I have either one.
  2. I can make an indefinite number of clauses to match the message
  3. I have extracted the looping behaviour into a future that blocks, which has the nice side-effect that each time I evaluate my cond I'm sure I have a matching message and don't have to worry about retrying.

One thing I really would like, but can't imagine how to achieve, is that the future in this case does not spin. As it stands it would keep eating up precious CPU resources traversing the queue indefinitely, while it might be perfectly normal to never receive one of the messages it is looking for.

Perhaps it would make sense to abandon the LinkedBlockingQueue and trade it in for a data structure that has a method, say, getEither(List<E> oneOfThese), that blocks until one of these elements is available.

Another thought I had, which is how I could possibly do it in Java, is to have the aforementioned getEither() operation on the queue call wait() if none of the elements are in the queue. When another thread puts a message in the queue, it can call notifyAll() so that each waiting thread checks the queue against its own list of wanted messages.
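A rough sketch of that wait()/notify() idea, written in Clojure rather than Java. The names make-mailbox, put-message! and get-either! are made up for illustration, the mailbox is assumed to be a plain vector in an atom guarded by a single monitor object, and the sketch uses notifyAll() so that every waiting thread re-checks its own list:

```clojure
(defn make-mailbox
  "A vector of messages plus the monitor object that guards it."
  []
  {:lock (Object.) :messages (atom [])})

(defn- remove-first
  "Returns `coll` as a vector without the first element equal to `x`."
  [x coll]
  (let [[before after] (split-with #(not= x %) coll)]
    (vec (concat before (rest after)))))

(defn put-message!
  "Appends `msg` and wakes every waiting reader so it can re-check."
  [{:keys [lock messages]} msg]
  (locking lock
    (swap! messages conj msg)
    (.notifyAll ^Object lock)))

(defn get-either!
  "Blocks until one of `wanted` is in the mailbox, then removes and returns it.
  Assumes messages are never nil or false."
  [{:keys [lock messages]} wanted]
  (let [wanted? (set wanted)]
    (locking lock
      (loop []
        (if-let [match (first (filter wanted? @messages))]
          (do (swap! messages #(remove-first match %))
              match)
          (do (.wait ^Object lock) ; releases the monitor and sleeps, no spinning
              (recur)))))))

;; Usage: the consumer blocks until 3, 4 or 5 shows up.
(def mailbox  (make-mailbox))
(def consumer (future (get-either! mailbox [3 4 5])))
(put-message! mailbox 1) ; not wanted, stays in the mailbox
(put-message! mailbox 4)
(println "Got" @consumer)
```

The loop around .wait matters: a woken thread always re-checks the mailbox, so wake-ups caused by unwanted messages (or spurious ones) simply send it back to sleep.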

Example

The code below works fine. However, it has the spinning problem. It's basically a very elementary example of what I'm trying to achieve.

(def queue (ref '()))

(defn contains-element [elements collection]
  (some (zipmap elements (repeat true)) collection))

(defn has-element
  [col e]
  (some #(= e %) col))

(defn find-first
  [f coll]
  (first (filter f coll)))

; This function blocks, which is what I want.
; However, it polls the queue in a loop and thus uses a LOT of CPU,
; which is *not* what I want.
(defn get-either
  [getthese queue]
  (let [match (dosync
                (let [match (first (filter #(has-element getthese %) @queue))]
                  (when-not (nil? match)
                    (ref-set queue (filter #(not= match %) @queue)))
                  match))]
    (if (nil? match)
      (do (Thread/sleep 500) ; sleep outside the transaction, then poll again
          (recur getthese queue))
      match)))

(defn somethread
  [iwantthese]
  (let [element (get-either iwantthese queue)
        wanted  (filter #(not= % element) iwantthese)]
    (println (str "I got " element))
    (Thread/sleep 500)
    (recur wanted)))

(defn test
  []
  (.start (Thread. (fn [] (somethread '(3 4 5)))))

  (dosync (alter queue #(cons 1 %)))
  (println "Main: added 1")
  (Thread/sleep 1000)

  (dosync (alter queue #(cons 2 %)))
  (println "Main: added 2")
  (Thread/sleep 1000)

  (dosync (alter queue #(cons 3 %)))
  (println "Main: added 3")
  (Thread/sleep 1000)

  (dosync (alter queue #(cons 4 %)))
  (println "Main: added 4")
  (Thread/sleep 1000)

  (dosync (alter queue #(cons 5 %)))
  (println "Main: added 5"))

Any tips?

(In case anyone noticed, yes, this is like actors and the purpose is an implementation in Clojure for academic purposes)

Upvotes: 1

Views: 1192

Answers (2)

DanLebrero

Reputation: 8593

You need 2 queues instead of one: incoming queue and a "dead-letter" queue.

  1. A "thread" should read from the incoming queue in a blocking way (LinkedBlockingQueue.take(), core.async/<!, or agents).
  2. If the message doesn't match any clause:
    1. Put the message at the end of the dead queue.
    2. Go to 1.
  3. If the message matches a clause:
    1. Run the clause's work.
    2. For each message in the dead queue, match it against the clauses, removing the ones that match.
    3. Go to 1.

See below for the two implementations.
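Before those, the numbered steps can be sketched directly on top of LinkedBlockingQueue.take(), which parks the thread until a message arrives instead of polling. This is only a sketch under a few assumptions that are not part of the steps themselves: run-mailbox and greeter are made-up names, a behaviour is a map from message to handler, and a handler returns the behaviour to use next, or nil to terminate the thread. Dead letters are retried in arrival order after every match.

```clojure
(import 'java.util.concurrent.LinkedBlockingQueue)

(defn run-mailbox
  "Processes messages from `inbox` until a handler returns nil, then returns
  the messages that never matched. Assumption: `behaviour` maps a message to a
  one-argument handler that returns the behaviour to use next."
  [^LinkedBlockingQueue inbox behaviour]
  (loop [behaviour behaviour
         pending   []   ; dead letters to retry before reading the inbox again
         dead      []]  ; messages that no current clause matches
    (let [[msg more] (if (seq pending)
                       [(first pending) (subvec pending 1)]
                       [(.take inbox) pending])] ; step 1: blocks, no spinning
      (if-let [handler (get behaviour msg)]
        ;; step 3: run the clause, then give the dead letters another chance
        (if-let [next-behaviour (handler msg)]
          (recur next-behaviour (into dead more) [])
          (into dead more))
        ;; step 2: no clause matched, park the message in the dead queue
        (recur behaviour more (conj dead msg))))))

(defn greeter
  "Hypothetical behaviour: \"hello\" keeps the loop going, \"bye\" ends it."
  []
  {"hello" (fn [_] (println "Hello!") (greeter))
   "bye"   (fn [_] (println "Bye!") nil)})

;; Usage: "what?" matches no clause and is returned as unhandled.
(def inbox  (LinkedBlockingQueue.))
(def worker (future (run-mailbox inbox (greeter))))
(doseq [m ["what?" "hello" "bye"]] (.put inbox m))
(println "Unhandled:" @worker)
```

Because a handler returns the next behaviour, the "hello" clause can restart the work function with the same or different clauses, while "bye" simply lets the loop fall through and the thread end.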

Agents

Agents are quite similar to actors. The "only" difference is that you send data/messages to actors, but you send functions to agents. A possible implementation would be:

(defn create-actor [behaviour]
  (agent {:dead-queue [] 
          :behaviour behaviour}))

dead-queue will contain the messages that didn't match any of the clauses. This is basically your "end of the queue". behaviour should be some map/vector from match-fn to the fn to run. In my particular implementation, I have chosen a map whose keys are the elements to match and whose values are the fns to run when a new item matches:

(def actor (create-actor {3 println
                          4 (partial println "Got a ")
                          5 #(println "Got a " %)}))

You will probably need a more sophisticated behaviour data structure. The only important thing is to know whether the element was processed, so you know whether it has to go to the dead queue.

To send messages to the actor:

(defn push [actor message]
  (send actor
        (fn [state new-message]
          (if-let [f (get-in state [:behaviour new-message])]
            (do
              (f new-message)
              state)
            (update-in state [:dead-queue] conj new-message)))
        message))

So if there is a match in the behaviour, the message is processed immediately. If not, it is stored in the dead queue. You could also try to match/process all the messages in the dead queue after processing the new message, if you expect that the behaviours are not pure functions. In this example implementation this is not possible.

We could change the behaviour of the actor to give the messages on the dead queue a chance to be processed:

(defn change-behaviour [actor behaviour]
  (send actor
        (fn [state new-behaviour]
          (let [to-process (filter new-behaviour (:dead-queue state))
                new-dead-queue (vec (remove (set to-process) (:dead-queue state)))]
            (doseq [old-message to-process
                    :let [f (get new-behaviour old-message)]]
              (f old-message))
            {:behaviour new-behaviour
             :dead-queue new-dead-queue}))
        behaviour))

And an example of using it:

(push actor 4)
(push actor 18)
(push actor 1)
(push actor 18)
(push actor 5)
(change-behaviour actor {18 (partial println "There was an")})

And the same solution based on core.async:

(require '[clojure.core.async :as async])

(defn create-actor [behaviour]
  (let [queue (async/chan)]
    (async/go-loop [dead-queue []
                    behaviour behaviour]
      (let [[type val] (async/<! queue)]
        (if (= type :data)
          (if-let [f (get behaviour val)]
            (do
              (f val)
              (recur dead-queue behaviour))
            (recur (conj dead-queue val) behaviour))
          (let [to-process (filter val dead-queue)
                new-dead-queue (vec (remove (set to-process) dead-queue))]
            (doseq [old-msg to-process
                    :let [f (get val old-msg)]]
              (f old-msg))
            (recur new-dead-queue val)))))
    queue))

(defn push [actor message]
  (async/go
    (async/>! actor [:data message])))

(defn change-behaviour [actor behaviour]
  (async/go
    (async/>! actor [:behaviour behaviour])))

Upvotes: 1

Symfrog

Reputation: 3418

Have you considered using core.async? It provides what you need in a lightweight way.

Upvotes: 0
