Reputation: 5724
I'm implementing an inter-process messaging system, where e.g. the client can request certain data from the server. The server needs to be able to send back its reply in the form of partial repsonses, and also needs to be able to nofity the client in case any exception occured.
Currently, I'm doing this via 3 message types:
class PartialResponse : ResponseMessage { ... }
class ResponseError : ResponseMessage { ... }
class ResponseComplete : ResponseMessage { ... ]
so e.g. client requests data, server sends back 0-N PartialResponse messages followed by either a ResponseError or ResponseComplete.
The library I'm using (Obvs with NetMQ as its transport layer) would expose the stream of all possible messages as an
IObservable<ResponseMessage>
Though this observable stream would never complete, and I believe never fault either (unless maybe for some Obvs/NetMQ internal exceptions or such).
I want to transform this into an IObservable<PartialResponse>, which completes when the original stream pushed a ResponseComplete message, and faults when it either encounters a ResponseError message, or an actual error in the input stream. E.g. something like:
IObservable<PartialResponse> transform(IObservable<ResponseMessage> input)
{
var subject = new Subject<PartialResponse>();
input.Subscribe(
x =>
{
if(x is PartialResponse r)
subject.OnNext(r);
else if(x is ResponseComplete)
subject.OnCompleted();
else if(x is ResponseError err)
subject.OnError(new Exception(err?.ToString()));
else
throw new InvalidOperationException();
},
ex =>
{
subject.OnError(ex);
}
);
return subject;
}
this code should actually work, but is likely pretty bad - not least because it directly subscribes to the input observable sequence.
Is there any better/cleaner way to transform the observable sequence?
Upvotes: 3
Views: 190
Reputation: 14350
Here's @Enigmativity's answer fleshed out:
var input = new Subject<ResponseMessage>();
var partialResponseObservable = input
.Select(msg =>
(msg is PartialResponse r)
? Notification.CreateOnNext(r)
: (msg is ResponseComplete)
? Notification.CreateOnCompleted<PartialResponse>()
: (msg is ResponseError err)
? Notification.CreateOnError<PartialResponse>(new Exception(err?.ToString()))
: throw new InvalidOperationException()
)
.Dematerialize();
or with type matching (probably reads better):
var partialResponseObservable = input
.Select(msg =>
{
switch(msg)
{
case PartialResponse r:
return Notification.CreateOnNext(r);
case ResponseComplete rc:
return Notification.CreateOnCompleted<PartialResponse>();
case ResponseError err:
return Notification.CreateOnError<PartialResponse>(new Exception(err?.ToString()));
default:
throw new InvalidOperationException();
}
})
.Dematerialize();
Upvotes: 3