Reputation: 1680
Currently, I'm using the RX Framework to implement a workflow-like message handling pipeline. Essentially I have a message producer (deserializes network messages and calls OnNext() on a Subject) and I have several consumers.
NOTE: If and transform are extension methods I have coded that simply return an IObservable.
A consumer does something like the following:
var commerceRequest = messages.Transform(x => GetSomethingFromDatabase(x)
.Where(y => y.Value > 5)
.Select(y => y.ComplexObject)
.If(z => z.IsPaid, respond(z))
.Do(z => SendError(z));
commerceRequest
is then consumed by another similar pipeline and this continues up until the top where it ends with someone calling Subscribe()
on the final pipeline. The issue I'm having is that the messages from the base don't propagate up unless subscribe is called on messages directly somewhere.
How can I push the messages up to the top of the stack? I know this is an unorthodox approach but I feel it makes the code very simple to understand what is occurring to a message. Can anyone suggest another way of doing the same if you feel this is a totally terrible idea?
Upvotes: 4
Views: 1622
Reputation: 6155
Why should they go through the pipeline if there are no subscribers? If one of your intermediate steps is useful for their side-effects (You want them to run even if there are no other subscribers), you should rewrite the side-effect operation to be a subscriber.
You could also make the step with a side-effect as a pass-through operation (or tee, if you will) if you wanted to continue the chain.
Upvotes: 1