dtg

Reputation: 1853

Clojure asynchronous access to shared resource

I have a TCP socket connection over which I need to process n requests, and I need to conj each request and its corresponding response onto a vector for logging purposes. I need to transmit and receive on two asynchronous threads: a transmit function is responsible for sending requests, and a receive function is responsible for receiving responses from a server.

My understanding is that for asynchronous transmission I need to use agents in Clojure. However, I also need to ensure serial access to the vector, since both threads may try to modify it at the same time.

I tried to get something working, but my agent ends up in a failed state after making a few requests and processing a few responses.

Below is the code showing what I am attempting to do. If anyone could give me some guidance, it would be greatly appreciated.

;; the shared resource

(def async-log (agent []))

;; I thought this needed to be synchronized for serial access, so I used 
;; dosync, but I am not sure if this is right. In any case, it doesn't 
;; seem to make a difference

(defn add-entry
  [coll entry]
  (dosync (conj coll entry)))

;; transmit function

(defn transmit
  [log writer socket request]
  (let [request   (request->String request socket)
        bytes-out (request->bytes request)
        length    (count bytes-out)]
    (.writeShort writer length)
    (.write writer bytes-out 0 length)
    (add-entry log request)))

;; Receive function

(defn receive
  [log reader socket]
  (let [length   (read-length reader)
        bytes-in (byte-array request/max-message-size)]
    (.read reader bytes-in 0 length)
    (add-entry log (to-string bytes-in))))

;; process each request, n times

(defn process-requests
  [request socket iters]
  (with-open [reader (DataInputStream. (.getInputStream socket))
              writer (DataOutputStream. (.getOutputStream socket))]
    (dotimes [x iters]
      (send-off async-log transmit writer socket request)
      (send-off async-log receive reader socket)
      (Thread/sleep 50))))

Upvotes: 0

Views: 123

Answers (1)

noisesmith

Reputation: 20194

You are correct that your dosync is not required there.
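The reason is that an agent runs its actions one at a time, so updates to its state are already serialized, and dosync is only needed when working with refs. A minimal sketch of this, assuming a throwaway agent (demo-log and add-entry* are hypothetical names for illustration, not part of the question's code):

```clojure
;; Hypothetical names: demo-log and add-entry* exist only for this sketch.
(def demo-log (agent []))

(defn add-entry*
  "Agent action: takes the current state and returns the new state.
  No dosync is needed, because the agent applies actions serially."
  [coll entry]
  (conj coll entry))

;; Dispatch from four threads at once; the agent still applies
;; the actions one at a time, so no entry is lost.
(let [workers (doall
                (for [t (range 4)]
                  (future
                    (dotimes [i 100]
                      (send demo-log add-entry* [t i])))))]
  (run! deref workers)  ; wait until every send has been dispatched
  (await demo-log))     ; wait until every dispatched action has run

(count @demo-log) ; 400 entries: 4 threads x 100 sends each
```

Entries from different threads may interleave in the vector, but each individual conj sees a consistent state.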

If the problem is that your agent is entering an error state, then you should be using the following functions:

agent-error to investigate errors on an agent.

restart-agent to reset an agent's error state so that it can run again.

set-error-handler! to register a function that is called with the agent and the exception whenever an action throws.
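As a rough sketch of how these three functions fit together, here is a deliberately failing agent. The names flaky-log, failing-action, wait-for-error and errors are hypothetical and stand in for your real agent and I/O code. Note that set-error-handler! on its own does not keep the agent running; this sketch also calls set-error-mode! with :continue for that.

```clojure
;; Hypothetical agent and action, used only to trigger a failure.
(def flaky-log (agent []))

(defn failing-action
  "Conj entry onto coll, or throw to simulate an I/O failure."
  [coll entry]
  (if (= entry :bad)
    (throw (ex-info "simulated I/O failure" {:entry entry}))
    (conj coll entry)))

(defn wait-for-error
  "Poll until the agent reports an error or about one second has passed.
  await is avoided here because it can block forever on a failed agent."
  [a]
  (loop [tries 0]
    (when (and (nil? (agent-error a)) (< tries 100))
      (Thread/sleep 10)
      (recur (inc tries)))))

(send flaky-log failing-action :ok)
(send flaky-log failing-action :bad)
(wait-for-error flaky-log)

;; 1. Inspect the exception that put the agent into its failed state.
(.getMessage (agent-error flaky-log)) ; "simulated I/O failure"

;; 2. Clear the error, keeping the entries logged before the failure.
(restart-agent flaky-log @flaky-log)
(send flaky-log failing-action :after-restart)
(await flaky-log)

;; 3. Record future errors and let the agent keep processing actions.
(def errors (atom []))
(set-error-handler! flaky-log
                    (fn [_agent ex] (swap! errors conj (.getMessage ex))))
(set-error-mode! flaky-log :continue)

(send flaky-log failing-action :bad)
(send flaky-log failing-action :ok-again)
(await flaky-log)

@flaky-log ; [:ok :after-restart :ok-again]
@errors    ; ["simulated I/O failure"]
```

In your case, calling agent-error on async-log after it fails should show which exception transmit or receive threw, which is the first thing to check.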

Upvotes: 1
