Kelly
Kelly

Reputation: 7193

Merging Observables in Reactive Extensions?

Just learning RX and wanted to make a program that iterated the file system. Here is what I came up with that works:

using System;
using System.IO;
using System.Reactive.Disposables;
using System.Reactive.Linq;

namespace ConsoleApplication8
{
    internal class Program
    {
        private static IObservable<string> GetFiles(string folder, string filePattern)
        {
            return Observable.Create<string>(
                o =>
                {
                    var files = Directory.GetFiles(folder, filePattern);

                    foreach (var file in files)
                    {
                        o.OnNext(file);
                    }

                    var folders = Directory.GetDirectories(folder);

                    foreach (var f in folders)
                    {
                        var x = GetFiles(f, filePattern);
                        x.Subscribe(p => { o.OnNext(p); });
                    }

                    o.OnCompleted();

                    return Disposable.Empty;
                });
        }

        private static void Main(string[] args)
        {
            var o = GetFiles(@"d:\temp", "*.*");

            o.Subscribe(p => { Console.WriteLine(p); });

            Console.Read();
        }
    }
}

(Note the use of recursion by calling GetFiles again and subscribing)

While it works it seems very clumsy, I can't help thinking that I should be using something like Concat to combine the sequences instead of just bubbling them back up.

Also I would like to change that Foreach to a Parallel.ForEach but I'm unsure the ramifications this would have using RX. I can't seem to find much for documentation.

Any tips on how to write this better using RX?

Upvotes: 1

Views: 359

Answers (2)

Foole
Foole

Reputation: 4850

To solve a problem like this, it can help to write a LINQ version of the function first. eg:

    static IEnumerable<string> GetFiles(string folder, string filePattern)
    {
        return Directory.GetFiles(folder, filePattern)
            .Concat(Directory.GetDirectories(folder).SelectMany(f => GetFilesEnumerable(f, filePattern)));
    }

Then just change the IEnumerables to IObservables:

    static IObservable<string> GetFiles(string folder, string filePattern)
    {
        return Directory.GetFiles(folder, filePattern).ToObservable()
            .Concat(Directory.GetDirectories(folder).ToObservable().SelectMany(f => GetFilesEnumerable(f, filePattern)));
    }

Upvotes: 2

Kelly
Kelly

Reputation: 7193

Turns out, based on my discovery of SearchOption.AllDirectories (why in all my years have I never noticed this) it can be boiled down into two real lines of code:

using System;
using System.IO;
using System.Reactive.Linq;

namespace ConsoleApplication8
{
    internal class Program
    {
        private static void Main(string[] args)
        {
            var o = Directory.GetFiles(@"e:\code", "*.*", SearchOption.AllDirectories).ToObservable();
            o.Subscribe(f => Console.WriteLine(f));


            Console.Read();
        }
    }
}

Crazy how simple it really is now. Need a new RX problem to play with.

Upvotes: 0

Related Questions