Reputation: 17498
I have written some code which turns the FileSystemWatcher's Changed event into an observable sequence.
My goal is to split all file system changes into separate streams, one per file, and throttle each of them.
For example, if I have 10 different files which each change 3 times in half a second, I'll only get one notification per file.
What concerns me, though, is the GroupBy() operator. For this to work, I assume it needs to keep building up the set of groups over time, consuming a small amount of memory for each one.
Will this cause a "leak" and if so, how can I prevent it?
FileSystemWatcher _watcher = new FileSystemWatcher("d:\\") {
    EnableRaisingEvents = true,
    NotifyFilter = NotifyFilters.LastWrite | NotifyFilters.Size
};

void Main()
{
    var fileSystemEventStream =
        Observable.FromEventPattern<FileSystemEventHandler, FileSystemEventArgs>
            (
                _ => _watcher.Changed += _,
                _ => _watcher.Changed -= _
            )
            .ObserveOn(ThreadPoolScheduler.Instance)
            .SubscribeOn(ThreadPoolScheduler.Instance)
            .GroupBy(ep => ep.EventArgs.FullPath, ep => ep.EventArgs.FullPath)
        ;

    var res =
        from fileGroup in fileSystemEventStream
        from file in fileGroup.Throttle(TimeSpan.FromSeconds(1))
        select file;

    res.Subscribe(
        ReceiveFsFullPath,
        exception => {
            Console.WriteLine("Something went wrong - " + exception.Message + " " + exception.StackTrace);
        });

    Console.Read();
}

void ReceiveFsFullPath(string s) {
    Console.WriteLine("Received file system event on thread " + Thread.CurrentThread.ManagedThreadId);
    Console.WriteLine(s);
}
Upvotes: 8
Views: 2364
Reputation: 10783
As per Brandon's reply, the subjects will grow and have no way of being reclaimed*. My main concern with leaking memory here is that you don't capture the subscription! i.e.
res.Subscribe(...
must be replaced with
subscription = res.Subscribe(...
If you don't capture the subscription, you can never dispose of it, so you never release the event handlers, and you have "leaked memory". Obviously capturing it is no use unless you actually dispose of the subscription somewhere.
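To illustrate the point about capturing the subscription, here is a minimal sketch. It uses a Subject<string> as a stand-in for the watcher stream, and the class and method names are made up for the example; the only thing it is meant to show is that disposing the captured IDisposable detaches the observer.

```csharp
using System;
using System.Reactive.Subjects;

static class SubscriptionDemo
{
    // Hypothetical helper: counts how many values arrive before Dispose().
    public static int CountUntilDisposed()
    {
        var source = new Subject<string>();   // stand-in for the file system stream
        var received = 0;

        // Capture the subscription so it can be released later.
        IDisposable subscription = source.Subscribe(_ => received++);

        source.OnNext("a.txt");               // delivered to the observer
        subscription.Dispose();               // detaches the observer from the source
        source.OnNext("b.txt");               // no longer delivered

        return received;
    }
}
```

In the question's code the same idea would mean keeping the result of res.Subscribe(...) in a field and disposing it when the watcher is no longer needed.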
*Well, if they completed then they would be auto disposed, so that would work. You could look to complete a sequence when a FileDeleted event came through?
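One built-in operator that may help with completing the groups is GroupByUntil, which closes a group (and forgets its key) when a per-group "duration" observable fires. The sketch below is an assumption about how it could be applied here, not something from the original posts: it uses the group's own Throttle as the duration, so a group ends once its path has been quiet for the given interval, and LastAsync then yields the final value of that group. The class and method names are hypothetical.

```csharp
using System;
using System.Reactive.Linq;

static class ThrottlePerKey
{
    // Hypothetical helper: emits each path once it has been quiet for `quiet`,
    // then lets GroupByUntil complete and drop the group for that path.
    public static IObservable<string> ThrottleByPath(
        this IObservable<string> paths, TimeSpan quiet)
    {
        return paths
            // The duration observable is the group's throttled output, so the
            // group is closed when no new value for that key arrives in `quiet`.
            .GroupByUntil(path => path, group => group.Throttle(quiet))
            // A completed group emits its last value exactly once.
            .SelectMany(group => group.LastAsync());
    }
}
```

If the same path changes again later, GroupByUntil simply starts a fresh group for it, so the set of live groups should stay proportional to the number of recently active files rather than every file ever seen.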
Upvotes: 1
Reputation: 39182
Yes, for each new key, GroupBy creates a Subject and maintains a dictionary of these subjects, and you are subscribing to each of them. So that is a small chunk of memory that will grow over time, without any way to release the old entries. What you really need is for the key to be removed when the throttle timer expires. I cannot think of a way to do this with the built-in operators, so you need a custom operator. Here's a stab at one.
// Extension methods must be static and declared inside a static class.
public static IObservable<T> ThrottleDistinct<T>(this IObservable<T> source, TimeSpan delay)
{
    return Observable.Create<T>(observer =>
    {
        var notifications = new Subject<IObservable<T>>();
        var subscription = notifications.Merge().Subscribe(observer);
        // Maps each value that has a pending throttle timer to its subject.
        var d = new Dictionary<T, IObserver<T>>();
        var gate = new object();
        var sourceSubscription = new SingleAssignmentDisposable();
        var subscriptions = new CompositeDisposable(subscription, sourceSubscription);
        sourceSubscription.Disposable = source.Subscribe(value =>
        {
            IObserver<T> entry;
            lock (gate)
            {
                if (d.TryGetValue(value, out entry))
                {
                    // Already tracked: feed the subject to restart its throttle timer.
                    entry.OnNext(value);
                }
                else
                {
                    var s = new Subject<T>();
                    // Do takes an Action<T>; remove the entry once the throttled value fires.
                    var o = s.Throttle(delay).FirstAsync().Do(_ =>
                    {
                        lock (gate)
                        {
                            d.Remove(value);
                        }
                    });
                    notifications.OnNext(o);
                    d.Add(value, s);
                    s.OnNext(value);
                }
            }
        }, observer.OnError, notifications.OnCompleted);
        return subscriptions;
    });
}
...
Observable.FromEventPattern(...)
.Select(e => e.EventArgs.FullPath)
.ThrottleDistinct(TimeSpan.FromSeconds(1))
.Subscribe(...);
Upvotes: 4