Ben Smith

Reputation: 2419

How to queue IObservable subjects, with cancellation support, for slow consumers using Reactive Extensions (Rx)

First, some background: I've written an open source .NET library named Duplicitiy (on github.com) that uses the FileSystemWatcher to duplicate all file changes between two directories.

I have written a FileSystemObservable class that implements IObservable<FileSystemChange> (which uses FSWatcher to wrap the actual FileSystemWatcher). When files or directories are created, modified or deleted the changes are published via a Subject<FileSystemChange> using Reactive Extensions.

I then subscribe to this observable using the following subscription.

    return observable
        .Buffer(() => observable.Throttle(TimeSpan.FromSeconds(2)).Timeout(TimeSpan.FromMinutes(1)))
        .PrioritizeFileSystemChanges()
        .SelectMany(x => x);

Changes are buffered until there has been a period of at least 2 seconds without any change, up to a maximum of 1 minute. This is because, when a directory is deleted, the FileSystemWatcher raises notifications for all contained files and directories. We can optimise the behaviour by swallowing the changes contained within the directory and simply deleting the parent in our subscriber. This is handled by the PrioritizeFileSystemChanges filter. It also allows us to ignore files that are created and subsequently deleted within the buffer window, again reducing IO operations on the target.

This works, albeit in a naive manner at the moment with no support for failure/retries.

However, my question concerns the subscriber to this observable, which could take a considerable amount of time to process each change, for example when copying a large file to a slow file system. When a new file system change occurs for the same file that is currently being copied, how can I abort the in-progress operation? Or, if the file is in the buffered list but has not yet been processed, how can it be removed or excluded?

I assume that there would need to be another subscription to the original observable, but I am unsure how best to share state or modify pending tasks. The changes must be processed in the order they were received, which suggests a queue. However, a new file system change may apply to a queued operation that needs to be cancelled or removed, and queues aren't designed for out-of-order removal.

For example, if we're currently copying file Foo\Bar.txt and the Foo directory is deleted, then any in-progress or pending changes for the directory, and all subdirectories, must be cancelled. Could this be a use case for the Task Parallel Library, or is there some Reactive approach that I could take?

Any github pull requests would be kindly received too!

Upvotes: 0

Views: 771

Answers (1)

Gideon Engelberth

Reputation: 6155

You seem to have several goals/questions here:

  1. Remove earlier changes that are no longer needed because of later changes. A linked list could be a good fit for this. It performs well both for general queue usage and for removing items from the middle.
  2. Cancellation of in-progress operations that are no longer needed because of later changes. This would also include operations that need to be restarted. This will require you to find a library that allows you to cancel in-progress operations. The System.IO classes do not provide such cancellation, so you will need to either find a library or roll your own.
  3. Could this be a use case for the Task Parallel Library or is there some Reactive approach that I could take? Your phrasing struck me as if this were an either/or choice, but there's no reason you can't mix the two together. The observable you showed for the file changes is a good starting point (Rx). The in-progress operations would likely be implemented as methods taking a CancellationToken and returning a Task (TPL).
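
To make the "roll your own" option in point 2 concrete, here is a minimal C# sketch of an operation method that takes a CancellationToken and returns a Task. CancellableFileCopy and CopyAsync are hypothetical names, not part of System.IO or the library in the question, and deleting the partial target on cancellation is just one assumed policy.

```csharp
using System;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

public static class CancellableFileCopy
{
    // Hypothetical helper: copies a file in chunks and checks the token
    // before each chunk, so a cancel request is seen within one chunk.
    public static async Task CopyAsync(string sourcePath, string targetPath, CancellationToken token)
    {
        var buffer = new byte[81920];
        try
        {
            using (var source = new FileStream(sourcePath, FileMode.Open, FileAccess.Read,
                                               FileShare.Read, buffer.Length, useAsync: true))
            using (var target = new FileStream(targetPath, FileMode.Create, FileAccess.Write,
                                               FileShare.None, buffer.Length, useAsync: true))
            {
                while (true)
                {
                    // Throws OperationCanceledException once cancellation is requested.
                    token.ThrowIfCancellationRequested();

                    int read = await source.ReadAsync(buffer, 0, buffer.Length).ConfigureAwait(false);
                    if (read == 0) break; // end of source file

                    await target.WriteAsync(buffer, 0, read).ConfigureAwait(false);
                }
            }
        }
        catch (OperationCanceledException)
        {
            // Assumed policy: do not leave a half-written file behind.
            // Both streams are already closed by the using blocks here.
            File.Delete(targetPath);
            throw;
        }
    }
}
```

The returned Task ends in the cancelled state when the token fires, so whatever is waiting on it can move on to the next queued change.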

The missing step here seems to be how to go from a "queue" of changes to actual work. Basically, the subscription must queue the change (quickly) and launch a (slow, async) method, if it is not already running, that "recursively" processes the queue; something like:

'changes is your returned observable
'toProcess is the "queue" of changes
'processor holds information about and the task of the in-progress operation
changes.Subscribe(Sub(c)
                     UpdateQueueWithChange(c, toProcess, processor)
                     If processor.Task.IsCompleted Then
                         ProcessNextChange(processor, toProcess)
                     End If
                  End Sub)

ProcessNextChange is a method that gets the next change in the queue, starts the operation, and sets the callback for the operation task to call ProcessNextChange again. If no changes are left, processor should be given a completed task that does not call ProcessNextChange again.
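
A rough C# sketch of that loop, under some assumptions: changes are reduced to plain strings, the Processor class and its members are hypothetical, and the property Current plays the role of processor.Task. It has no failure or retry handling, so a faulted operation simply leaves the queue idle until the next change arrives.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public sealed class Processor
{
    private readonly object _gate = new object();
    private readonly LinkedList<string> _toProcess = new LinkedList<string>();
    private readonly Func<string, CancellationToken, Task> _operation;
    private CancellationTokenSource _cts;

    public Processor(Func<string, CancellationToken, Task> operation)
    {
        _operation = operation;
    }

    // The in-progress operation; a completed task means "idle".
    public Task Current { get; private set; } = Task.CompletedTask;

    // Called from the subscription: queue quickly, start work only if idle.
    public void Enqueue(string change)
    {
        lock (_gate)
        {
            _toProcess.AddLast(change);
            if (Current.IsCompleted) ProcessNextChange();
        }
    }

    // Requests cancellation of whatever is currently running, if anything.
    public void CancelCurrent()
    {
        lock (_gate) { _cts?.Cancel(); }
    }

    // Caller must hold _gate.
    private void ProcessNextChange()
    {
        if (_toProcess.Count == 0)
        {
            Current = Task.CompletedTask; // nothing left: idle, no re-call
            return;
        }
        string change = _toProcess.First.Value;
        _toProcess.RemoveFirst();
        _cts = new CancellationTokenSource();
        Current = RunAsync(change, _cts.Token);
    }

    private async Task RunAsync(string change, CancellationToken token)
    {
        // Yield first so this method never finishes synchronously;
        // otherwise Current could be overwritten with a stale task.
        await Task.Yield();
        try { await _operation(change, token).ConfigureAwait(false); }
        catch (OperationCanceledException) { /* superseded: move on */ }
        lock (_gate) { ProcessNextChange(); } // the "recursive" re-call
    }
}
```

Only one operation runs at a time, and changes start in the order they were queued. The lock is there because the subscription callback and the task continuation may run on different threads.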

UpdateQueueWithChange will need to update the "queue" and cancel the in-progress operation if necessary, which should trigger a call to ProcessNextChange due to task completion that will start the next operation.
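
A possible shape for UpdateQueueWithChange in C#, using a LinkedList<string> as the "queue" so that superseded entries can be removed from the middle. InProgress and Affects are hypothetical, and the rule that every change supersedes work on the same path or beneath it is an assumption chosen to fit the Foo\Bar.txt example; a real implementation would probably also look at the change type.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical holder for the in-progress operation (the "processor").
public sealed class InProgress
{
    public string Path;                          // null when idle
    public CancellationTokenSource Cancellation; // source of the operation's token
}

public static class QueueUpdater
{
    // True when 'candidate' is 'changedPath' itself or lies underneath it.
    private static bool Affects(string changedPath, string candidate)
    {
        return candidate.Equals(changedPath, StringComparison.OrdinalIgnoreCase)
            || candidate.StartsWith(changedPath.TrimEnd('\\') + "\\",
                                    StringComparison.OrdinalIgnoreCase);
    }

    public static void UpdateQueueWithChange(
        string changedPath, LinkedList<string> toProcess, InProgress processor)
    {
        // 1. Drop pending entries made obsolete by the new change.
        var node = toProcess.First;
        while (node != null)
        {
            var next = node.Next; // capture before a possible removal
            if (Affects(changedPath, node.Value))
                toProcess.Remove(node); // O(1) when given the node itself
            node = next;
        }

        // 2. Abort the running operation if it touches the same path.
        if (processor.Path != null && Affects(changedPath, processor.Path))
            processor.Cancellation?.Cancel();

        // 3. Queue the new change behind whatever is still pending.
        toProcess.AddLast(changedPath);
    }
}
```

For example, with Foo\Bar.txt in progress and Foo\A.txt pending, a change for Foo cancels the copy, removes Foo\A.txt, and leaves Foo as the only queued work for that directory.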

If you want to cancel the operations when you cancel the subscription to the changes observable, I would recommend putting the subscription disposable into a CompositeDisposable along with a SerialDisposable that stores a CancellationDisposable (updated by ProcessNextChange and also stored in processor), which is the source of the CancellationToken needed by the operation methods. ProcessNextChange would check the SerialDisposable to see whether it had been disposed before launching an operation. The CompositeDisposable is what you store somewhere to end the whole thing.

CompositeDisposable 'this is what your application keeps around
|- IDisposable from subscription to changes observable
|- SerialDisposable
   |- .Disposable property = CancellationDisposable 
      'changed each time ProcessNextChange is called

Upvotes: 1
