Reputation: 3243
Example code
async Task Main()
{
    var entities = new double[] { 1, 2, 4, 8, 16, 32 }.ToObservable();
    entities = Square(entities);
    entities = Print(entities);
    entities = Add(entities, 1);
    entities = await SaveObservable(entities);
    entities = Add(entities, 2);
    await PrintAll(entities);
}

public IObservable<double> Square(IObservable<double> source)
    => source.Select(i => Math.Pow(i, 2.0));

public IObservable<double> Add(IObservable<double> source, double add)
    => source.Select(i => i + add);

public IObservable<T> Print<T>(IObservable<T> source)
    => source.Do(i => Console.WriteLine("Print: {0}", i));

public async Task<IObservable<T>> SaveObservable<T>(IObservable<T> source)
{
    var last = await source.LastOrDefaultAsync();
    Console.WriteLine("Collected observable and saving to the db. Last element is '{0}'", last);
    // await database.SaveChangesAsync();
    return source;
}

public async Task PrintAll(IObservable<double> source)
    => await source.ForEachAsync(i => Console.WriteLine("PrintAll: {0}", i));
Imagine that in my SaveObservable function I want to perform an operation on the database, e.g. save all the entities added to a database context in bulk so that the database engine (e.g. SQL) can populate any database-generated ids.
So what I want this function to do is collect the passed-in observable stream, call database.SaveChangesAsync, and then carry on by returning the passed-in stream.
However, if I run the code above, the Print function is called again. I do not want any projections/mappings of the observable that ran previously to run again.
How would I go about achieving this? Is it even possible?
Upvotes: 2
Views: 596
Reputation: 117027
The reason you have two subscriptions is due to these two lines:
var last = await source.LastOrDefaultAsync();
await source.ForEachAsync(i => Console.WriteLine("PrintAll: {0}", i));
Both cause a subscription to your original source observable.
The first is effectively:
var last = await
    new double[] { 1, 2, 4, 8, 16, 32 }
        .ToObservable()
        .Select(i => Math.Pow(i, 2.0))
        .Do(i => Console.WriteLine("Print: {0}", i))
        .Select(i => i + 1)
        .LastOrDefaultAsync();
The second is effectively:
await
    new double[] { 1, 2, 4, 8, 16, 32 }
        .ToObservable()
        .Select(i => Math.Pow(i, 2.0))
        .Do(i => Console.WriteLine("Print: {0}", i))
        .Select(i => i + 1)
        .Select(i => i + 2)
        .ForEachAsync(i => Console.WriteLine("PrintAll: {0}", i));
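The re-run can be reproduced in isolation. The following is only a minimal sketch, assuming the System.Reactive package; the sideEffects counter is a hypothetical name added for illustration and is not part of the original code. It shows that a cold observable runs its whole pipeline, side effects included, once per subscriber:

```csharp
using System;
using System.Reactive.Linq;

class ColdDemo
{
    static void Main()
    {
        var sideEffects = 0; // hypothetical counter, for illustration only

        var source = new double[] { 1, 2, 4 }
            .ToObservable()
            .Do(_ => sideEffects++);

        // Each Subscribe call starts a fresh run of the whole pipeline.
        source.Subscribe(_ => { });
        source.Subscribe(_ => { });

        Console.WriteLine(sideEffects); // 6: three items, two subscriptions
    }
}
```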
The answer to this problem is to avoid any of the async/await code. Rx handles all the sync work far better than TPL does anyway.
You can also await an observable without involving tasks.
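As a rough illustration of awaiting an observable directly (a sketch assuming the System.Reactive package and a C# version that allows an async Main; the values are made up for the example): awaiting an IObservable<T> subscribes to it and yields its last element once it completes.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

class AwaitDemo
{
    static async Task Main()
    {
        // Awaiting an IObservable<T> subscribes to it and yields the last element.
        var last = await Observable.Range(1, 3).Select(i => i * 10);
        Console.WriteLine(last); // 30
    }
}
```

Note that awaiting an observable that completes without producing any element throws, so an empty source needs separate handling.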
Perhaps try your code this way:
async Task Main()
{
    var entities = new double[] { 1, 2, 4, 8, 16, 32 }.ToObservable();
    entities = Square(entities);
    entities = Print(entities);
    entities = Add(entities, 1);
    entities = SaveObservable(entities);
    entities = Add(entities, 2);
    await PrintAll(entities);
}

public IObservable<double> Square(IObservable<double> source)
    => source.Select(i => Math.Pow(i, 2.0));

public IObservable<double> Add(IObservable<double> source, double add)
    => source.Select(i => i + add);

public IObservable<T> Print<T>(IObservable<T> source)
    => source.Do(i => Console.WriteLine("Print: {0}", i));

public IObservable<T> SaveObservable<T>(IObservable<T> source)
{
    return Observable.Create<T>(o =>
    {
        var last = default(T);
        return
            source
                .Do(
                    t => last = t,
                    () =>
                    {
                        Console.WriteLine("Collected observable and saving to the db. Last element is '{0}'", last);
                    })
                .Subscribe(o);
    });
}

public IObservable<double> PrintAll(IObservable<double> source)
    => source.Do(i => Console.WriteLine("PrintAll: {0}", i));
This works and produces the following output:
Print: 1
PrintAll: 4
Print: 4
PrintAll: 7
Print: 16
PrintAll: 19
Print: 64
PrintAll: 67
Print: 256
PrintAll: 259
Print: 1024
PrintAll: 1027
Collected observable and saving to the db. Last element is '1025'
To allow SaveObservable to complete before the rest of the pipeline continues, you need to add a .ToArray() to that method. That changes the IObservable<T> that produces zero or more values into an IObservable<T[]> that produces one array with zero or more elements. Thus it must collect all of the values before it moves on.
Try changing SaveObservable to:
public IObservable<T> SaveObservable<T>(IObservable<T> source)
{
    return
        source
            .ToArray()
            .Do(ts =>
            {
                var last = ts.Last();
                Console.WriteLine("Collected observable and saving to the db. Last element is '{0}'", last);
            })
            .SelectMany(t => t);
}
That now outputs:
Print: 1
Print: 4
Print: 16
Print: 64
Print: 256
Print: 1024
Collected observable and saving to the db. Last element is '1025'
PrintAll: 4
PrintAll: 7
PrintAll: 19
PrintAll: 67
PrintAll: 259
PrintAll: 1027
The code is now effectively this:
await
    new double[] { 1, 2, 4, 8, 16, 32 }
        .ToObservable()
        .Select(i => Math.Pow(i, 2.0))
        .Do(i => Console.WriteLine("Print: {0}", i))
        .Select(i => i + 1)
        .ToArray()
        .Do(ts =>
        {
            var last = ts.Last();
            Console.WriteLine("Collected observable and saving to the db. Last element is '{0}'", last);
        })
        .SelectMany(t => t)
        .Select(i => i + 2)
        .Do(i => Console.WriteLine("PrintAll: {0}", i));
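If the save step is genuinely asynchronous, as the commented-out database.SaveChangesAsync() in the question suggests, one option is the SelectMany overload that takes a Task-returning selector. This is only a sketch under that assumption: SaveAsync is a hypothetical stand-in for the real database call, and the class name is made up for the example.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

static class SaveSketch
{
    // Hypothetical stand-in for database.SaveChangesAsync().
    static Task SaveAsync() => Task.Delay(10);

    public static IObservable<T> SaveObservable<T>(IObservable<T> source)
        => source
            .ToArray()                  // buffer everything until the source completes
            .SelectMany(async ts =>
            {
                await SaveAsync();      // runs once per subscription, after all items arrived
                return ts;
            })
            .SelectMany(ts => ts);      // flatten the array back into single items
}
```

Because everything stays inside one pipeline, the upstream Print step still runs only once per item for a single subscriber.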
Upvotes: 3
Reputation: 14350
First, your comment is spot on: this is probably best done with plain-old, pull-based programming.
As for your code you have two problems going on:
1) Observables are like item pipelines. Your code essentially sets up two pipelines:
entities -> Square -> Print -> Add(1) -> SaveObservable
entities -> Square -> Print -> Add(1) -> Add(2) -> PrintAll
This is why you see Print called twice. If you want to see it called once, you probably want the pipelines to do something like this:
entities -> Square -> Print -> Add(1) -> SaveObservable
                                     \-> Add(2) -> PrintAll
2) However, that's easier said than done, thanks to the mixture of async and IObservable. The lazy way to make a multi-casted pipeline is to slap a .Publish().RefCount() after the Add call:
entities = Square(entities);
entities = Print(entities);
var entitiesAdded = Add(entities, 1).Publish().RefCount();
var _ = await SaveObservable(entitiesAdded);
entities = Add(entitiesAdded, 2);
await PrintAll(entities);
This doesn't work though, because Publish makes the observable hot, and once it has been consumed in SaveObservable, the items don't emit again. Another way around is .Publish() followed by a .Connect() once all the children are subscribed, but that doesn't play well with await.
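For reference, the Publish/Connect variant looks roughly like this when no await is involved. This is a sketch assuming the System.Reactive package; the printed counter and the shortened input are illustrative only. Every consumer is attached first, and Connect then starts the single shared subscription:

```csharp
using System;
using System.Reactive.Linq;

class PublishDemo
{
    static void Main()
    {
        var printed = 0; // hypothetical counter, for illustration only

        var shared = new double[] { 1, 2, 4 }
            .ToObservable()
            .Do(_ => printed++)
            .Publish();              // nothing flows until Connect is called

        // Attach every consumer first...
        shared.LastOrDefaultAsync()
            .Subscribe(last => Console.WriteLine("Last: {0}", last));
        shared.Select(i => i + 2)
            .Subscribe(i => Console.WriteLine("PrintAll: {0}", i));

        // ...then start the single underlying subscription.
        using (shared.Connect())
        {
            Console.WriteLine("Side effects: {0}", printed); // 3, one per item
        }
    }
}
```

The difficulty with await is that it subscribes and waits for completion in one step, so there is no point at which Connect can be called after all consumers are attached.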
The way that will work for your code is to add .Replay().RefCount(), but it may not perform well, because Rx has to set up an internal buffer that holds all items for the life of the subscription.
var entities = new double[] { 1, 2, 4, 8, 16, 32 }.ToObservable();
entities = Square(entities);
entities = Print(entities);
entities = Add(entities, 1).Replay().RefCount();
entities = await SaveObservable(entities);
entities = Add(entities, 2);
await PrintAll(entities);
For more information about hot/cold observables, Publish, Connect, Replay, and RefCount, see here.
In summary though: I suggest you use EF and TPL without Rx.
Upvotes: 3