nha

Reputation: 18005

Clojure - core.async interface for apache kafka

I am using clj-kafka, and I am trying to make a core.async interface to it in the REPL.

I am getting some messages, but my structure feels wrong: either I cannot stop receiving messages, or I have to launch the go routine again to receive more.

Here is my attempt:

(defn consume [topic]
  (let [consume-chan (chan)]
    (with-resource [c (consumer config)]
      shutdown
      (go (doseq [m (messages c topic)]
                   (>! consume-chan m) ;; should I check the return value?
                   )))
    consume-chan)) ;; is it the right place to return a channel ?


  (def consume-chan (consume "test"))
  ;;(close! consume-chan)

  (go (>! consume-chan "hi")) ;; manual test, but I have some messages in Kafka already

  (def cons-ch (go
                 (with-resource [c (consumer config)]
                   shutdown
                   (doseq [m (messages c "test")]
                     (>! consume-chan m)))))  ;; should I check something here ?

  ;;(close! cons-ch)

  (def go-ch
    (go-loop []
      (if-let [km (<! consume-chan)]
        (do  (println "Got a value in this loop:" km)
              (recur))
        (do (println "Stop recurring - channel closed")))))

  ;;(close! go-ch)

How do I consume a lazy sequence of messages through a core.async interface?

Upvotes: 0

Views: 521

Answers (1)

bsvingen

Reputation: 2759

Here's what I would do:

  • >! returns false and <! returns nil once the channel is closed, so make sure the loop exits when this happens - that way you can easily end the loop from the outside by closing the channel.

  • Use a try/catch to check for exceptions inside the go block, and make any exception the return value so that they don't get lost.

  • Check for exceptions on read values, to catch anything from inside the channel.

  • The go blocks return a channel, and the return value of the code inside the block (like the exceptions from above) will be put on the channel. Check these channels for exceptions, possibly to rethrow.
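
To see the closed-channel behaviour from the first point, here is a small REPL sketch. It uses the blocking variants >!! and <!! so it can run outside a go block; the names ch, put-open, take-left, take-empty and put-closed are made up for illustration.

```clojure
(require '[clojure.core.async :as a])

(def ch (a/chan 1))            ; buffer of 1 so the first put does not block

(def put-open   (a/>!! ch :x)) ; true: the channel is open
(a/close! ch)
(def take-left  (a/<!! ch))    ; :x: buffered values are still delivered after close
(def take-empty (a/<!! ch))    ; nil: closed and drained
(def put-closed (a/>!! ch :y)) ; false: the channel is closed
```

This is why a write loop should test the result of >! for truthiness and a read loop should stop on nil.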

You can now write to a channel like this:

(defn write-seq-to-channel
  [channel
   values-seq]
  (a/go
    (try
      (loop [values values-seq]
        (when (seq values)
          (when (a/>! channel (first values))
            (recur (rest values)))))
      (catch Throwable e
        e))))

and you read like this:

(defn read-from-channel-and-print
  [channel]
  (a/go
    (try
      (loop []
        (let [value (a/<! channel)]
          (when value
            (when (instance? Throwable value)
              (throw value))
            (println "Value read:" value)
            (recur))))
      (catch Throwable e
        e))))

Each go block returns its own channel, so you now have two result channels. Use something like alts! or alts!! on them to detect when either loop exits, and close the data channel when you are done.
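
A rough sketch of that last step, with simplified stand-ins for the writer and reader loops so it runs on its own. The function name run-until-done and the keys in the returned map are hypothetical, and the reader counts values instead of printing them.

```clojure
(require '[clojure.core.async :as a])

(defn run-until-done []
  (let [ch     (a/chan 10)
        ;; stand-in writer: stops when the values run out or ch is closed
        writer (a/go-loop [vs (range 5)]
                 (when (and (seq vs) (a/>! ch (first vs)))
                   (recur (rest vs))))
        ;; stand-in reader: counts values until ch is closed and drained
        reader (a/go-loop [n 0]
                 (if (some? (a/<! ch))
                   (recur (inc n))
                   n))
        ;; block until one of the two go blocks exits
        [result port] (a/alts!! [writer reader])]
    ;; a go block that caught an exception would hand it back as its result
    (when (instance? Throwable result)
      (throw result))
    ;; closing the data channel lets the reader drain the buffer and exit
    (a/close! ch)
    {:first-to-exit (if (= port writer) :writer :reader)
     :values-read   (a/<!! reader)}))
```

Here the writer always exits first, because the reader can only finish once the data channel has been closed.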

Upvotes: 1
