GrahamB

Reputation: 1428

.NET Observable wrapped in a class to expose another Observable on the interface

I've seen a few things like this in RxJava and Swift but not .NET. I'm not even sure it's a recommended pattern...

I have a custom ObservableFileSystemWatcher class that monitors a path for changes. When it detects a change, it samples to get the last event, then reads the file and maps its contents to the individual records in that file. This class is encapsulated within a Monitor class, from which I want to expose another observable: an IObservable<string> that emits the contents of the file.

Something like this to get the records...

            _observableFileSystemWatcher.Changed
                .Sample(TimeSpan.FromSeconds(1))
                .Select(async f =>
                {
                    //Read file and get all records into memory before passing these onto next part of the pipeline
                    Debug.WriteLine($"Filename {f.Name} found");
                    var file = await File.ReadAllLinesAsync(f.FullPath);
                    return file;
                })
                //Unwrap each Task<string[]> into a string[]
                .SelectMany(r => r)
                //Flatten each string[] into its individual records
                .SelectMany(lines => lines)
                .Subscribe(r =>
                {
                    Debug.WriteLine(r);
                });

The Monitor class has an interface of:

    public interface IMonitor
    {
        IObservable<string> Content { get; }
    }

So what I think I want to do is to subscribe to the observable file watcher and then pipe this out into the IMonitor Content Observable for the IMonitor consumers to subscribe to. They will just get a continued stream of records without needing to worry where the monitor got them from.

There are plenty of Observable.FromXxx helpers, but none that I can see lets me do this. Is there a better way of wrapping observables within each other?

Upvotes: 0

Views: 753

Answers (2)

Kevin Krumwiede

Reputation: 10298

I'm not super experienced with System.Reactive, but your question seems like you're thinking the way I did in a slightly earlier stage of my understanding.

I recommend using Observable.Create to build an IObservable<string> that lazily creates your ObservableFileSystemWatcher when an observer subscribes. This is a common pattern in Rx, and I recommend it especially here because FileSystemWatcher is deceptively simple. Once it raises an error, it never raises another change event. For example, if it is watching a network share and that share becomes unavailable, that FileSystemWatcher instance stays unusable even after the share comes back; you need to create a new instance. Trying to replace the instance inside your observable wrapper gets messy fast. Instead, throw out the whole observable and recreate it by resubscribing, which is what Retry is for.

If multiple observers need to observe the same change events, subscribe a Subject to your observable and subscribe your observers to the Subject. You will then have created a "hot" observable: it keeps "running" even when nothing is subscribed to the Subject.

I happen to have written my own RxFileSystemWatcher and blogged about it. It demonstrates the pattern of lazily creating and configuring the thing being observed when an observer subscribes to the observable returned by Observable.Create.

    using System;
    using System.IO;
    using System.Reactive.Linq;

    public static class RxFileSystemWatcher
    {
        public static IObservable<FileSystemEventArgs> Create(Action<FileSystemWatcher> config)
        {
            // The watcher is created lazily, once per subscription.
            return Observable.Create<FileSystemEventArgs>((observer) =>
            {
                void onNext(object sender, FileSystemEventArgs args) => observer.OnNext(args);
                var watcher = new FileSystemWatcher();
                try
                {
                    config(watcher);
                    watcher.Changed += onNext;
                    watcher.Created += onNext;
                    watcher.Deleted += onNext;
                    // Report a rename as a Deleted event followed by a Created event.
                    watcher.Renamed += (sender, args) =>
                    {
                        var dir = Path.GetDirectoryName(args.FullPath);
                        onNext(sender, new FileSystemEventArgs(WatcherChangeTypes.Deleted, dir, args.OldName));
                        onNext(sender, new FileSystemEventArgs(WatcherChangeTypes.Created, dir, args.Name));
                    };
                    watcher.Error += (sender, args) => observer.OnError(args.GetException());
                    watcher.EnableRaisingEvents = true;
                }
                catch (Exception e)
                {
                    observer.OnError(e);
                }
                // Disposing the subscription disposes the watcher.
                return watcher;
            });
        }
    }
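
To illustrate the "recreate it by resubscribing" point, here is a minimal sketch of how Retry interacts with Observable.Create. It does not touch the file system: the `changes` observable is a hypothetical stand-in for the watcher observable above, and the class and method names are made up for the example. The first "watcher" fails and the second one succeeds, so the factory should run twice.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class RetryResubscribeDemo
{
    // Returns how many "watchers" were created and which events the observer saw.
    public static (int Created, List<string> Events) Run()
    {
        var created = 0;

        // Hypothetical stand-in for a watcher observable:
        // every subscription runs this factory and builds a fresh "watcher".
        IObservable<string> changes = Observable.Create<string>(observer =>
        {
            var instance = ++created;
            observer.OnNext($"watcher {instance}: changed");
            if (instance == 1)
            {
                // Simulate the first watcher breaking, e.g. a share going away.
                observer.OnError(new InvalidOperationException("share unavailable"));
            }
            else
            {
                observer.OnCompleted();
            }
            return Disposable.Empty;
        });

        var events = new List<string>();

        // Retry(3) allows up to three subscriptions in total; each
        // resubscription runs the factory again instead of repairing the old instance.
        changes.Retry(3).Subscribe(e => events.Add(e));

        return (created, events);
    }
}
```

With a real watcher you would probably also want a delay between attempts so that a dead share is not hammered in a tight loop; that is left out here to keep the sketch small.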

Upvotes: 0

Theodor Zoulias

Reputation: 43525

Probably this is what you want:

    class Monitor
    {
        public IObservable<string> Content { get; private set; }

        public Monitor()
        {
            this.Content = _observableFileSystemWatcher.Changed
                .Sample(TimeSpan.FromSeconds(1))
                .SelectMany(async e => await File.ReadAllTextAsync(e.FullPath))
                .Publish().RefCount();
        }
    }

As long as the Content sequence has subscribers, the text of the changed files will be read and propagated to all subscribers.
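
The sharing behaviour of Publish().RefCount() can be checked without a file watcher. The sketch below is only an illustration: it uses a Subject<string> as a hypothetical stand-in for the watcher pipeline and counts how often the upstream is subscribed to. The names `SharedContentDemo` and `Run` are invented for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SharedContentDemo
{
    // Returns the number of upstream subscriptions and what each subscriber received.
    public static (int Subscriptions, List<string> First, List<string> Second) Run()
    {
        // Hypothetical stand-in for the watcher pipeline.
        var source = new Subject<string>();
        var subscriptions = 0;

        // Defer lets us count each time the upstream is actually subscribed to.
        IObservable<string> content = Observable
            .Defer<string>(() => { subscriptions++; return source; })
            .Publish()
            .RefCount();

        var first = new List<string>();
        var second = new List<string>();

        using (content.Subscribe(r => first.Add(r)))
        using (content.Subscribe(r => second.Add(r)))
        {
            source.OnNext("record 1");
            source.OnNext("record 2");
        }

        // Both subscribers are gone, so RefCount has disconnected from the source
        // and this value reaches neither list.
        source.OnNext("record 3");

        return (subscriptions, first, second);
    }
}
```

In this sketch both subscribers see the same two records through a single upstream subscription. One consequence of RefCount worth keeping in mind is that a subscriber arriving after the count has dropped to zero triggers a fresh upstream subscription.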

Upvotes: 1
