Reputation: 2936
How do I do producers-consumers with RX?
I have found this answer; it uses Java's scheduler to limit the number of concurrent OnNext calls on the observer. I could do something like that in C#, but I am sure that there is another way to do it. Is there?
Also, I found it weird that the standard Subject does not wait for OnNext to end; it just fires immediately.
Any thoughts?
Upvotes: 0
Views: 2467
Reputation: 117027
This is all about the scheduler now, but historically subjects used to enforce serial calls to .OnNext(...). The issue was that when the subject included thread synchronization code, performance suffered in the many situations that don't involve threading. So subjects were made much more efficient, but that allowed concurrent calls to .OnNext(...).
However, you can fix this with some appropriate uses of schedulers or publish calls.
Let's look at some code, starting with these methods:
public void Foo()
{
    Console.WriteLine("Foo Start");
    Thread.Sleep(5000);
    Console.WriteLine("Foo End");
}

public void Bar()
{
    Console.WriteLine("Bar Start");
    Thread.Sleep(5000);
    Console.WriteLine("Bar End");
}
If I now write this:
var subject = new Subject<Unit>();
subject.Subscribe(u => Foo());
subject.Subscribe(u => Bar());
subject.OnNext(Unit.Default);
subject.OnNext(Unit.Default);
I get the following:
Foo Start Foo End Bar Start Bar End Foo Start Foo End Bar Start Bar End
This code is effectively running on the immediate scheduler: the subject invokes its handlers synchronously on the calling thread, so it has to wait for each .OnNext(...) call to complete before moving on.
With this code:
var subject = new Subject<Unit>();
var query = subject.ObserveOn(Scheduler.Default);
query.Subscribe(u => Foo());
query.Subscribe(u => Bar());
subject.OnNext(Unit.Default);
subject.OnNext(Unit.Default);
I now get:
Foo Start Bar Start Foo End Foo Start Bar End Bar Start Foo End Bar End
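The scheduler is now free to use the thread pool, so it schedules calls concurrently. Each subscription to query gets its own ObserveOn queue, so Foo and Bar overlap with each other, while each handler still processes its own values one at a time.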
Now, if we take the same code but .Publish() the query, both handlers share a single ObserveOn subscription and it goes back to the first behaviour:
var subject = new Subject<Unit>();
var query = subject.ObserveOn(Scheduler.Default).Publish();
query.Subscribe(u => Foo());
query.Subscribe(u => Bar());
query.Connect();
subject.OnNext(Unit.Default);
subject.OnNext(Unit.Default);
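No output is listed for the published version, so here is a minimal sketch of how one might check it. It assumes the System.Reactive package; the class name PublishDemo, the Run helper and the shortened 200 ms sleep are illustrative choices rather than part of the snippet above. It records the start and end of each handler call so the ordering can be inspected.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class PublishDemo
{
    // Hypothetical helper: returns the order in which handlers started and ended.
    public static List<string> Run()
    {
        var log = new List<string>();
        var done = new CountdownEvent(4); // 2 handlers x 2 values

        // Builds a slow handler that logs its own start and end.
        Action<Unit> Handler(string name) => _ =>
        {
            lock (log) log.Add(name + " Start");
            Thread.Sleep(200); // shortened stand-in for the 5 second sleep
            lock (log) log.Add(name + " End");
            done.Signal();
        };

        var subject = new Subject<Unit>();
        var query = subject.ObserveOn(Scheduler.Default).Publish();
        using var foo = query.Subscribe(Handler("Foo"));
        using var bar = query.Subscribe(Handler("Bar"));
        using var connection = query.Connect();

        subject.OnNext(Unit.Default);
        subject.OnNext(Unit.Default);

        done.Wait(); // block until all four handler calls have finished
        return log;
    }

    public static void Main() => Console.WriteLine(string.Join(", ", Run()));
}
```

If the published query behaves as described, every Start entry should be followed directly by its matching End, with no interleaving between Foo and Bar. The difference from the very first example is that the handlers run on a thread pool thread rather than on the thread calling .OnNext(...).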
And finally, if we go back to the original code, but schedule using an EventLoopScheduler, then we're using a single background thread, so the calls are in series again:
var loop = new EventLoopScheduler();
var subject = new Subject<Unit>();
var query = subject.ObserveOn(loop);
query.Subscribe(u => Foo());
query.Subscribe(u => Bar());
subject.OnNext(Unit.Default);
subject.OnNext(Unit.Default);
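To tie this back to the producers-consumers question, the following is a rough sketch under a few assumptions, not a definitive implementation. It has several producer threads pushing into one subject and a single consumer running on an EventLoopScheduler. Subject.Synchronize is not used in the examples above; it is added here because a plain Subject does not guard against concurrent .OnNext(...) calls from multiple producers. The class name, the Run helper and the counters are hypothetical and only there to make the behaviour observable.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

public static class ProducersConsumersDemo
{
    // Hypothetical helper: returns how many items the consumer received and
    // the highest number of consumer calls observed running at the same time.
    public static (int received, int maxConcurrent) Run(int producers, int itemsPerProducer)
    {
        int received = 0, running = 0, maxConcurrent = 0;
        var gate = new object();
        var done = new CountdownEvent(producers * itemsPerProducer);

        using var loop = new EventLoopScheduler(); // one dedicated background thread
        var subject = new Subject<int>();
        // Wrap the subject so that concurrent OnNext calls from producers are serialized.
        var input = Subject.Synchronize(subject);

        using var subscription = subject.ObserveOn(loop).Subscribe(item =>
        {
            lock (gate) { running++; maxConcurrent = Math.Max(maxConcurrent, running); }
            Thread.Sleep(1); // simulated work
            lock (gate) { running--; received++; }
            done.Signal();
        });

        // Each producer pushes its items from its own worker thread.
        Parallel.For(0, producers, p =>
        {
            for (int i = 0; i < itemsPerProducer; i++)
                input.OnNext(p * itemsPerProducer + i);
        });

        done.Wait(); // block until the consumer has handled every item
        return (received, maxConcurrent);
    }

    public static void Main()
    {
        var (received, maxConcurrent) = Run(4, 25);
        Console.WriteLine($"received={received}, maxConcurrent={maxConcurrent}");
    }
}
```

Because the consumer runs on the event loop's single thread, maxConcurrent should stay at 1 however many producers there are. Note that EventLoopScheduler owns a thread and is disposable, so it is worth disposing once the pipeline is finished.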
Upvotes: 2