First, some background: I've written an open source .NET library named Duplicity (on github.com) that uses the FileSystemWatcher to duplicate all file changes between two directories.
I have written a FileSystemObservable class that implements IObservable<FileSystemChange> (it uses FSWatcher to wrap the actual FileSystemWatcher). When files or directories are created, modified or deleted, the changes are published via a Subject<FileSystemChange> using Reactive Extensions.
I then subscribe to this observable using the following query:
return observable
.Buffer(() => observable.Throttle(TimeSpan.FromSeconds(2)).Timeout(TimeSpan.FromMinutes(1)))
.PrioritizeFileSystemChanges()
.SelectMany(x => x);
Changes are buffered until there has been a period of at least 2 seconds with no further changes, for a maximum of 1 minute. This is because, when a directory is deleted, the FileSystemWatcher raises a notification for every file and directory it contained. We can optimise this by swallowing the changes for the directory's contents and simply deleting the parent in our subscriber. This is handled by the PrioritizeFileSystemChanges filter. It also allows us to ignore files that are created and subsequently deleted within the buffer window, again reducing IO operations on the target.
This works, albeit naively at the moment: there is no support for failures or retries.
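As a rough illustration of the two reductions the PrioritizeFileSystemChanges filter is described as making, here is a plain C# function over one buffered batch. The (Kind, Path) tuple and the names Prioritize and IsUnder are hypothetical and exist only for this sketch; it is not Duplicity's actual filter, and it does not model renames.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical change shape: Kind is "Created", "Changed" or "Deleted"; Path is relative to the watched root.
// A sketch of the two reductions described in the question, not Duplicity's PrioritizeFileSystemChanges.
static List<(string Kind, string Path)> Prioritize(IReadOnlyList<(string Kind, string Path)> batch)
{
    var kept = new List<(string Kind, string Path)?>();   // null marks a dropped entry, so indices stay valid
    var createdAt = new Dictionary<string, int>(StringComparer.OrdinalIgnoreCase); // path -> index of its Created

    foreach (var c in batch)
    {
        if (c.Kind == "Deleted" && createdAt.TryGetValue(c.Path, out var start))
        {
            // Created and deleted inside the same window: the net effect is nothing, so drop the
            // Created, anything recorded after it for that path, and this Deleted.
            for (var i = start; i < kept.Count; i++)
                if (kept[i] is { } k && string.Equals(k.Path, c.Path, StringComparison.OrdinalIgnoreCase))
                    kept[i] = null;
            createdAt.Remove(c.Path);
            continue;
        }
        if (c.Kind == "Created") createdAt[c.Path] = kept.Count;
        kept.Add(c);
    }

    var survivors = kept.Where(k => k.HasValue).Select(k => k.Value).ToList();
    var deleted = survivors.Where(c => c.Kind == "Deleted").Select(c => c.Path).ToList();

    // Deleting Foo on the target also removes Foo\Bar.txt, so a delete beneath another delete is dropped.
    return survivors.Where(c => !(c.Kind == "Deleted" && deleted.Any(d => IsUnder(c.Path, d)))).ToList();
}

// True when path lies strictly beneath dir; accepts either separator.
static bool IsUnder(string path, string dir)
{
    dir = dir.TrimEnd('\\', '/');
    return path.Length > dir.Length
        && path.StartsWith(dir, StringComparison.OrdinalIgnoreCase)
        && (path[dir.Length] == '\\' || path[dir.Length] == '/');
}

var sample = new List<(string Kind, string Path)>
{
    ("Deleted", @"Foo\Bar.txt"), ("Deleted", @"Foo\Sub"), ("Deleted", "Foo"),
    ("Created", "tmp.txt"), ("Changed", "tmp.txt"), ("Deleted", "tmp.txt"),
    ("Changed", "keep.txt"),
};
foreach (var c in Prioritize(sample)) Console.WriteLine($"{c.Kind} {c.Path}");
// Deleted Foo
// Changed keep.txt
```

Marking dropped entries as null instead of removing them keeps the stored indices valid, so a file that already existed, was deleted, recreated and deleted again still produces its first delete.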
My question, however, concerns the subscriber to this observable, which could take a considerable amount of time to process each change, for example when copying a large file to a slow file system. When a new file system change occurs for a file that is currently being copied, how can I abort the in-progress operation? Or, if the file is in the buffered list but has not been processed yet, how can it be removed or excluded?
I assume there would need to be another subscription to the original observable, but I am unsure how best to share state or modify pending tasks. The changes must be processed in the order they were received, which suggests a queue. However, a new file system change may apply to a queued operation that then needs to be cancelled or removed, and queues aren't designed for out-of-order removal.
For example, if we're currently copying the file Foo\Bar.txt and the Foo directory is deleted, then any in-progress or pending changes for that directory and all of its subdirectories must be cancelled. Could this be a use case for the Task Parallel Library, or is there some Reactive approach that I could take?
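One way to keep arrival order and still allow out-of-order removal is a LinkedList<T> instead of a Queue<T>: nodes stay in the order they were added, and any node can be unlinked in O(1) once it has been found. The sketch assumes a hypothetical (Kind, Path) tuple for a change, plus two supersession rules that are assumptions of this sketch rather than anything Duplicity defines: a newer change to a path makes a queued change to the same path redundant, and deleting a directory makes every queued change beneath it redundant. It only covers pending work; an in-progress copy would still need its own cancellation.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Pending changes in arrival order. Hypothetical change shape: (Kind, Path),
// where Kind is "Created", "Changed" or "Deleted".
var pending = new LinkedList<(string Kind, string Path)>();

// Assumed rules: a newer change to a path supersedes a queued change to the same path,
// and deleting a directory supersedes every queued change beneath it.
static void Enqueue(LinkedList<(string Kind, string Path)> queue, (string Kind, string Path) change)
{
    var node = queue.First;
    while (node != null)
    {
        var next = node.Next;   // read before Remove, which detaches the node
        var path = node.Value.Path;
        if (string.Equals(path, change.Path, StringComparison.OrdinalIgnoreCase)
            || (change.Kind == "Deleted" && IsUnder(path, change.Path)))
        {
            queue.Remove(node); // O(1) unlink of an arbitrary node
        }
        node = next;
    }
    queue.AddLast(change);      // the new change goes to the back, preserving arrival order
}

// True when path lies strictly beneath dir; accepts either separator.
static bool IsUnder(string path, string dir)
{
    dir = dir.TrimEnd('\\', '/');
    return path.Length > dir.Length
        && path.StartsWith(dir, StringComparison.OrdinalIgnoreCase)
        && (path[dir.Length] == '\\' || path[dir.Length] == '/');
}

Enqueue(pending, ("Changed", @"Foo\Bar.txt"));
Enqueue(pending, ("Created", @"Other\a.txt"));
Enqueue(pending, ("Changed", @"Foo\Sub\c.txt"));
Enqueue(pending, ("Deleted", "Foo"));   // removes both queued entries under Foo
Console.WriteLine(string.Join(", ", pending.Select(c => $"{c.Kind} {c.Path}")));
// Created Other\a.txt, Deleted Foo
```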
Any GitHub pull requests would be gratefully received too!
You seem to have several goals/questions here:
CancellationToken and returning a Task (TPL).
The missing step here seems to be how to go from a "queue" of changes to actual work. Basically, the subscription must queue the change (quickly) and, if it is not already running, launch a (slow, async) method that "recursively" processes the queue; something like:
'changes is your returned observable
'toProcess is the "queue" of changes
'processor holds information about the in-progress operation, including its task
changes.Subscribe(Sub(c)
                      UpdateQueueWithChange(c, toProcess, processor)
                      If processor.Task.IsCompleted Then
                          ProcessNextChange(processor, toProcess)
                      End If
                  End Sub)
ProcessNextChange is a method that gets the next change from the queue, starts the operation, and sets the callback (continuation) of the operation's task to call ProcessNextChange again. If no changes are left, processor should be given a completed task that does not call ProcessNextChange again.
UpdateQueueWithChange will need to update the "queue" and, if necessary, cancel the in-progress operation; the cancelled task then completes, which triggers the call to ProcessNextChange that starts the next operation.
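Here is a minimal C# sketch of that shape (the question's language), assuming a change is just a file path and using a hypothetical CopyAsync in place of the real slow operation; OnChange plays the role of the subscription body plus UpdateQueueWithChange. One deliberate difference from the VB outline: it tracks a running flag under a lock instead of testing processor.Task.IsCompleted, because a cancelled operation's task completes before its continuation has called ProcessNextChange, and a check made in that window could start a second operation alongside the one the continuation starts.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

var gate = new object();                     // guards all the state below
var pending = new LinkedList<string>();      // the "queue": paths waiting to be processed
var running = false;                         // true from starting an operation until the queue drains
string? currentPath = null;                  // path of the in-progress operation, if any
CancellationTokenSource? currentCts = null;  // cancels the in-progress operation
var idle = new ManualResetEventSlim(true);   // demo only: signalled when the queue has drained
var log = new List<string>();                // demo only: outcome of each operation

// Hypothetical stand-in for the slow copy; a real one must also observe the token.
static async Task CopyAsync(string path, CancellationToken token) => await Task.Delay(500, token);

// What the subscription would call for each change: quick bookkeeping only.
void OnChange(string path)
{
    lock (gate)
    {
        // UpdateQueueWithChange: a newer change supersedes a queued one for the same path...
        pending.Remove(path);
        // ...and aborts the operation already running for it.
        if (path == currentPath) currentCts?.Cancel();
        pending.AddLast(path);
        if (!running) ProcessNextChange();
    }
}

// Starts the next queued operation, or goes idle; callers must hold gate.
void ProcessNextChange()
{
    if (pending.First is not { } node)
    {
        running = false;
        currentPath = null;
        idle.Set();
        return;
    }
    pending.RemoveFirst();
    running = true;
    idle.Reset();
    var path = node.Value;
    var cts = new CancellationTokenSource();
    currentPath = path;
    currentCts = cts;
    CopyAsync(path, cts.Token).ContinueWith(t =>
    {
        lock (gate)
        {
            log.Add($"{path}:{(t.IsCanceled ? "cancelled" : "done")}");
            cts.Dispose();
            ProcessNextChange();   // the "recursive" step
        }
    }, TaskScheduler.Default);
}

OnChange("a.txt");
OnChange("b.txt");
OnChange("a.txt");   // a.txt changes again while its first copy is still running
idle.Wait();
Console.WriteLine(string.Join(", ", log));
```

With the three calls at the end, the first a.txt copy should be cancelled, b.txt copied, and a.txt then copied again, still in arrival order. A directory delete would additionally need to match queued and in-progress paths beneath the directory, not only equal ones.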
If you want to cancel the operations when you dispose of the subscription to the changes observable, I would recommend putting the subscription's disposable into a CompositeDisposable along with a SerialDisposable. The SerialDisposable would hold a CancellationDisposable (replaced by ProcessNextChange and also stored in processor), which is the source of the CancellationToken needed by the operation methods.
Before launching an operation, ProcessNextChange would check whether the SerialDisposable has been disposed. The CompositeDisposable is what you store somewhere to end the whole thing.
CompositeDisposable 'this is what your application keeps around
|- IDisposable from subscription to changes observable
|- SerialDisposable
   |- .Disposable property = CancellationDisposable
      'changed each time ProcessNextChange is called