Reputation: 21
I have a use case that involves indexing data from Kafka into Elasticsearch. Elasticsearch has a bulk API, which bundles multiple messages into one API call for indexing. This is much more efficient than indexing messages one at a time.
I am using Clojure for this. Naturally, I would like to use core.async: consume messages from Kafka, put them onto a channel, and buffer them until either some time has passed or the channel has accumulated enough messages, then index them in a single Elasticsearch bulk API call.
The problem is that the consuming side of the channel is signaled as soon as a single message arrives.
One simple solution is to buffer the messages taken from the channel in a blocking queue, but that seems to defeat the purpose of using a channel. Why not just use a blocking queue without a channel?
Any suggestions for using channels for batch processing? Thanks.
Upvotes: 1
Views: 1929
Reputation: 144
A blocking queue has to be written by hand and needs proper synchronization, which channels already provide in the runtime. In your case I'd go with make(chan []*Message, buffer): in a loop, collect messages into a slice until it reaches the required length or a timeout fires, then send the slice on that channel for processing. That is pretty simple and doesn't require any complicated data structures.
Upvotes: 0