Reputation: 1107
I have a message processing application that currently operates on small messages that fit easily into memory. I am extending it to operate on messages larger than memory (in the 10s to 100s of gigabytes) which will require some kind of streaming approach. I like what I've seen so far with how Reactive Extensions (especially this piece on "Rx on the server") model an asynchronous push event stream and how Rx can interoperate with other constructs and patterns in C# such as System.IO.Stream
(see previous link), TPL Tasks, the APM pattern, and the EAP pattern.
E.g., something such as IObservable<MyMessage>
is a plain vanilla example of an Rx stream. Because my app's implementation of MyMessage
represents data too large to fit into memory, I need something along the lines of IObservable<MyMessageChunk>
in which a variable number of MyMessageChunk
instances combine to represent a single MyMessage
. I need something like this paint-tastic marble diagram:
In the above diagram, each circle represents a chunk of a message and the colors delineate breaks between messages. The red X represents an error in the processing of the green message but I would need processing to continue with the following purple message even though the semantics of OnError
require the termination of the stream. The green bar at the end represents OnCompleted
which in this case would essentially mean the closing of the application rather than the successful completion of any one message.
Is there a way to model this kind of processing with Rx?
Upvotes: 2
Views: 1857
Reputation: 3464
If you have a function IObservable<T> ReadChunck(Chunk chunk)
then you can combine them using Observable.OnErrorResumeNext()
. This will concatenate the chunks when they call OnError
or OnCompleted
.
IObserable<T> finalStream = Observable.OnErrorResumeNext(
ReadChunk(chunk1),
ReadChunk(chunk2),
ReadChunk(chunck3);
Alternatively, you can combine the ReadChunk
s into an IEnumerable<IObservable<T>>
which will also work.
Upvotes: 1
Reputation: 2662
Using the T
in IObservable<T>
to encode your data seems like the correct approach, based on the short problem description that you've provided. (The Power of T)
There can be only one call to OnError
and then the observable is terminated, per the Rx Grammar, so that's not going to be a problem:
OnNext* (OnCompleted | OnError)?
See the Rx Design Guidelines for details.
However, you can use operators like Retry
and Catch
to continue the sequence with a different observable, or the original observable recursively, when an error occurs.
Upvotes: 1