Reputation: 778
I'm just getting started with Reactive Extensions and I'm not sure whether I'm on the right track.
I'm using an Observable to create and consume a listener for an event broker in .NET. I created an "IncomingMessage" class that holds the messages from the event broker as they come in, and I create the listener inside the Observable.Create function. That works very well.
Now I also want to get status notifications from the listener, such as "Connecting...", "Connected." and "Closing...", which are not an IncomingMessage. So I created a class "BrokerEvent" with a "Message" property, plus a common interface for "IncomingMessage" and "BrokerEvent". Now I send both via observer.OnNext(...) as they occur. That also works well so far.
However, on the subscriber side I'm now having a bit of trouble filtering the events I need.
I do:
GetObservable()
    .Where(x => x is BrokerEvent || (x is IncomingMessage msg && msg.User == "test"))
    .Subscribe(...)
That works, but I then need to figure out the type again inside Subscribe, which seems a bit ugly.
After trying a bit I ended up doing this now...
var observable = GetObservable().Publish();
observable.OfType<BrokerEvent>().Subscribe(...);
observable.OfType<IncomingMessage>().Where(x=>x.User == "test").Subscribe(...);
var disposable = observable.Connect();
This also seems to work, but as I'm new to Reactive Extensions I'm not quite sure whether it has any unwanted side effects. I'm also not sure if it's the "right" way to include status messages in the stream at all. Is there a better way to handle that (possibly without using Publish), or is that the way to go?
And to stop listening, is it enough to just dispose the disposable I get from .Connect(), or do I also have to dispose both disposables I get from .Subscribe()?
Thanks!
Upvotes: 2
Views: 502
Reputation: 3697
What you can do is create an 'event handler' class with three overloads of a 'Process' method: one for object (the default) that does nothing, one for the status type, and one for the incoming message. In .Subscribe use this syntax:
m=>processor.Process((dynamic)m)
This will call the correct implementation or do nothing, as required.
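As a minimal sketch of that idea, assuming stand-in versions of the question's BrokerEvent and IncomingMessage types: MessageProcessor, its Log list and DynamicDispatchDemo are hypothetical names used only for illustration. The cast to dynamic defers overload resolution to run time, so the overload matching the actual type of each item is picked.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-ins for the types described in the question.
public sealed class BrokerEvent { public string Message { get; set; } = ""; }
public sealed class IncomingMessage { public string User { get; set; } = ""; }

// Hypothetical processor with one overload per message type.
public sealed class MessageProcessor
{
    // Records what was handled, so the effect is observable.
    public List<string> Log { get; } = new List<string>();

    // Fallback overload: bound at run time for any other type; does nothing.
    public void Process(object other) { }

    public void Process(BrokerEvent e) => Log.Add("status: " + e.Message);

    public void Process(IncomingMessage m) => Log.Add("message from: " + m.User);
}

public static class DynamicDispatchDemo
{
    public static List<string> Run()
    {
        var processor = new MessageProcessor();
        object[] items =
        {
            new BrokerEvent { Message = "Connected." },
            new IncomingMessage { User = "test" },
            42 // unrelated type, falls through to Process(object)
        };

        foreach (var item in items)
        {
            // The dynamic cast makes the overload depend on the run-time type.
            processor.Process((dynamic)item);
        }

        return processor.Log;
    }
}
```

Inside Subscribe, the same call shape applies: m => processor.Process((dynamic)m). Note that dynamic binding gives up compile-time checking of the call, which is a trade-off to weigh against the shorter code.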
If you want to filter before calling Process, you can introduce a common class (ProcessableMessage or some such), or you can call .Merge on your OfType streams, or you can take the same approach as above with a dynamic MessageFilter class.
Upvotes: 0
Reputation: 14350
I'm assuming GetObservable returns IObservable<object>, which isn't ideal.
The best way to do the code you have above is as follows:
var observable = GetObservable().Publish();
var sub1 = observable.OfType<BrokerEvent>().Subscribe(_ => { });
var sub2 = observable.OfType<IncomingMessage>().Where(x => x.User == "test").Subscribe(_ => { });
var connectSub = observable.Connect();
var disposable = new CompositeDisposable(sub1, sub2, connectSub);
The composite disposable will then dispose of all the children when it is disposed.
If the two message streams have nothing to do with each other, that approach will work. However, since you basically have a control-message stream and a data-message stream, I'm assuming the messages from one may be important when handling the messages in the other. In that case you may want to treat them as one stream.
For that, you could create a discriminated-union type for your observable, which may make handling easier.
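C# has no built-in discriminated unions, so here is one possible way to approximate one, as a sketch only. BrokerNotification, its Status and Data cases, the Body property, Match and UnionDemo are all hypothetical names, not something from the question's code. The private constructor means only the nested types can derive from the base class, which keeps the set of cases closed.

```csharp
using System;

// Hypothetical closed union of the two kinds of notification.
public abstract class BrokerNotification
{
    // Private constructor: only the nested classes below can derive.
    private BrokerNotification() { }

    // Control case, e.g. "Connecting...", "Connected.", "Closing...".
    public sealed class Status : BrokerNotification
    {
        public Status(string message) { Message = message; }
        public string Message { get; }
    }

    // Data case, standing in for the question's IncomingMessage.
    public sealed class Data : BrokerNotification
    {
        public Data(string user, string body) { User = user; Body = body; }
        public string User { get; }
        public string Body { get; }
    }

    // Forces callers to supply a handler for every case.
    public T Match<T>(Func<Status, T> onStatus, Func<Data, T> onData)
    {
        switch (this)
        {
            case Status s: return onStatus(s);
            case Data d: return onData(d);
            default: throw new InvalidOperationException("Unknown case");
        }
    }
}

public static class UnionDemo
{
    // Example consumer: one function that handles both cases.
    public static string Describe(BrokerNotification n) =>
        n.Match(
            s => "status: " + s.Message,
            d => "message from " + d.User + ": " + d.Body);
}
```

If GetObservable were changed to return IObservable<BrokerNotification> (an assumption), a single subscription could call Match on each item, so status and data messages stay in order in one stream and no type checks are needed in Subscribe.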
Upvotes: 2