Reputation: 3744
I have a helper class with a method that saves text messages to the local file system. This method returns a Task object, so it is asynchronous by definition.
I want to be able to observe when this method gets called, so I can continuously monitor the size and length of the buffer and make a decision based on that.
I am trying to implement this using the Reactive Extensions for .NET. However, I can't come up with a design that lets me continuously listen for messages being added to the buffer. Below is my current implementation:
public IObservable<Unit> Receive(InternalMessage message)
{
    // BufferMessage returns a Task, which I convert into an observable
    var observable = FileBuffer.BufferMessage(message.MessageId.ToString(), message, DateTime.UtcNow).ToObservable();
    return observable;
}
Here is how I subscribe to the observable:
IObservable<Unit> receiverObservable = batchHandler.Receive(message);
receiverObservable.Subscribe(
    x => Console.WriteLine("On next"),
    ex => { }, // TODO
    () => { }  // Completed
);
I want the subscriber to be called every time the method Receive is called. However, AFAIK, once this method is called, the observable completes and the sequence is terminated, so future calls to Receive won't be listened to.
Can someone recommend a way to use the Rx.NET libraries to implement the observable pattern I am looking for? That is, how do I keep the sequence open and feed it with results from async methods?
Upvotes: 0
Views: 570
Reputation: 14370
Receive, as you've coded it, returns IObservable<Unit>, representing the completion of a single task. You want to subscribe to something that returns IObservable<IObservable<Unit>>, representing a stream of task completions.
There are a number of ways to do this, the best of which probably depends on how your class is set up and how you're calling it.
Here's the laziest one:
You declare a class-level variable subject that represents a stream of your calls:
Subject<IObservable<Unit>> subject = new Subject<IObservable<Unit>>();
subject.Merge().Subscribe(
    x => Console.WriteLine("On next"),
    ex => { }, // TODO
    () => { }  // Completed
);
Then, when you have a new call, you just push it into the subject:
IObservable<Unit> receiverObservable = batchHandler.Receive(message);
subject.OnNext(receiverObservable);
The reason this is really lazy is that Rx is functional at its core, and functional style tends to look down on mutable state. Subjects are basically mutable state.
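If you do go with the subject, one way to limit the damage is to keep it private and expose only the merged stream. Here is a minimal sketch of that idea, assuming the System.Reactive package. BufferingReceiver, Completions and the saveAsync delegate are hypothetical names; saveAsync stands in for your FileBuffer.BufferMessage call.

```csharp
using System;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

// Hypothetical wrapper: the subject stays private, callers only see IObservable<Unit>.
public sealed class BufferingReceiver : IDisposable
{
    private readonly Subject<IObservable<Unit>> _calls = new Subject<IObservable<Unit>>();
    private readonly Func<string, Task> _saveAsync; // stand-in for FileBuffer.BufferMessage

    public BufferingReceiver(Func<string, Task> saveAsync)
    {
        _saveAsync = saveAsync;
    }

    // One long-lived stream: emits a Unit each time a save task finishes.
    public IObservable<Unit> Completions => _calls.Merge();

    public void Receive(string message)
    {
        // Start the save and push its completion into the outer stream.
        _calls.OnNext(_saveAsync(message).ToObservable());
    }

    public void Dispose()
    {
        _calls.OnCompleted();
        _calls.Dispose();
    }
}
```

Two caveats with this sketch: the subject does not replay, so calls made before anyone subscribes to Completions are not observed, and a faulted save task surfaces as OnError and ends the merged stream unless you catch it per task.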
The better way to do it is to figure out when/why you're calling Receive, and structure that as an observable. Once that's done, you can work off of that:
IObservable<Unit> sourceReasonsToCallReceive; // Most likely sourced from an event
sourceReasonsToCallReceive
    .SelectMany(_ => batchHandler.Receive(message))
    .Subscribe(
        x => Console.WriteLine("On next"),
        ex => { }, // TODO
        () => { }  // Completed
    );
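In practice the source stream would probably carry the messages themselves rather than Unit. A rough sketch of that variant, assuming the System.Reactive package: Pipeline, SaveEach, incoming and saveAsync are hypothetical names, plain strings stand in for InternalMessage, and saveAsync stands in for FileBuffer.BufferMessage.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class Pipeline
{
    // incoming:  hypothetical stream of messages (for example, built from an event).
    // saveAsync: stand-in for the Task-returning save method.
    // Returns a stream that emits each message once its save has finished.
    public static IObservable<string> SaveEach(
        IObservable<string> incoming,
        Func<string, Task> saveAsync)
    {
        // This SelectMany overload takes a Task-returning selector,
        // so no explicit ToObservable() call is needed.
        return incoming.SelectMany(async msg =>
        {
            await saveAsync(msg);
            return msg;
        });
    }
}
```

The resulting sequence stays open for as long as incoming does, and the subscriber gets the saved message instead of a bare Unit, which is handy if you want to track buffer size. Note that SelectMany does not guarantee the results arrive in the same order as the inputs.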
Hope that helps.
Upvotes: 1