arena-ru

Reputation: 1020

How to organize a sequence of data processors with .NET Rx

What is the best way to organize a sequence of data processors with .NET Rx?
- a. Call methods on the observable, like observable.Do(log).Select(transformation).Do(work).Aggregate(someState)...
- b. Implement custom observers; if so, how should they be chained?
- c. Some other option.

Also, what is the best way to handle possible exceptions in the observable itself (see my concerns above), and to handle exceptions inside Do, Select, etc.? As far as I know, the best practice is that subscribers shouldn't throw.

I also sometimes need exceptions to be returned as elements of the observable sequence without the sequence being stopped (see the question "Handling Exceptions in Reactive Extensions without stopping sequence").

Upvotes: 0

Views: 585

Answers (1)

Lee Campbell

Reputation: 10783

It seems you need a workflow rather than Rx. Based on your other questions*, you are taking something that looks very well suited to a producer/consumer queuing workflow and forcing it into Rx.

It looks like you

  • want to read from a queue and block until a value is received, then resubscribe. Just use the queuing features of BlockingCollection. As values arrive, they can be pushed into the collection from any thread.
  • don't actually want a sequence of values, but a workflow that can fail on any given value, divert that value, and then process the next one. Just use a queue. Process each value and put the result into the next appropriate queue (failure/success). See Enterprise Integration Patterns, specifically Invalid Message Channel and Dead Letter Channel.
  • potentially want to process these values in parallel. See BlockingCollection.GetConsumingEnumerable, or the Disruptor for a high-performance implementation. With these tools you can have many producers and consumers. Sure, you can do this with Rx, but it is just polling and tying up a thread doing so. I think it is better to be explicit about this kind of thing.
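The three points above could be sketched roughly as follows. This is only an illustration under stated assumptions, not a definitive implementation: `QueueWorkflow`, `Process`, `Run`, and the `success`/`failure` queue names are hypothetical, and `Process` stands in for whatever work each value really needs. Only `BlockingCollection<T>`, `GetConsumingEnumerable`, `CompleteAdding`, and `Task` come from the .NET base class library.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

public static class QueueWorkflow
{
    // Hypothetical processing step: parses text to an int and throws on bad input.
    public static int Process(string raw) => int.Parse(raw);

    // Drains `input` with `workerCount` consumers. A value that processes cleanly
    // goes to `success`; a value that throws is diverted to `failure` (playing the
    // role of an invalid message channel) and the worker moves on to the next value.
    public static void Run(
        BlockingCollection<string> input,
        BlockingCollection<int> success,
        BlockingCollection<(string Value, Exception Error)> failure,
        int workerCount)
    {
        var workers = Enumerable.Range(0, workerCount)
            .Select(_ => Task.Run(() =>
            {
                // Blocks until a value arrives; the loop ends once CompleteAdding()
                // has been called on `input` and the queue is empty.
                foreach (var raw in input.GetConsumingEnumerable())
                {
                    try { success.Add(Process(raw)); }
                    catch (Exception ex) { failure.Add((raw, ex)); }
                }
            }))
            .ToArray();

        // Wait for every consumer, then signal downstream stages that no more
        // results will arrive.
        Task.WaitAll(workers);
        success.CompleteAdding();
        failure.CompleteAdding();
    }
}
```

Producers on any thread call `input.Add(...)` and finally `input.CompleteAdding()`. A failing value never stops the workers; it just ends up in `failure` together with its exception. With more than one worker, the order of results in `success` is not guaranteed to match the input order.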

I think your usage of Rx should pay for itself and you shouldn't find yourself fighting it (like any other technology or framework).

*Other questions such as "Handling Exceptions in Reactive Extensions without stopping sequence" and "How to serialize Observables to the cloud and back".

Upvotes: 2
