Buxel

Reputation: 104

Rx "pipeline" with decoupled source and destination

I'm currently trying to use Rx to organize a relatively simple data flow. I chose Rx because it seemed like a natural fit (correct me if I'm wrong), and I hope it makes extending my "pipeline" easier later on.

Rx is still quite new to me. I understand the basic concepts and have been eyeing it for quite some time, but this is the first time I'm evaluating it for real work.

My general flow is: DataSource -> Pipeline -> DataDestination

DataSource returns an IObservable<Data> with data collected from different sources:

public class DataSource
{
    public IObservable<Data> Run() { /* ... */ }
}

DataDestination is responsible for sending the data to another server.

public interface ILogDestination
{
    void SendData(IList<Data> dataList);
}

Pipeline subscribes to the DataSource and is responsible for buffering, batching and passing the data on to the DataDestination. It will also handle retries if the DataDestination fails for some reason (server not available, timeouts):

public class Pipeline : IPipeline
{
    private ILogSource _source;
    private ILogDestination _destination;

    public void Start()
    {
        _destination.Initialize();
        _source.Initialize();

        _source.Run()
            .Buffer(TimeSpan.FromSeconds(1), 100) // Create batches
            .Do(_destination.SendData) // Send the data
            .Retry(5) // Retry in case of timeouts
            .Subscribe();

        // Todo: think about error handling / retry mechanism
        // Todo: batching/buffering of data for rate limiting (i.e. max 100 per minute)
    }
}

My problem is that if all 5 retries fail, the data generated by the source must not be lost under any circumstances.

Catching the exception requires me to supply replacement data (?), which makes no sense to me in this situation. Swallowing the exception drops my data, which I'd like to avoid, too.

I thought about implementing a "database" layer inside the pipeline that queues all messages so that source and destination are decoupled, but this feels like a rather non-reactive approach.

The small examples on http://www.introtorx.com/ helped a lot already, but I'm missing the big picture here.

Is Rx the right tool for the job or am I forcing things here?

Thank you for your time.

Upvotes: 1

Views: 265

Answers (2)

Lee Campbell

Reputation: 10783

Rx is great for acting as the conduit for pipelines, and it can be effective for batching and executing retries, as your query does.

However, I don't think Rx is as strong when you need to process data, i.e. when receiving an event payload means executing something compute-intensive or performing I/O. Either of these can take a non-insignificant* amount of time. This means you are highly likely to be buffering** other values as they arrive while they wait to be processed, or worse, you are blocking the producer.

Why is this a problem?

In the case of buffering (an implicit queue), you have data the producer believes it has published but that has yet to be processed. If the subscription is disposed or an error occurs in the pipeline, that data is lost.

In the case of the producer being blocked, you are breaking the Rx paradigm and should probably look at using IEnumerable<T>/pull instead. In that case, explicit queues can be a great solution.

Back to your question.

First, we don't know whether your data source is a hot or cold sequence, i.e. if we stop listening, will we drop messages? Or, if we keep resubscribing (e.g. with Retry()), will it keep repeating data at us?

Hot data?

If your data source is hot, then I think you have to persist data in your pipeline if you want the highest chance of your destination getting all of it. In this case I think it is best to just subscribe and dump everything you can into a queue. This, however, is a pain point, because the queue should be backed by disk and therefore involves I/O. From what I understand, most logging frameworks implement a non-blocking write that just buffers the entry; a dedicated thread then drains the buffer to disk. Yes, some entries can be dropped, but a write is either blocking or lossy.
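A minimal sketch of that subscribe-and-drain shape, using a plain BlockingCollection<T> as the explicit queue. Everything here is illustrative: the in-memory list stands in for an append-only disk write, and the class and member names are made up for this example:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public class QueueingSink
{
    // Bounded queue: Add blocks when full, which makes the trade-off
    // (block vs. drop) explicit instead of hiding it in an Rx buffer.
    private readonly BlockingCollection<string> _queue =
        new BlockingCollection<string>(boundedCapacity: 10000);

    private readonly Task _drainer;
    private readonly List<string> _persisted = new List<string>();

    public QueueingSink()
    {
        // Dedicated drainer thread: the only place that touches "disk"
        // (represented here by a list for the sake of the example).
        _drainer = Task.Run(() =>
        {
            foreach (var item in _queue.GetConsumingEnumerable())
                _persisted.Add(item); // placeholder for an append-only file write
        });
    }

    // Called from the Rx subscription's OnNext: cheap, no I/O on this path.
    public void Enqueue(string item) => _queue.Add(item);

    public IReadOnlyList<string> Complete()
    {
        _queue.CompleteAdding();
        _drainer.Wait();
        return _persisted;
    }
}
```

The Rx subscription then does nothing but Enqueue, so a disposed subscription or an error later in the pipeline cannot lose data that has already reached the queue.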

In this process we try to optimise each part to be as fast as possible, i.e. just dump the message to disk as fast as possible, avoiding the overhead of frameworks, libraries and serialization where possible. Then, in another process/thread, we take these values and perform the slow work of sending them to another server.

Cold data?

If your data source is cold, then I think you will need a way to specify a point in the sequence to resume from, e.g. a timestamp, a checkpoint, a version or a sequence number. Your destination will then need to expose its current checkpoint, and the source will need to allow you to subscribe from that checkpoint. Tools like https://geteventstore.com/ and https://github.com/damianh/SqlStreamStore support this.

In this scenario, the cost of sending to the destination server, which could otherwise cause backing up, is hopefully mitigated by the event stream technology you are leaning on. If it provides a dedicated stream for your query, you may not be "blocking" the producer.

Observable.Defer(() => _source.GetFrom(_destination.CurrentCheckpoint()))
    // Create batches
    .Buffer(TimeSpan.FromSeconds(1), 100)
    // Custom operator for you to write or find on the internet
    .BackoffRetry(
        initialDelay: TimeSpan.FromSeconds(1),
        backoff: delay => delay * 2,
        maxRetries: 5)
    // Send the data.
    // This should be considered a cheap operation, i.e. no I/O or computation.
    .Subscribe(
        _destination.SendData,
        ex => { /* We know this can error, so we need to cater for the 6th failure */ });
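One possible shape for that custom BackoffRetry operator: on error, resubscribe after a delay that grows according to a backoff function, giving up after a maximum number of retries. This is only a sketch, assuming the System.Reactive package; the operator name and parameters simply mirror the call above and are not part of Rx itself:

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class ObservableExtensions
{
    // Sketch of a backoff-retry operator: on error, resubscribe after `delay`,
    // growing the delay via `backoff`, and give up after `maxRetries` attempts.
    public static IObservable<T> BackoffRetry<T>(
        this IObservable<T> source,
        TimeSpan initialDelay,
        Func<TimeSpan, TimeSpan> backoff,
        int maxRetries,
        IScheduler scheduler = null)
    {
        scheduler = scheduler ?? Scheduler.Default;

        IObservable<T> Attempt(TimeSpan delay, int remaining) =>
            source.Catch<T, Exception>(ex =>
                remaining <= 0
                    ? Observable.Throw<T>(ex) // out of retries: surface the error
                    : Attempt(backoff(delay), remaining - 1)
                        .DelaySubscription(delay, scheduler));

        return Attempt(initialDelay, maxRetries);
    }
}
```

Catch with a recursive resubscription keeps the operator allocation-light, and DelaySubscription (rather than Delay) ensures the wait happens before the retry attempt, not on each emitted value.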

*Yes, I know it is a double negative. My point is that, say, 50ms might not seem "significant"; however, when you might be expecting 100 msg/sec, it can't be considered insignificant.

**Buffering here is what I call "Implicit queueing". While queues are everywhere, I would favour explicit ones over implicit ones.

Upvotes: 4

Mark van Straten

Reputation: 9425

You should not use the .Do() operator for your main flow; it is intended for side effects (like debug logging).

Rx is a fit for what you want to achieve, because you want to functionally compose a stream of future values. Try something like this:

    // Make SendData return an IObservable which can succeed (emitting Unit [=void]
    // or something more useful) or throw an exception if sending the data fails
    IObservable<Unit> SendData(IList<Data> logs) { /* ... */ }

    _source.Run()
        .Buffer(TimeSpan.FromSeconds(1), 100) // Create batches
        .SelectMany(bufferedLogs => SendData(bufferedLogs)
            .Retry(5)
            .Catch(/* choose what to do with the original logs after 5 retries */)
        )
        .Subscribe();

This leaves a few decisions to make:

  • What to do after 5 retries, store them on disk?
  • Retry() will immediately retry after the original SendData() has failed. Maybe you want to implement a backoff mechanism?
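For the first bullet, one hedged sketch of what the Catch branch could do: persist the failed batch to a dead-letter store and complete the inner sequence, so one bad batch neither loses data nor tears down the whole pipeline. This assumes System.Reactive; the in-memory DeadLetters list stands in for a durable (disk-backed) store, and SendWithFallback is a made-up helper name:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;

public static class FailedBatchHandling
{
    // Placeholder "dead letter" store; a real one would write to disk.
    public static readonly List<IList<string>> DeadLetters = new List<IList<string>>();

    public static IObservable<Unit> SendWithFallback(
        IList<string> batch,
        Func<IList<string>, IObservable<Unit>> sendData)
    {
        return sendData(batch)
            .Retry(5)
            .Catch((Exception ex) =>
            {
                DeadLetters.Add(batch);          // keep the data instead of dropping it
                return Observable.Empty<Unit>(); // swallow the error; the stream stays alive
            });
    }
}
```

Plugged into the SelectMany above, the outer subscription then only ever sees OnNext/OnCompleted, and the dead-letter store can be replayed later.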

Upvotes: 1
