Ashish Bhatia
Ashish Bhatia

Reputation: 191

.NET - Multi threaded Producer Consumer using .NET reactive

I am working to build a producer..consumer pattern using .NET reactive. Producer reads messages from Kafka message bus. Once the message is read, it needs to be handed over to the consumer to process the message.

I was able to do that using .NET reactive. However, I noticed that consumer is processing the message on the same thread as producer. Please see the code below. Objective is to: Have a single producer thread that reads messages from the bus. And, then hand it over to consumers on a separate thread to process the messages. The code I have is:

 // Producer Code
 private Subject<LGMessage> _onMessageSubject = new Subject<LGMessage>();

 private IObserver<LGMessage> messageBusObserver;

 protected IObservable<LGMessage> _onMessageObservable
    {
        get
        {
            return _onMessageSubject.AsObservable();
        }
    }


public void AddObserver(IObserver<LGMessage> observer)
    {
       _onMessageObservable.ObserveOn(NewThreadScheduler.Default).Subscribe(observer);


    }


// Read is called when the message is read from the bus
public bool Read(Message<string, string> msg)
    {

            // add the message to the observable
            _onMessageSubject.OnNext(msg.Value);


    }

// Consumer Code
public virtual void OnNext(string value)
    {
        Console.WriteLine("Thread {0} Consuming",          

        Thread.CurrentThread.ManagedThreadId);

        Console.WriteLine("{1}: Message I got from bus: {0}", value.Key, 
         this.Name);
        // Take Action
    }

Upvotes: 3

Views: 1265

Answers (1)

Shlomo
Shlomo

Reputation: 14350

It's hard to tell from your code, but it looks like you aren't exposing the observable. This denies downstream usage of Rx operators. In your case, you would like to use the threading operators.

In the producer, instead of exposing AddObserver(IObserver<string> observer), I would expose something like this:

public IObservable<string> Messages => _onMessageSubject.AsObservable();

A consumer can then do something like

Messages
    .ObserveOn(NewThreadScheduler.Default)
    .Subscribe(consumerObserver);

EDIT:

The following code works for me as intended:

var subject = new Subject<int>();

var observer1 = new AnonymousObserver<int>(i => Console.WriteLine($"Observer1: Observed {i} on thread {Thread.CurrentThread.ManagedThreadId}."));
var observer2 = new AnonymousObserver<int>(i => Console.WriteLine($"Observer2: Observed {i} on thread {Thread.CurrentThread.ManagedThreadId}."));
var observer3 = new AnonymousObserver<int>(i => Console.WriteLine($"Observer3: Observed {i} on thread {Thread.CurrentThread.ManagedThreadId}."));

subject.AsObservable().ObserveOn(NewThreadScheduler.Default).Subscribe(observer1);
subject.AsObservable().ObserveOn(NewThreadScheduler.Default).Subscribe(observer2);
subject.AsObservable().ObserveOn(NewThreadScheduler.Default).Subscribe(observer3);

subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(3);
subject.OnCompleted();

Here's the output (Observer1 got thread 14, Observer2 got thread 15, Observer3 got thread 16):

Observer1: Observed 1 on thread 14.
Observer2: Observed 1 on thread 15.
Observer1: Observed 2 on thread 14.
Observer1: Observed 3 on thread 14.
Observer2: Observed 2 on thread 15.
Observer2: Observed 3 on thread 15.
Observer3: Observed 1 on thread 16.
Observer3: Observed 2 on thread 16.
Observer3: Observed 3 on thread 16.

Upvotes: 2

Related Questions