Reputation: 533
Context: Clojure + RabbitMQ (via Langohr), a follow up to this question.
I'm getting weird results in consuming messages from a RabbitMQ mq (getting the messages from a direct exchange and publishing to a fanout exchange after processing the message). I don't understand why messages end up in separate threads during consumption (every few messages a thread switch happens).
The consumer starts in a separate thread (to prevent it from crashing the main thread if any IO exceptions happen), but that doesn't explain the switching.
; Message handler
(defn message-handler
[pub-name ch metadata ^bytes payload]
(let [msg (json/parse-string (String. payload "UTF-8"))
content (string/join " " (map msg '("title" "link" "body")))
tags (pluck-tags content)]
(println (format "HANDLER %s: Message: %s | found tags: %s"
(Thread/currentThread)
(msg "title")
(tags-to-csv tags)))
(nil)))
; (lb/publish ch pub-name "" (json/generate-string (assoc msg "tags" (tags-to-csv tags))))))
(defn -main
[& args]
(let [conn (rmq/connect {:uri (System/getenv "MSGQ")})
ch (lch/open conn)
q-name "q.events.tagger"
e-sub-name "e.events.preproc"
e-pub-name "e.events"
routing-key "tasks.taggify"]
(lq/declare ch q-name :exclusive false :auto-delete false)
(le/declare ch e-pub-name "fanout" :durable false)
(lq/bind ch q-name e-sub-name :routing-key routing-key)
(.start (Thread. (fn []
(lcm/subscribe ch q-name (partial message-handler e-pub-name) :auto-ack true))))))
The message-handler just prints out the current thread and the payload received. This is what I get:
HANDLER in Thread[pool-1-thread-2,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-2,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-3,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-3,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-3,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-4,5,main]: Message: ...
HANDLER in Thread[pool-1-thread-4,5,main]: Message: ...
NOTE
I noticed this during playing with agents. I wanted to process each message in its own CPU bound thread-pool, and publish it in a unbounded (IO) thread-pool. But, after printing out the current thread I noticed that even without using agents (or futures), messages get processed by different threads.
Upvotes: 0
Views: 711
Reputation: 2313
1) You have a fanout exchange there, that means the routing key is not used at all while routing messages. A fanout exchange routes messages to every queue bound to it. If you want to use routing keys, then either use direct or topic exchanges.
2) You always use the same queue name, that means what your code is doing is just adding multiple consumers to the same queue. That implies that rabbitmq will just round robin messages around your consumers.
Upvotes: 1
Reputation: 786
Author of Langohr here.
There must be something missing from the code. If you get this output with agents, that's easy: Clojure agents (also, futures and promises) use a thread pool. Langohr's langohr.consumers/subscribe or the underlying QueueingConsumer in the RabbitMQ Java client do not.
Upvotes: 0