Krzysztof Skowronek

Reputation: 2936

How to do proper Producer-Consumer pattern with RX

How do I implement the producer-consumer pattern with Rx?

I have found this answer, which uses Java's scheduler to limit the number of concurrent OnNext calls on the observer. I could do something like that in C#, but I am sure there is another way to do it. Is there?

Also, I find it odd that a standard Subject does not wait for OnNext to finish; it just fires immediately.

Any thoughts?

Upvotes: 0

Views: 2467

Answers (1)

Enigmativity

Reputation: 117027

This is all about the scheduler now. Historically, subjects enforced serial calls to .OnNext(...), but the thread synchronization code this required hurt performance in the many situations where no threading was involved. Subjects were therefore made much more efficient, at the cost of allowing concurrent calls to .OnNext(...).

However, you can fix this with some appropriate uses of schedulers or publish calls.
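As a separate option from schedulers and Publish, the old serialized behaviour can be opted back into when several producers push into one subject. The following is a minimal sketch, assuming the System.Reactive package and its Subject.Synchronize helper; the class name, the method name and the 5 ms sleep are made up for illustration.

```csharp
using System;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

public static class SynchronizedSubjectDemo
{
    // Returns the largest number of handler invocations seen running at once.
    public static int MaxConcurrentHandlers(int producers, int itemsPerProducer)
    {
        var gate = new object();
        int running = 0, maxRunning = 0;

        var subject = new Subject<int>();
        // Wrap the subject so that concurrent OnNext calls are taken one at a time.
        ISubject<int> input = Subject.Synchronize<int>(subject);

        using var subscription = subject.Subscribe(_ =>
        {
            lock (gate) { running++; maxRunning = Math.Max(maxRunning, running); }
            Thread.Sleep(5); // simulate some work in the consumer
            lock (gate) { running--; }
        });

        // Several producers push into the wrapped subject from thread-pool threads.
        Parallel.For(0, producers, p =>
        {
            for (int i = 0; i < itemsPerProducer; i++) input.OnNext(i);
        });

        return maxRunning;
    }

    public static void Main()
    {
        Console.WriteLine(MaxConcurrentHandlers(4, 5)); // prints 1
    }
}
```

The producers block while the consumer is busy, so this trades throughput for ordering; calling subject.OnNext directly from the parallel loop instead would let the handlers overlap.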

Let's look at some code, starting with these methods:

public void Foo()
{
    Console.WriteLine("Foo Start");
    Thread.Sleep(5000);
    Console.WriteLine("Foo End");
}

public void Bar()
{
    Console.WriteLine("Bar Start");
    Thread.Sleep(5000);
    Console.WriteLine("Bar End");
}

If I now write this:

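// Namespaces used by the snippets in this answer:
using System.Reactive;             // Unit
using System.Reactive.Concurrency; // Scheduler, EventLoopScheduler
using System.Reactive.Linq;        // ObserveOn, Publish
using System.Reactive.Subjects;    // Subject<T>

var subject = new Subject<Unit>();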

subject.Subscribe(u => Foo());
subject.Subscribe(u => Bar());

subject.OnNext(Unit.Default);
subject.OnNext(Unit.Default);

I get the following:

Foo Start
Foo End
Bar Start
Bar End
Foo Start
Foo End
Bar Start
Bar End

Nothing here introduces a scheduler, so the subject invokes its observers synchronously on the calling thread (equivalent to the immediate scheduler): each .OnNext(...) call has to complete before the next one starts.

With this code:

var subject = new Subject<Unit>();
var query = subject.ObserveOn(Scheduler.Default);

query.Subscribe(u => Foo());
query.Subscribe(u => Bar());

subject.OnNext(Unit.Default);
subject.OnNext(Unit.Default);

I now get:

Foo Start
Bar Start
Foo End
Foo Start
Bar End
Bar Start
Foo End
Bar End

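Each subscription to query now gets its own ObserveOn queue on the thread pool, so Foo and Bar run concurrently with each other, while the calls to each individual observer still arrive one at a time.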

Now, if we take the same code but .Publish() the query, it goes back to the first behaviour, because both observers then share a single ObserveOn subscription:

var subject = new Subject<Unit>();
var query = subject.ObserveOn(Scheduler.Default).Publish();

query.Subscribe(u => Foo());
query.Subscribe(u => Bar());

query.Connect();

subject.OnNext(Unit.Default);
subject.OnNext(Unit.Default);

And finally, if we go back to the original code but schedule using an EventLoopScheduler, then we're using a single background thread, so the calls are in series again:

var loop = new EventLoopScheduler();
var subject = new Subject<Unit>();
var query = subject.ObserveOn(loop);

query.Subscribe(u => Foo());
query.Subscribe(u => Bar());

subject.OnNext(Unit.Default);
subject.OnNext(Unit.Default);
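To check the difference between the schedulers without reading interleaved console output, the same two-subscriber setup can be wrapped in a helper that counts how many handlers overlap. This is only a sketch, assuming the System.Reactive package; the class name, the method name and the 20 ms sleep are illustrative, and itemCount must be at least 1 or the wait never completes.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class SchedulerConcurrencyDemo
{
    // Subscribes two consumers through ObserveOn(scheduler) and returns the
    // largest number of handler invocations seen running at the same time.
    public static int MaxConcurrentHandlers(IScheduler scheduler, int itemCount)
    {
        var gate = new object();
        int running = 0, maxRunning = 0, handled = 0;
        using var done = new ManualResetEventSlim(false);

        void Handle(int item)
        {
            lock (gate) { running++; maxRunning = Math.Max(maxRunning, running); }
            Thread.Sleep(20); // simulate slow work, like Foo and Bar
            lock (gate)
            {
                running--;
                // Both consumers see every item, hence 2 * itemCount invocations.
                if (++handled == 2 * itemCount) done.Set();
            }
        }

        var subject = new Subject<int>();
        var query = subject.ObserveOn(scheduler);
        using var first = query.Subscribe(Handle);
        using var second = query.Subscribe(Handle);

        // The producer is not blocked: OnNext only enqueues work on the scheduler.
        for (int i = 0; i < itemCount; i++) subject.OnNext(i);

        done.Wait();
        return maxRunning;
    }

    public static void Main()
    {
        using var loop = new EventLoopScheduler(); // one dedicated background thread
        Console.WriteLine(MaxConcurrentHandlers(loop, 3)); // prints 1
        // On the thread pool the two consumers are allowed to overlap.
        Console.WriteLine(MaxConcurrentHandlers(Scheduler.Default, 3));
    }
}
```

With the event loop the producer returns immediately and the single consumer thread works through the queue in order, which is the usual shape of a producer-consumer setup.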

Upvotes: 2
