resp78

Reputation: 1534

How to sequentially run Tasks and put the result in IObservable

In my ViewModel I would like to do a couple of things in sequence and update the status as each one finishes.

Kent Boogaart's book suggests using an observable to report progress. I am now stuck on how to run multiple pieces of work, say as Tasks, and have the observable tick as each one completes. I tried the Concat operator, but all the Tasks start immediately instead of one after another.

public class StartupViewModel : ReactiveObject
{
    public string Status { [ObservableAsProperty]get; }

    public ReactiveCommand<Unit,string> LoadedCommand { get; set; }
    public StartupViewModel()
    {
        var progress = Observable.Concat(
            Task.Run(() =>
            {
                Thread.Sleep(3000);
                return "hello";
            }).ToObservable(),
            Task.Run(() =>
            {
                Thread.Sleep(3000);
                return "cip";
            }).ToObservable(),
            Task.Run(() =>
            {
                Thread.Sleep(3000);
                return "2040";
            }).ToObservable()
        );

        LoadedCommand = ReactiveCommand.CreateFromObservable(() => progress);
        LoadedCommand
            .ToPropertyEx(this, x => x.Status, "Starting up, please be patient");
    }
}

Upvotes: 1

Views: 281

Answers (2)

James World

Reputation: 29776

The problem is that you start all of the tasks while building the arguments to Concat: Task.Run schedules its work immediately, before anything subscribes. Concat is doing the right thing and subscribing to each source in turn, but by then every task is already running, so it simply picks up the results as they complete. What you want to do is wrap each task in Observable.Defer so that it isn't started up front, but only when the Concat operator subscribes to it:

var progress = Observable.Concat(
    Observable.Defer(() => Task.Run(() => { Thread.Sleep(5000); return 1; }).ToObservable()),
    Observable.Defer(() => Task.Run(() => { Thread.Sleep(5000); return 2; }).ToObservable())
);
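
As a variation on the same idea, Observable.FromAsync also holds back the work until subscription, which saves the explicit Defer plus ToObservable pair. The following is a minimal console sketch, assuming the System.Reactive package is referenced; StepAsync is a hypothetical helper standing in for the real startup work, and the one-second delay is arbitrary.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical helper standing in for one unit of startup work.
static async Task<string> StepAsync(string name)
{
    Console.WriteLine($"{DateTime.Now:HH:mm:ss} started {name}");
    await Task.Delay(1000);
    return name;
}

// FromAsync does not invoke the delegate until the observable is subscribed to,
// so Concat starts each step only after the previous one has completed.
var progress = Observable.Concat(
    Observable.FromAsync(() => StepAsync("hello")),
    Observable.FromAsync(() => StepAsync("cip")),
    Observable.FromAsync(() => StepAsync("2040")));

// Wait() blocks until the sequence completes; fine for a console demo, not for UI code.
progress.Do(status => Console.WriteLine($"Status: {status}")).Wait();
```

The "started" timestamps should come out roughly one second apart, which is the sequential behaviour the question asks for. In the ViewModel, progress would be handed to ReactiveCommand.CreateFromObservable as before rather than blocked on with Wait().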

Upvotes: 2

Glenn Watson

Reputation: 2888

Potentially you could do something like this:

return Observable.Create<string>(async (obs, cancellationToken) =>
{
   obs.OnNext(await Task1(cancellationToken));
   obs.OnNext(await Task2(cancellationToken));
   obs.OnNext(await Task3(cancellationToken));
   obs.OnCompleted();
});
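
To make the snippet above concrete, here is a self-contained console sketch, again assuming the System.Reactive package. Task1, Task2 and Task3 are not defined in the answer, so a single hypothetical StepAsync helper stands in for them here.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for Task1/Task2/Task3: waits, then returns a status string.
static async Task<string> StepAsync(string status, CancellationToken token)
{
    await Task.Delay(1000, token);
    return status;
}

IObservable<string> progress = Observable.Create<string>(async (obs, token) =>
{
    // Each await finishes before the next step starts, so the steps run in order.
    obs.OnNext(await StepAsync("hello", token));
    obs.OnNext(await StepAsync("cip", token));
    obs.OnNext(await StepAsync("2040", token));
    obs.OnCompleted();
});

// Nothing runs until this subscription; Wait() blocks, so use it only in a demo.
progress.Do(status => Console.WriteLine($"Status: {status}")).Wait();
```

The token passed into the lambda is signalled when the subscription is disposed, so forwarding it to each step lets the remaining work be cancelled if the subscriber goes away.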

Upvotes: 1
