Dan Hermann
Dan Hermann

Reputation: 1107

Reactive Extensions for processing continuous streams of messages

I have a message processing application that currently operates on small messages that fit easily into memory. I am extending it to operate on messages larger than memory (in the 10s to 100s of gigabytes) which will require some kind of streaming approach. I like what I've seen so far with how Reactive Extensions (especially this piece on "Rx on the server") model an asynchronous push event stream and how Rx can interoperate with other constructs and patterns in C# such as System.IO.Stream (see previous link), TPL Tasks, the APM pattern, and the EAP pattern.

E.g., something such as IObservable<MyMessage> is a plain vanilla example of an Rx stream. Because my app's implementation of MyMessage represents data too large to fit into memory, I need something along the lines of IObservable<MyMessageChunk> in which a variable number of MyMessageChunk instances combine to represent a single MyMessage. I need something like this paint-tastic marble diagram:

Rx "marble" diagram

In the above diagram, each circle represents a chunk of a message and the colors delineate breaks between messages. The red X represents an error in the processing of the green message but I would need processing to continue with the following purple message even though the semantics of OnError require the termination of the stream. The green bar at the end represents OnCompleted which in this case would essentially mean the closing of the application rather than the successful completion of any one message.

Is there a way to model this kind of processing with Rx?

Upvotes: 2

Views: 1857

Answers (2)

Matthew Finlay
Matthew Finlay

Reputation: 3464

If you have a function IObservable<T> ReadChunck(Chunk chunk) then you can combine them using Observable.OnErrorResumeNext(). This will concatenate the chunks when they call OnError or OnCompleted.

IObserable<T> finalStream = Observable.OnErrorResumeNext(
    ReadChunk(chunk1),
    ReadChunk(chunk2),
    ReadChunk(chunck3);

Alternatively, you can combine the ReadChunks into an IEnumerable<IObservable<T>> which will also work.

Upvotes: 1

Dave Sexton
Dave Sexton

Reputation: 2662

Using the T in IObservable<T> to encode your data seems like the correct approach, based on the short problem description that you've provided. (The Power of T)

There can be only one call to OnError and then the observable is terminated, per the Rx Grammar, so that's not going to be a problem:

OnNext* (OnCompleted | OnError)?

See the Rx Design Guidelines for details.

However, you can use operators like Retry and Catch to continue the sequence with a different observable, or the original observable recursively, when an error occurs.

Upvotes: 1

Related Questions