Reputation: 43
I wrote an implementation based on the examples at [Rx DEVHOL202] and http://rxwiki.wikidot.com/101samples#toc48
Here is my code. http://csharp.pastebin.com/pm2NAPx6
It works, but the calls to OnNext are blocking. I would like them to be non-blocking, to simulate a network read that asynchronously hands off each chunk of bytes, as it is read, to a handler (not shown here in full, but it might cache results and do further processing).
What is a good way of doing that?
Also, once the exception is thrown, none of the subsequent OnNext() calls are processed if I don't explicitly exit the loop and indicate completion. Why is this so?
Upvotes: 2
Views: 1252
Reputation: 84734
I would strongly recommend against trying to implement your own IObservable. The implicit rules go beyond thread safety and into method call ordering.
Usually you would return IObservable instances from methods, but if you need a class that implements it directly, you should wrap a Subject:
public class SomeObservable<T> : IObservable<T>
{
    // The Subject takes care of the subscription bookkeeping and call ordering.
    private Subject<T> subject = new Subject<T>();

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return subject.Subscribe(observer);
    }
}
1. You need to be careful about how you support this from your observer (as you may have shared data), but you can make your handling of the calls asynchronous in one of two ways:
- Call ObserveOn(Scheduler.TaskPool) (or ThreadPool if you are pre-4.0) before you call Subscribe. This causes messages to be routed through a scheduler (in this case, a task).
- Pass an IScheduler to the Observer constructor.
2. This is the expected functionality. The contract between IObservable and IObserver is (OnNext)* (OnCompleted | OnError)?, which is to say "zero or more calls to OnNext, optionally followed by EITHER OnCompleted or OnError". After OnCompleted|OnError, it is invalid to call OnNext.
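To make that grammar concrete, here is a minimal sketch of a guard that enforces it by hand. ContractGuard and RecordingObserver are hypothetical names invented for this illustration; they are not part of Rx, and this is only a simplified stand-in for the idea, not how Rx implements it. The sketch uses only the IObserver<T> interface from the base class library and is not thread-safe.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper, not part of Rx: forwards notifications to an inner
// observer only while the grammar (OnNext)* (OnCompleted | OnError)? holds.
public sealed class ContractGuard<T> : IObserver<T>
{
    private readonly IObserver<T> inner;
    private bool terminated; // set once OnCompleted or OnError has been seen

    public ContractGuard(IObserver<T> inner) => this.inner = inner;

    public void OnNext(T value)
    {
        // Anything after a terminal notification is silently dropped.
        if (!terminated) inner.OnNext(value);
    }

    public void OnError(Exception error)
    {
        if (terminated) return;
        terminated = true;
        inner.OnError(error);
    }

    public void OnCompleted()
    {
        if (terminated) return;
        terminated = true;
        inner.OnCompleted();
    }
}

// Hypothetical observer that records what it receives, for demonstration.
public sealed class RecordingObserver<T> : IObserver<T>
{
    public List<string> Log { get; } = new List<string>();

    public void OnNext(T value) => Log.Add("OnNext(" + value + ")");
    public void OnError(Exception error) => Log.Add("OnError(" + error.Message + ")");
    public void OnCompleted() => Log.Add("OnCompleted");
}
```

If a source calls OnNext(1), then OnError, then OnNext(2) on a ContractGuard<int> wrapping a RecordingObserver<int>, the log should end up with only the first OnNext and the OnError. This matches the behaviour described in the question: once the error has been delivered, later values never reach the handler.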
All the operators in Rx (Where, Select, etc) enforce this rule, even if the source doesn't.
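Coming back to the first option in point 1, a rough sketch of ObserveOn might look like the following. It assumes a current System.Reactive package, where the task pool scheduler is exposed as TaskPoolScheduler.Default rather than the Scheduler.TaskPool property named above. ObserveOnSketch, Run, and the byte-array chunks are made up for this example, and the lambda that records chunk sizes stands in for whatever the real handler does.

```csharp
using System;
using System.Collections.Concurrent;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class ObserveOnSketch
{
    // Pushes three chunks through a Subject and returns the sizes the handler saw.
    public static int[] Run()
    {
        var handled = new ConcurrentQueue<int>();

        using (var done = new ManualResetEventSlim())
        using (var chunks = new Subject<byte[]>())
        using (chunks
            .ObserveOn(TaskPoolScheduler.Default)        // handler runs on the task pool
            .Subscribe(
                chunk => handled.Enqueue(chunk.Length),  // stand-in for the real handler
                ex => done.Set(),
                () => done.Set()))
        {
            for (int i = 1; i <= 3; i++)
            {
                // Returns once the chunk has been queued for the scheduler,
                // without waiting for the handler to process it.
                chunks.OnNext(new byte[i * 16]);
            }

            chunks.OnCompleted();
            done.Wait(); // demo only: block until the handler has drained the queue
        }

        return handled.ToArray();
    }
}
```

ObserveOn delivers notifications to the observer one at a time and in their original order, so the handler still sees the chunks in sequence; only the producer is freed from waiting. Any state the handler shares with the producer still needs its own synchronisation, as noted in point 1.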
Upvotes: 2
Reputation: 40345
I'm not sure if I understand your question correctly, but why can't you just execute whatever logic you have on a different Thread, or if it's small enough push it on a ThreadPool?
Here is an example:
ThreadPool.QueueUserWorkItem(o =>
{
    _paidSubj.OnNext(this); // Raise PAID event
});
I'm confused about the data type on Subject; I have never seen that class in C#... is it something that you created? Is OnNext an event that gets raised or is it just a method? If OnNext is an event, then you can use BeginInvoke to invoke it asynchronously:
_paidSubj.OnNext.BeginInvoke(this, null, null);
Update:
An important thing that will happen if you implement this kind of asynchronous behavior: if you notify an IObserver by passing the Order, you might actually have some inconsistencies when you try to read the data in the observer (namely the order buffer) while the Order continues to modify the buffer in its Read thread. So there are at least two ways to solve this:
P.S.
Where did you get Subject from? Is Subject supposed to be an OrderObserver?
Upvotes: 0