hillin
hillin

Reputation: 1779

Emit the next value only if the previous value has completed something?

Say, I would like to have a stream that emits a new job 1 second after the previous job is done. The job should be emitted as soon as it's created, so the subscriber can track its progress. How could this be achieved?

var currentJob = Observable.Interval(TimeSpan.FromSeconds(1))
    .Select(i => new Job(i))
    // .MergeWhen(job => job.IsDone)?

Edit: I understand it's easy to do this in a TPL/Rx mashup flavor; I'm mostly curious that if there is a more Rx-native approach. Based on my experience, if I can't express a logic in a Rx-native way, most of the time it's because I'm not thinking in the Rx way correctly, so insights on how to think about this kind of workflow is also welcome.

Upvotes: 1

Views: 97

Answers (2)

Theodor Zoulias
Theodor Zoulias

Reputation: 43484

You can create a sequence that produces just one Job, with the Observable.Start method, and then Repeat this sequence. The delay between completing the one Job and starting the other can be injected by Concatenating a Delayed Empty sequence:

IObservable<Job> jobs = Observable
    .Defer(() => Observable.Start(() => new Job()))
    .Concat(Observable.Empty<Job>().Delay(TimeSpan.FromSeconds(1)))
    .Repeat();

The Job constructor is invoked on a ThreadPool thread. The purpose of the Defer is to "cool" the Observable.Start sequence, which is a hot sequence, so that the first Job constructor is invoked when the jobs sequence is subscribed, not when it is created.

Upvotes: 0

Maku
Maku

Reputation: 1558

Considering you have some kind of job request stream, which produces parameters for the job:

var jobRequestStream = Observable.Interval(TimeSpan.FromSeconds(1));

You could build the stream of jobs with a delay between them with something like this:

var jobTrackingStream = jobRequestStream.Select(i => Observable.Defer(() =>
{
    var job = new Job(i);

    var awaiter = Observable
        .FromAsync(() => job.JobCompletionWait(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)))
        .Where(_ => false); // this silly-looking where clause is to produce an empty observable that completes after Task completes
    return Observable.Return(job).Concat(awaiter);
}))
.Concat()

where JobCompletionWait is an extension that does some kind of async spin wait for the job to complete:

public static class JobExtensions
{
    public static async Task<Job> JobCompletionWait(this Job job, TimeSpan completionCheckInterval, TimeSpan delayCompletion)
    {
        while (!job.IsDone)
        {
            await Task.Delay(completionCheckInterval);
        }

        await Task.Delay(delayCompletion);
        return job;
    }
}

To test this I launched LinqPad and wrote a dummy Job class:

public class Job
{
    static readonly Random Rand = new Random();

    public bool IsDone { get; private set; }

    public long I { get; }

    public Job(long i)
    {
        I = i;

        $"{DateTime.Now:hh:mm:ss.ffff} Job {i} started".Dump();
        Task.Delay(Rand.Next(2000)).ContinueWith(_ =>
        {
            $"{DateTime.Now:hh:mm:ss.ffff} Job {i} is done.".Dump(); ;
            IsDone = true;
        });
    }
}

and subscribed to the jobTrackingStream with:

jobTrackingStream
    .Subscribe(job => $"{DateTime.Now:hh:mm:ss.ffff} Emited job {job.I}".Dump());

The result was:

08:01:34.8062 Job 0 started
08:01:34.8186 Emited job 0
08:01:36.3715 Job 0 is done.
08:01:37.4795 Job 1 started
08:01:37.4797 Emited job 1
08:01:37.6315 Job 1 is done.
08:01:38.7041 Job 2 started
08:01:38.7043 Emited job 2
08:01:39.7325 Job 2 is done.
08:01:40.8508 Job 3 started
08:01:40.8510 Emited job 3
08:01:42.3270 Job 3 is done.
08:01:43.4013 Job 4 started
08:01:43.4015 Emited job 4
08:01:44.2755 Job 4 is done.
08:01:45.3936 Job 5 started
08:01:45.3939 Emited job 5
08:01:45.8429 Job 5 is done.
08:01:46.9792 Job 6 started
08:01:46.9794 Emited job 6
08:01:47.7110 Job 6 is done.

....

Which seems to be the expected behaviour. Maybe there is a more clean solution but this one is just something that came to my mind.

Edit: I found task-less solution, without subjects:

var jobTrackingStream = jobRequestStream.Select(i => Observable.Defer(() =>
    Observable.Generate(
            new Job(i),
            job => !job.IsDone,
            job => job,
            job => job,
            _ => TimeSpan.FromMilliseconds(100)
        )
        .Distinct()
        .Concat(Observable.Delay(Observable.Empty<Job>(), TimeSpan.FromSeconds(1)))
))
.Concat();

Upvotes: 1

Related Questions