Reputation: 406
I'm working with Kafka and trying to implement a single consumer which subscribe to all wanted topics. Let's say I have 3 topics (A, B, C). How to process messages from each topic synchronously, but between topics in parallel. So, for A topic I need process messages one by one, but at the same time I need process messages one by one from other topics.
It looks like I need separate thread for each topic. Could you please advice how it can be implemented? Are there some ready solutions for this? My consumer looks like
while (!cancellationToken.IsCancellationRequested)
{
ConsumeResult<string, string> consumeResult = _consumer.Consume(cancellationToken);
... processing here
_consumer.Commit(consumeResult);
}
I don't have any idea how to implement it because no experience with asnyc in C# yet. Found thing like Reactive Extensions https://gist.github.com/omnibs/6b2cbdba2685693448ee6779736a00c2.
When I receive a message, how to redirect message to the corresponding working thread/Task?
Using Confluent.Kafka 1.3.0 package to work with Kafka
Upvotes: 1
Views: 106
Reputation: 24579
var channels = new Dictionary<string, ActionBlock<ConsumeResult<string, string>>>();
foreach (var topic in _consumer.Subscription)
{
channels.Add(topic, new ActionBlock<ConsumeResult<string, string>>(async consumeResult =>
{
... processing here
_consumer.Commit(consumeResult);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 }));
}
this code will create ActionBlock for all topics that you subscribe.
then when you receive a message, redirect it to the corresponding channel
while (!cancellationToken.IsCancellationRequested)
{
ConsumeResult<string, string> consumeResult = _consumer.Consume();
await channels[consumeResult.Topic].SendAsync(consumeResult);
}
Read articles from @PauloMorgado answer
Upvotes: 1
Reputation: 14846
Have a look at TPL DataFlow, namely the ActionBlock or Channels.
Upvotes: 1
Reputation: 62
You should create 1 obserable for each topic onto which to publish messages. Then subscribe to each available topic on the observer.
Create a IObservableList from a list of IObservable
Upvotes: 0