Performance
Performance

Reputation: 43

How can I make my implementation of IObservable<T> multithreaded?

I wrote an implementation based on the examples at [ Rx DEVHOL202] and http : //rxwiki.wikidot.com/101samples#toc48

Here is my code. http://csharp.pastebin.com/pm2NAPx6

  1. It works, but the calls to OnNext are not NonBlocking, which is what i would like to implement to simulate a network read and asynchronously handing off each chunk of bytes as it is read to a handler [ which is not shown here in full but might cache results and do further processing ].

    What is a good way of doing that?

  2. Once the Exception gets thrown, all the subsequent OnNext()s are not processed!! If I dont explicitly exit the loop and indicate completion. Why is this so?

Upvotes: 2

Views: 1252

Answers (2)

Richard Szalay
Richard Szalay

Reputation: 84734

I would strongly recommend against trying to implement your own IObservable. The implicit rules go beyond thread safety and into method call ordering.

Usually you would return IObservable instances from methods, but if you need a class that implements it directly, you should wrap a Subject:

public class SomeObservable<T> : IObservable<T>
{
        private Subject<T> subject = new Subject<T>();

        public IDisposable Subscribe(IObserver<T> observer)
        {
            return subject.Subscribe(observer);
        }
}

1. You need to be careful about how you support this from your observer (as you may have shared data), but you can make your handling of the calls asynchronous in one of two ways:

  • Call ObserveOn(Scheduler.TaskPool) (or ThreadPool if you are pre-4.0) before you call Subscribe. This causes messages to be routed through a scheduler (in this case, a task)
  • Pass the IScheduler to the Observer constructor
  • Start an asynchronous task/thread from your subscriber

2. This is the expected functionality. The contract between IObservable and IObserver is (OnNext)* (OnCompleted | OnError)?, which is to say "zero or more calls to OnNext, optionally followed by EITHER OnCompleted or OnError". After OnCompleted|OnError, it is invalid to call OnNext.

All the operators in Rx (Where, Select, etc) enforce this rule, even if the source doesn't.

Upvotes: 2

Kiril
Kiril

Reputation: 40345

I'm not sure if I understand your question correctly, but why can't you just execute whatever logic you have on a different Thread, or if it's small enough push it on a ThreadPool?

Here is an example:

ThreadPool.QueueUserWorkItem(o=>
{
    _paidSubj.OnNext(this); // Raise PAID event 
});

I'm confused about the data type on Subject, I have never seen that class in C#... is it something that you created? Is OnNext an event that gets raised or is it just a method? If OnNext is an event, then you can use BeginInvoke to invoke it asynchronously:

_paidSubj.OnNext.BeginInvoke(this, null, null);

Update:

An important thing that will happen if you implement this kind of asynchronous behavior: if you notify an IObserver by passing the Order, you might actually have some inconsistencies when you try to read the data in the observer (namely the order buffer) while the Order continues to modify the buffer in its Read thread. So there are at least two ways to solve this:

  1. Restrict access to the memory which will get modified by using locks.
  2. Only notify the observer with the relevant information that you want it to see:
    a. By passing the information as a value (not as a reference).
    b. By creating an immutable structure that transmits the information.

P.S. Where did you get Subject from? Is Subject supposed to be an OrderObserver?

Upvotes: 0

Related Questions