Umair

Reputation: 3243

IObservable collect all elements in current stream and perform bulk operation

Example code

async Task Main()
{
    var entities = new double[] { 1, 2, 4, 8, 16, 32 }.ToObservable();

    entities = Square(entities);
    entities = Print(entities);
    entities = Add(entities, 1);
    entities = await SaveObservable(entities);
    entities = Add(entities, 2);

    await PrintAll(entities);
}

public IObservable<double> Square(IObservable<double> source)
    => source.Select(i => Math.Pow(i, 2.0));

public IObservable<double> Add(IObservable<double> source, double add)
    => source.Select(i => i + add);

public IObservable<T> Print<T>(IObservable<T> source)
    => source.Do(i => Console.WriteLine("Print: {0}", i));

public async Task<IObservable<T>> SaveObservable<T>(IObservable<T> source)
{
    var last = await source.LastOrDefaultAsync();
    Console.WriteLine("Collected observable and saving to the db. Last element is '{0}'", last);

    // await database.SaveChangesAsync();

    return source;
}

public async Task PrintAll(IObservable<double> source)
    => await source.ForEachAsync(i => Console.WriteLine("PrintAll: {0}", i));

Imagine that in my SaveObservable function I want to perform an operation on the database, e.g. save all the entities added to a database context in bulk, so that the database engine (e.g. a SQL database) populates any database-generated IDs.

So what I want this function to do is collect the passed-in observable stream, call database.SaveChangesAsync, and then carry on by returning the passed-in stream.

However, if I run the code above, the Print function is called again. I do not want any projections/mappings applied earlier in the observable pipeline to run again.

How would I go about achieving this? Is it even possible?

Upvotes: 2

Views: 596

Answers (2)

Enigmativity

Reputation: 117027

You have two subscriptions because of these two lines:

  1. var last = await source.LastOrDefaultAsync();
  2. await source.ForEachAsync(i => Console.WriteLine("PrintAll: {0}", i));

Both cause a subscription to your original source observable.

The first is effectively:

var last = await
    new double[] { 1, 2, 4, 8, 16, 32 }
        .ToObservable()
        .Select(i => Math.Pow(i, 2.0))
        .Do(i => Console.WriteLine("Print: {0}", i))
        .Select(i => i + 1)
        .LastOrDefaultAsync();

The second is effectively:

await
    new double[] { 1, 2, 4, 8, 16, 32 }
        .ToObservable()
        .Select(i => Math.Pow(i, 2.0))
        .Do(i => Console.WriteLine("Print: {0}", i))
        .Select(i => i + 1)
        .Select(i => i + 2)
        .ForEachAsync(i => Console.WriteLine("PrintAll: {0}", i));
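To see the double subscription in isolation, here is a minimal sketch, assuming the System.Reactive NuGet package; the `ColdObservableDemo` class and the shorter `{ 1, 2, 4 }` input are made up for illustration. A counter stands in for `Print`, and the same cold pipeline is consumed twice, once per terminal operator:

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical demo class, not part of the original code.
public static class ColdObservableDemo
{
    // Subscribes to the same cold pipeline twice and counts how often the
    // Do side effect (standing in for Print) runs.
    public static int CountPrintCalls()
    {
        var printCalls = 0;

        IObservable<double> pipeline =
            new double[] { 1, 2, 4 }
                .ToObservable()
                .Select(i => Math.Pow(i, 2.0))
                .Do(_ => printCalls++)   // stands in for Print
                .Select(i => i + 1);     // Add(1)

        // Subscription 1: what LastOrDefaultAsync in SaveObservable does.
        pipeline.LastOrDefaultAsync().Wait();

        // Subscription 2: what ForEachAsync in PrintAll does.
        pipeline.Select(i => i + 2).ForEachAsync(_ => { }).Wait();

        return printCalls;
    }
}
```

Each subscription re-runs `Select`/`Do` from the top, so the counter should reach 6 (3 items times 2 subscriptions).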

The answer to this problem is to avoid the async/await code entirely. Rx handles all the async work far better than the TPL does anyway.

You can also await an observable without involving tasks.
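As a small sketch of that, assuming System.Reactive (its `System.Reactive.Linq` namespace supplies a `GetAwaiter` extension for `IObservable<T>`) and a hypothetical class name: awaiting an observable subscribes once, waits for completion and hands back the last element.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the original code.
public static class AwaitObservableDemo
{
    // The await works on the IObservable<double> itself (no ToTask call);
    // it completes with the last value the sequence produced.
    public static async Task<double> LastSquareAsync()
    {
        double last = await new double[] { 1, 2, 4 }
            .ToObservable()
            .Select(i => Math.Pow(i, 2.0));

        return last;
    }
}
```

This should yield 16. Note that awaiting an observable that completes without producing any element throws an `InvalidOperationException`, so code that awaits a possibly empty stream may need something like `DefaultIfEmpty()`.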

Perhaps try your code this way:

async Task Main()
{
    var entities = new double[] { 1, 2, 4, 8, 16, 32 }.ToObservable();

    entities = Square(entities);
    entities = Print(entities);
    entities = Add(entities, 1);
    entities = SaveObservable(entities);
    entities = Add(entities, 2);

    await PrintAll(entities);
}

public IObservable<double> Square(IObservable<double> source)
    => source.Select(i => Math.Pow(i, 2.0));

public IObservable<double> Add(IObservable<double> source, double add)
    => source.Select(i => i + add);

public IObservable<T> Print<T>(IObservable<T> source)
    => source.Do(i => Console.WriteLine("Print: {0}", i));

public IObservable<T> SaveObservable<T>(IObservable<T> source)
{
    return Observable.Create<T>(o =>
    {
        var last = default(T);
        return 
            source
                .Do(
                    t => last = t,
                    () =>
                    {
                        Console.WriteLine("Collected observable and saving to the db. Last element is '{0}'", last);
                    })
                .Subscribe(o);
    });
}

public IObservable<double> PrintAll(IObservable<double> source)
    => source.Do(i => Console.WriteLine("PrintAll: {0}", i));

This works and produces the following output:

Print: 1
PrintAll: 4
Print: 4
PrintAll: 7
Print: 16
PrintAll: 19
Print: 64
PrintAll: 67
Print: 256
PrintAll: 259
Print: 1024
PrintAll: 1027
Collected observable and saving to the db. Last element is '1025'

To make SaveObservable complete before the rest of the pipeline continues, you need to add a .ToArray() to that method. That turns the IObservable<T>, which produces zero or more values, into an IObservable<T[]>, which produces a single array of zero or more elements. Because that array can only be emitted once the source completes, every value must be collected before anything moves on.

Try changing SaveObservable to:

public IObservable<T> SaveObservable<T>(IObservable<T> source)
{
    return
        source
            .ToArray()
            .Do(ts =>
            {
                var last = ts.LastOrDefault(); // LastOrDefault avoids an exception when the source is empty
                Console.WriteLine("Collected observable and saving to the db. Last element is '{0}'", last);
            })
            .SelectMany(t => t);
}

That now outputs:

Print: 1
Print: 4
Print: 16
Print: 64
Print: 256
Print: 1024
Collected observable and saving to the db. Last element is '1025'
PrintAll: 4
PrintAll: 7
PrintAll: 19
PrintAll: 67
PrintAll: 259
PrintAll: 1027

The code is now effectively this:

await
    new double[] { 1, 2, 4, 8, 16, 32 }
        .ToObservable()
        .Select(i => Math.Pow(i, 2.0))
        .Do(i => Console.WriteLine("Print: {0}", i))
        .Select(i => i + 1)
        .ToArray()
        .Do(ts =>
        {
            var last = ts.LastOrDefault(); // LastOrDefault avoids an exception when the source is empty
            Console.WriteLine("Collected observable and saving to the db. Last element is '{0}'", last);
        })
        .SelectMany(t => t)
        .Select(i => i + 2)
        .Do(i => Console.WriteLine("PrintAll: {0}", i));
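The ordering effect of `ToArray()` can be checked with a sketch like the following, assuming System.Reactive; `ToArrayBarrierDemo` is a hypothetical name. It logs upstream items, the single bulk step and downstream items, using the same `ToArray()`/`Do`/`SelectMany` shape as the `SaveObservable` above:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical demo class, not part of the original code.
public static class ToArrayBarrierDemo
{
    // Records the order in which upstream items, the single bulk step and
    // downstream items are observed.
    public static List<string> Run()
    {
        var log = new List<string>();

        new double[] { 1, 2, 3 }
            .ToObservable()
            .Do(i => log.Add($"up {i}"))
            .ToArray()                                      // emits one double[] only after the source completes
            .Do(batch => log.Add($"bulk {batch.Length}"))   // runs once; where a bulk save would go
            .SelectMany(batch => batch)                     // flattens the array back into single items
            .Do(i => log.Add($"down {i}"))
            .Wait();                                        // blocks until the sequence completes

        return log;
    }
}
```

The log should read `up 1, up 2, up 3, bulk 3, down 1, down 2, down 3`: nothing reaches the downstream side until the whole batch has been collected.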

Upvotes: 3

Shlomo

Reputation: 14350

First, your comment is spot on: this is probably best done with plain old, pull-based programming.

As for your code, there are two problems:

1) Observables are like item pipelines. Your code essentially sets up two pipelines:

entities -> Square -> Print -> Add(1) -> SaveObservable
entities -> Square -> Print -> Add(1) -> Add(2) -> PrintAll

This is why you see Print called twice. If you want it called only once, you probably want the pipelines to look something like this:

entities -> Square -> Print -> Add(1) -> SaveObservable
                                     \-> Add(2) -> PrintAll

2) However, that's easier said than done, thanks to the mixture of async and IObservable. The lazy way to make a multicast pipeline is to slap a .Publish().RefCount() after the Add call:

entities = Square(entities);
entities = Print(entities);
var entitiesAdded = Add(entities, 1).Publish().RefCount();
var _ = await SaveObservable(entitiesAdded);
entities = Add(entitiesAdded, 2);

await PrintAll(entities);

This doesn't work, though, because Publish makes the observable hot: once it has been consumed in SaveObservable, the items aren't emitted again. Another way around this is .Publish() followed by .Connect() once all the subscribers are attached, but that doesn't play well with await.
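For reference, a minimal sketch of the `.Publish()` plus `.Connect()` route, assuming System.Reactive. The class name, the `{ 1, 2, 4 }` input and the use of `ToTask()` to attach branches without `await` are illustrative choices, not part of the original code. Both branches are subscribed first, then a single `Connect()` drives one upstream run:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the original code.
public static class PublishConnectDemo
{
    public static (int PrintCalls, double LastSaved, IList<double> PrintedAll) Run()
    {
        var printCalls = 0;

        // Publish() turns the pipeline into a connectable (hot) observable:
        // nothing flows until Connect(), and every subscriber shares one upstream run.
        var shared = new double[] { 1, 2, 4 }
            .ToObservable()
            .Select(i => Math.Pow(i, 2.0))
            .Do(_ => printCalls++)   // stands in for Print
            .Select(i => i + 1)      // Add(1)
            .Publish();

        // Attach every branch BEFORE connecting; ToTask() subscribes eagerly.
        Task<double> save = shared.LastOrDefaultAsync().ToTask();               // "save" branch
        Task<IList<double>> all = shared.Select(i => i + 2).ToList().ToTask();  // Add(2) -> PrintAll

        // Start the single shared subscription to the upstream pipeline.
        using (shared.Connect())
        {
            Task.WaitAll(save, all);
        }

        return (printCalls, save.Result, all.Result);
    }
}
```

With this input the counter should end at 3 (one `Print` per item), the save branch should see 17 and the `Add(2)` branch should see 4, 7 and 19. The catch is that nothing may `await` a branch before `Connect()` is called, or that await would never complete.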

What will work for your code is adding .Replay().RefCount(), but it may not perform well, because Rx has to keep an internal buffer that holds all items for the life of the subscription.

var entities = new double[] { 1, 2, 4, 8, 16, 32 }.ToObservable();

entities = Square(entities);
entities = Print(entities);
entities = Add(entities, 1).Replay().RefCount();
entities = await SaveObservable(entities);
entities = Add(entities, 2);

await PrintAll(entities);
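A sketch of the buffering behaviour behind `Replay()`, assuming System.Reactive. `ReplayDemo` is hypothetical, and an explicit `Connect()` is used instead of `RefCount()` so the single upstream run is easy to see. Subscribers that arrive after the source has finished still receive every buffered item:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Hypothetical demo class, not part of the original code.
public static class ReplayDemo
{
    public static (int PrintCalls, double LastSaved, IList<double> PrintedAll) Run()
    {
        var printCalls = 0;

        // Replay() buffers every item it receives, so subscribers that arrive
        // later (even after completion) still see the whole sequence.
        var replayed = new double[] { 1, 2, 4 }
            .ToObservable()
            .Select(i => Math.Pow(i, 2.0))
            .Do(_ => printCalls++)   // stands in for Print
            .Select(i => i + 1)      // Add(1)
            .Replay();

        using (replayed.Connect())   // one upstream run; results kept in the replay buffer
        {
            // Both reads are served from the buffer; the upstream is not re-run.
            double lastSaved = replayed.LastOrDefaultAsync().Wait();
            IList<double> printedAll = replayed.Select(i => i + 2).ToList().Wait();

            return (printCalls, lastSaved, printedAll);
        }
    }
}
```

The counter should end at 3 even though two separate subscriptions read the data. That is the trade-off described above: the upstream runs once, but the buffer holds every item in memory.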

For more information about Hot/Cold observables, Publish, Connect, Replay, and RefCount, see here.

In summary, though, I suggest you use EF and the TPL without Rx.
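A plain, pull-based sketch of that suggestion, with no Rx involved. `saveAllAsync` is a hypothetical delegate standing in for a bulk save such as EF's `DbContext.SaveChangesAsync()`; the class and method names are also made up:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical demo class, not part of the original code.
public static class PullBasedDemo
{
    // saveAllAsync is a hypothetical stand-in for a bulk save such as
    // DbContext.SaveChangesAsync(); it is not part of the original code.
    public static async Task<List<double>> RunAsync(Func<IReadOnlyList<double>, Task> saveAllAsync)
    {
        var entities = new List<double>();

        // Pull every item through Square -> Print -> Add(1) exactly once.
        foreach (var e in new double[] { 1, 2, 4, 8, 16, 32 })
        {
            var squared = Math.Pow(e, 2.0);
            Console.WriteLine("Print: {0}", squared);
            entities.Add(squared + 1);
        }

        // One bulk operation after all items have been collected.
        await saveAllAsync(entities);

        // Carry on with the already-materialized list; nothing upstream re-runs.
        return entities.Select(i => i + 2).ToList();
    }
}
```

Usage might look like `await PullBasedDemo.RunAsync(_ => db.SaveChangesAsync());`, where `db` is a hypothetical `DbContext`. Because the list is materialized once, `Print` runs six times in total, and the returned values should be 4, 7, 19, 67, 259 and 1027.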

Upvotes: 3
