Я TChebur

Reputation: 406

Redirect message to the corresponding working thread/Task

I'm working with Kafka and trying to implement a single consumer that subscribes to all the topics I need. Let's say I have 3 topics (A, B, C). How can I process the messages within each topic sequentially, while processing the topics in parallel with each other? So for topic A I need to process messages one by one, and at the same time I need to process messages one by one from the other topics.

It looks like I need a separate thread for each topic. Could you please advise how this can be implemented? Are there any ready-made solutions for this? My consumer looks like this:

while (!cancellationToken.IsCancellationRequested)
{
    ConsumeResult<string, string> consumeResult = _consumer.Consume(cancellationToken);

    // ... processing here

    _consumer.Commit(consumeResult);
}

I have no idea how to implement this because I have no experience with async in C# yet. I found things like Reactive Extensions: https://gist.github.com/omnibs/6b2cbdba2685693448ee6779736a00c2.

When I receive a message, how do I redirect it to the corresponding worker thread/Task?

I'm using the Confluent.Kafka 1.3.0 package to work with Kafka.

Upvotes: 1

Views: 106

Answers (3)

Roman Marusyk

Reputation: 24579

var channels = new Dictionary<string, ActionBlock<ConsumeResult<string, string>>>();
foreach (var topic in _consumer.Subscription)
{
    channels.Add(topic, new ActionBlock<ConsumeResult<string, string>>(async consumeResult =>
    {
        // ... processing here
        _consumer.Commit(consumeResult);
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 1 }));
}

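This code creates an ActionBlock for each topic you are subscribed to. With MaxDegreeOfParallelism = 1, each block processes its messages one at a time, while the blocks for different topics run in parallel.

Then, when you receive a message, redirect it to the corresponding block: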

while (!cancellationToken.IsCancellationRequested)
{
    // Pass the token so that Consume stops blocking when cancellation is requested.
    ConsumeResult<string, string> consumeResult = _consumer.Consume(cancellationToken);
    await channels[consumeResult.Topic].SendAsync(consumeResult);
}
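
The snippets above leave out shutdown. As a rough, Kafka-free sketch of the same routing idea, assuming plain strings stand in for ConsumeResult values and that BlockRouter, RunAsync and the 10 ms delay are hypothetical names and placeholders rather than anything from the question, the blocks can be completed and drained like this:

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BlockRouter
{
    // Routes (topic, payload) pairs to one ActionBlock per topic and returns,
    // per topic, the payloads in the order that topic's block handled them.
    public static async Task<Dictionary<string, List<string>>> RunAsync(
        IEnumerable<(string Topic, string Payload)> messages, string[] topics)
    {
        // Each list is written by exactly one block, so no locking is needed.
        var handled = topics.ToDictionary(t => t, _ => new List<string>());

        var blocks = topics.ToDictionary(
            t => t,
            t => new ActionBlock<string>(
                async payload =>
                {
                    await Task.Delay(10); // stand-in for the real processing
                    handled[t].Add(payload);
                },
                new ExecutionDataflowBlockOptions
                {
                    MaxDegreeOfParallelism = 1, // sequential within a topic
                    BoundedCapacity = 100       // SendAsync waits when a block is full
                }));

        foreach (var (topic, payload) in messages)
            await blocks[topic].SendAsync(payload);

        // Signal that no more messages are coming, then wait for the queues to drain.
        foreach (var block in blocks.Values)
            block.Complete();
        await Task.WhenAll(blocks.Values.Select(b => b.Completion));

        return handled;
    }
}
```

In the real consumer, the Complete/Completion step would run after the consume loop exits and before the consumer is closed, so that queued messages are still processed and committed.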

Also read the articles linked in @PauloMorgado's answer.

Upvotes: 1

Paulo Morgado

Reputation: 14846

Have a look at TPL Dataflow (namely ActionBlock) or at Channels (System.Threading.Channels).
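
For the Channels option, a minimal sketch might look like the following. It assumes .NET Core 3.0 or later (for ReadAllAsync and await foreach) and uses plain strings instead of Kafka messages; TopicRouter, RunAsync and the 10 ms delay are hypothetical placeholders, not part of any library.

```csharp
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class TopicRouter
{
    // Routes (topic, payload) pairs to one channel per topic and returns,
    // per topic, the payloads in the order that topic's worker handled them.
    public static async Task<Dictionary<string, List<string>>> RunAsync(
        IEnumerable<(string Topic, string Payload)> messages, string[] topics)
    {
        var channels = new Dictionary<string, Channel<string>>();
        var handled = new Dictionary<string, List<string>>();
        var workers = new List<Task>();

        foreach (var topic in topics)
        {
            var channel = Channel.CreateUnbounded<string>(
                new UnboundedChannelOptions { SingleReader = true });
            var log = new List<string>();
            channels[topic] = channel;
            handled[topic] = log;

            // One worker per topic: sequential within a topic, parallel across topics.
            workers.Add(Task.Run(async () =>
            {
                await foreach (var payload in channel.Reader.ReadAllAsync())
                {
                    await Task.Delay(10); // stand-in for the real processing
                    log.Add(payload);
                }
            }));
        }

        // The "consume loop": hand each message to its topic's channel.
        foreach (var (topic, payload) in messages)
            await channels[topic].Writer.WriteAsync(payload);

        // No more messages: let each worker drain its channel and finish.
        foreach (var channel in channels.Values)
            channel.Writer.Complete();
        await Task.WhenAll(workers);

        return handled;
    }
}
```

If the consumer can outrun the workers, Channel.CreateBounded<string>(capacity) is probably the safer choice, because WriteAsync then waits until the channel has room instead of letting the queue grow without limit.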

Upvotes: 1

symaps

Reputation: 62

You should create one observable for each topic and publish that topic's messages onto it. Then subscribe an observer to each topic's observable.

Create an IObservableList from a list of IObservable.

Upvotes: 0
