anivas

Reputation: 6547

Converting listener for a stream to IObservable

My service method takes a listener and calls it with a stream of data. I am trying to convert this stream to an IObservable<T>. So far this is what I have done:

public class MessageListener : IMessageListener
{

    private readonly Subject<string> stream = new Subject<string>();

    public IObservable<string> MessageStream
    {
        get
        {
            return this.stream;
        }
    }

    public void OnMessageAdded(string message)
    {
        this.stream.OnNext(message);
    }

}

//Calling code    
public IObservable<string> GetMessage()
{
     var listener = new MessageListener();
     service.Subscribe(listener);
     listener.MessageStream.SubscribeOn(Scheduler.NewThread);
}

I am not sure if this is good enough. I believe the call to SubscribeOn will run only the subscription code on a new thread. How can I make sure that OnMessageAdded is handled on a new thread?

Upvotes: 1

Views: 241

Answers (1)

Christoph

Reputation: 27985

As you already spotted yourself, SubscribeOn only controls where the subscription happens. However, in your code the SubscribeOn has no effect at all because it's not followed by a subscription. Remember that SubscribeOn returns a new observable, and the scheduler only applies to subscriptions made through that returned observable. Calling "SubscribeOn" does not set some global flag inside the source.
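To illustrate the point about the discarded return value, here is a minimal sketch. It assumes a recent System.Reactive package, where NewThreadScheduler.Default plays the role of the older Scheduler.NewThread used in the question. SubscribeOnDemo, Run and the counter are hypothetical names made up for this illustration.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;

public static class SubscribeOnDemo
{
    // Returns how often the subscribe logic ran: once measured after the
    // discarded SubscribeOn call, once after a real subscription.
    public static (int AfterDiscard, int AfterSubscribe) Run()
    {
        int runs = 0;

        // The delegate below is the "subscription code" that SubscribeOn moves.
        IObservable<int> source = Observable.Create<int>(observer =>
        {
            Interlocked.Increment(ref runs);
            Console.WriteLine($"subscribe logic on thread {Thread.CurrentThread.ManagedThreadId}");
            observer.OnNext(1);
            observer.OnCompleted();
            return Disposable.Empty;
        });

        // Result discarded: nobody subscribes, so the subscribe logic never runs.
        source.SubscribeOn(NewThreadScheduler.Default);
        int afterDiscard = Volatile.Read(ref runs);

        // Subscribing to the returned observable runs the subscribe logic
        // on a thread provided by the scheduler.
        using (var done = new ManualResetEventSlim())
        using (source.SubscribeOn(NewThreadScheduler.Default)
                     .Subscribe(_ => { }, () => done.Set()))
        {
            done.Wait(TimeSpan.FromSeconds(5));
        }

        return (afterDiscard, Volatile.Read(ref runs));
    }
}
```

Calling SubscribeOnDemo.Run() should show the subscribe logic running only for the second call, where something actually subscribes to the observable that SubscribeOn returned.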

What you actually want to do is to call "ObserveOn" inside your service right before the subscription takes place. ObserveOn determines the thread on which the incoming messages are processed. Another option would be to write the ObserveOn directly into the MessageListener so that it looks like this:

public IObservable<string> MessageStream
{
    get
    {
        return this.stream.ObserveOn(Scheduler.ThreadPool).AsObservable();
    }
}
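As a rough check that ObserveOn really moves the message handling off the producer's thread, here is a minimal sketch. It assumes a recent System.Reactive package and uses TaskPoolScheduler.Default in place of the older Scheduler.ThreadPool. ObserveOnDemo and Run are hypothetical names made up for this illustration.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class ObserveOnDemo
{
    // Pushes one message and returns the thread ids of the producer
    // (the caller of OnNext) and of the observer callback.
    public static (int Producer, int Observer) Run()
    {
        var subject = new Subject<string>();
        int observerThread = -1;

        using (var done = new ManualResetEventSlim())
        using (subject.ObserveOn(TaskPoolScheduler.Default).Subscribe(_ =>
        {
            // Runs on a scheduler thread, not on the thread that called OnNext.
            observerThread = Thread.CurrentThread.ManagedThreadId;
            done.Set();
        }))
        {
            int producerThread = Thread.CurrentThread.ManagedThreadId;
            subject.OnNext("hello"); // stands in for OnMessageAdded(message)
            done.Wait(TimeSpan.FromSeconds(5));
            return (producerThread, observerThread);
        }
    }
}
```

The two ids returned by ObserveOnDemo.Run() should differ, because the callback is queued onto the scheduler while the calling thread waits.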

One more thing to note: in your MessageStream property, it is better to return this.stream.AsObservable() instead of the subject itself, so that callers cannot cast the result back to a subject and push values into it.
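The difference can be sketched as follows, assuming a recent System.Reactive package. AsObservableDemo and CanPush are hypothetical names made up for this illustration.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class AsObservableDemo
{
    // True if a consumer could cast the stream to IObserver<string>
    // and call OnNext on it.
    public static bool CanPush(IObservable<string> stream)
    {
        return stream is IObserver<string>;
    }

    public static void Main()
    {
        var subject = new Subject<string>();

        // Returning the subject directly leaks its OnNext/OnError/OnCompleted.
        Console.WriteLine(CanPush(subject));                // True

        // AsObservable wraps the subject and hides the observer side.
        Console.WriteLine(CanPush(subject.AsObservable())); // False
    }
}
```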

Upvotes: 1
