Reputation: 1428
I've seen a few things like this in RxJava and Swift but not .NET. I'm not even sure it's a recommended pattern...
I have a custom ObservableFileSystemWatcher class that monitors a path for changes. When it detects a change, it samples the stream to get the last event and then reads the file and maps its contents to the individual records in that file. This class is encapsulated within a Monitor class. From the Monitor I want to expose another observable, an IObservable<string> that simply emits the contents of the file.
Something like this to get the records...
_observableFileSystemWatcher.Changed
    .Sample(TimeSpan.FromSeconds(1))
    .Select(async f =>
    {
        // Read the file and get all records into memory before passing them on to the next part of the pipeline
        Debug.WriteLine($"Filename {f.Name} found");
        var file = await File.ReadAllLinesAsync(f.FullPath);
        return file;
    })
    // Unwraps each Task<string[]>, giving an IObservable<string[]>
    .SelectMany(r => r)
    // Flattens each string[] so that subscribers receive one record at a time
    .SelectMany(lines => lines)
    .Subscribe(r =>
    {
        Debug.WriteLine(r);
    });
The Monitor class has an interface of:
public interface IMonitor
{
IObservable<string> Content { get; }
}
So what I think I want to do is subscribe to the observable file watcher and then pipe its output into the IMonitor Content observable for the IMonitor consumers to subscribe to. They will just get a continuous stream of records without needing to worry about where the monitor got them from.
There are plenty of Observable.Fromxxxx helpers but nothing I can see that enables me to do this. Is there meant to be a better way of wrapping Observables within each other?
Upvotes: 0
Views: 753
Reputation: 10298
I'm not super experienced with System.Reactive, but your question seems like you're thinking the way I did in a slightly earlier stage of my understanding.
I recommend using Observable.Create to create an IObservable<string> that lazily creates your ObservableFileSystemWatcher when an observer subscribes. This is a common pattern in RX, and I recommend it especially in this case because FileSystemWatcher is deceptive in its simplicity. If it ever emits an error, it will never again emit a change event. For example, if it's watching a network share and that share becomes unavailable, that instance of FileSystemWatcher will become unusable, even when the share becomes available again. You need to create a new instance of FileSystemWatcher. Trying to replace the instance inside your observable wrapper gets messy fast. Just throw out the whole observable and recreate it by resubscribing. This is what Retry is for. If multiple observers need to observe the same change events, then subscribe a Subject to your observable and subscribe your observers to the Subject. You will have then created a "hot" observable; it will be "running" even when nothing is subscribed to the Subject.
I happen to have written my own RxFileSystemWatcher and blogged about it. It demonstrates the pattern of lazily creating and configuring the thing being observed when an observer subscribes to the observable returned by Observable.Create.
using System;
using System.IO;
using System.Reactive.Linq;

public static class RxFileSystemWatcher
{
    public static IObservable<FileSystemEventArgs> Create(Action<FileSystemWatcher> config)
    {
        // A new FileSystemWatcher is created for every subscription
        return Observable.Create<FileSystemEventArgs>((observer) =>
        {
            void onNext(object sender, FileSystemEventArgs args) => observer.OnNext(args);
            var watcher = new FileSystemWatcher();
            try
            {
                // Let the caller set Path, Filter, NotifyFilter, etc.
                config(watcher);
                watcher.Changed += onNext;
                watcher.Created += onNext;
                watcher.Deleted += onNext;
                // Report a rename as a delete of the old name followed by a create of the new name
                watcher.Renamed += (sender, args) =>
                {
                    var dir = Path.GetDirectoryName(args.FullPath);
                    onNext(sender, new FileSystemEventArgs(WatcherChangeTypes.Deleted, dir, args.OldName));
                    onNext(sender, new FileSystemEventArgs(WatcherChangeTypes.Created, dir, args.Name));
                };
                watcher.Error += (sender, args) => observer.OnError(args.GetException());
                watcher.EnableRaisingEvents = true;
            }
            catch (Exception e)
            {
                observer.OnError(e);
            }
            // Disposing the subscription disposes the watcher
            return watcher;
        });
    }
}
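As a rough sketch of how Retry and a Subject fit together, the snippet below uses a stand-in observable instead of a real FileSystemWatcher so that it can run anywhere. The names RetryDemo and Run, and the fake "watcher" that fails on its first subscription, are made up for illustration and are not part of the code above.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class RetryDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();
        var attempts = 0;

        // Stand-in for RxFileSystemWatcher.Create: every subscription builds a
        // fresh "watcher". The first one fails immediately, the second one works.
        var source = Observable.Create<string>(observer =>
        {
            attempts++;
            if (attempts == 1)
                observer.OnError(new InvalidOperationException("watcher broke"));
            else
                observer.OnNext($"change seen by watcher #{attempts}");
            return Disposable.Empty;
        });

        // Observers attach to the Subject, not to the source directly.
        var subject = new Subject<string>();
        subject.Subscribe(x => log.Add("A: " + x));
        subject.Subscribe(x => log.Add("B: " + x));

        // Retry resubscribes to the source after an error, which recreates the
        // "watcher". The Subject only ever sees the values, never the error.
        using (source.Retry().Subscribe(subject))
        {
        }

        return log;
    }
}
```

With a real watcher you would pass RxFileSystemWatcher.Create(...) in place of the stand-in. Note that Retry() with no arguments resubscribes immediately and indefinitely, so for something like an unavailable network share you would probably want to add a delay between attempts.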
Upvotes: 0
Reputation: 43525
Probably this is what you want:
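class Monitor : IMonitor
{
    // _observableFileSystemWatcher is the existing field from the question
    public IObservable<string> Content { get; private set; }

    public Monitor()
    {
        this.Content = _observableFileSystemWatcher.Changed
            .Sample(TimeSpan.FromSeconds(1))
            // File.ReadAllTextAsync returns a Task<string> that can be awaited
            .SelectMany(async e => await File.ReadAllTextAsync(e.FullPath))
            // Share a single subscription to the watcher among all subscribers
            .Publish().RefCount();
    }
}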
As long as the Content sequence has subscribers, the text of the changed files will be read and propagated to all subscribers.
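To illustrate the sharing behaviour of Publish().RefCount(), here is a small self-contained sketch. It assumes a Subject<string> as a stand-in for the file contents (no real files are read), and it also splits each text into lines in case you want individual records rather than the whole file. SharedContentDemo, Run and fileTexts are hypothetical names used only for this example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SharedContentDemo
{
    public static (int sourceSubscriptions, List<string> received) Run()
    {
        var received = new List<string>();
        var sourceSubscriptions = 0;

        // Stand-in for "text of a changed file"
        var fileTexts = new Subject<string>();

        var content = Observable.Defer<string>(() =>
            {
                // Count how many times the underlying source is subscribed to
                sourceSubscriptions++;
                return fileTexts;
            })
            // Split each file text into individual records
            .SelectMany(text => text.Split('\n'))
            // Share one upstream subscription while at least one subscriber exists
            .Publish()
            .RefCount();

        using (content.Subscribe(r => received.Add("A: " + r)))
        using (content.Subscribe(r => received.Add("B: " + r)))
        {
            fileTexts.OnNext("r1\nr2");
        }

        return (sourceSubscriptions, received);
    }
}
```

Both subscribers receive every record, while the upstream source is subscribed to only once. When the last subscriber unsubscribes, RefCount disconnects from the source, and a later subscriber causes a fresh connection.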
Upvotes: 1