Reputation: 1779
Say, I would like to have a stream that emits a new job 1 second after the previous job is done. The job should be emitted as soon as it's created, so the subscriber can track its progress. How could this be achieved?
var currentJob = Observable.Interval(TimeSpan.FromSeconds(1))
.Select(i => new Job(i))
// .MergeWhen(job => job.IsDone)?
Edit: I understand it's easy to do this with a TPL/Rx mashup; I'm mostly curious whether there is a more Rx-native approach. In my experience, when I can't express some logic in an Rx-native way, it's usually because I'm not thinking about it the Rx way, so insights on how to think about this kind of workflow are also welcome.
Upvotes: 1
Views: 97
Reputation: 43484
You can create a sequence that produces just one Job with the Observable.Start method, and then Repeat this sequence. The delay between completing one Job and starting the next can be injected by Concatenating a Delayed Empty sequence:
IObservable<Job> jobs = Observable
.Defer(() => Observable.Start(() => new Job()))
.Concat(Observable.Empty<Job>().Delay(TimeSpan.FromSeconds(1)))
.Repeat();
The Job constructor is invoked on a ThreadPool thread. The purpose of the Defer is to "cool" the Observable.Start sequence, which is a hot sequence, so that the first Job constructor is invoked when the jobs sequence is subscribed, not when it is created.
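The difference that Defer makes can be seen in a minimal sketch. It assumes the System.Reactive package is referenced; DeferDemo and the Created counter are hypothetical names used only for illustration, standing in for the Job constructor.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

public static class DeferDemo
{
    // Hypothetical counter standing in for "a Job was constructed".
    public static int Created;

    public static void Main()
    {
        // Observable.Start schedules the function right away, before anyone subscribes.
        IObservable<int> started = Observable.Start(() => Interlocked.Increment(ref Created));
        started.Wait(); // block until the started work has finished
        Console.WriteLine($"After Start: {Created}");

        // Defer postpones the call to Observable.Start until a subscription happens.
        IObservable<int> deferred = Observable.Defer(
            () => Observable.Start(() => Interlocked.Increment(ref Created)));
        Console.WriteLine($"After Defer, before subscribing: {Created}");

        deferred.Wait(); // Wait subscribes, which triggers the factory
        Console.WriteLine($"After subscribing to the deferred sequence: {Created}");
    }
}
```

The counter should only change on the last step for the deferred sequence, which is the same reason the first Job in the jobs sequence is not constructed until someone subscribes.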
Upvotes: 0
Reputation: 1558
Assuming you have some kind of job request stream that produces the parameters for each job:
var jobRequestStream = Observable.Interval(TimeSpan.FromSeconds(1));
You could build the stream of jobs with a delay between them with something like this:
var jobTrackingStream = jobRequestStream.Select(i => Observable.Defer(() =>
{
var job = new Job(i);
var awaiter = Observable
.FromAsync(() => job.JobCompletionWait(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1)))
.Where(_ => false); // this silly-looking where clause is to produce an empty observable that completes after Task completes
return Observable.Return(job).Concat(awaiter);
}))
.Concat();
where JobCompletionWait is an extension method that asynchronously polls the job until it is done and then waits for the extra delay:
public static class JobExtensions
{
public static async Task<Job> JobCompletionWait(this Job job, TimeSpan completionCheckInterval, TimeSpan delayCompletion)
{
while (!job.IsDone)
{
await Task.Delay(completionCheckInterval);
}
await Task.Delay(delayCompletion);
return job;
}
}
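As a side note on the awaiter above, the Where(_ => false) filter could probably be replaced with the built-in IgnoreElements operator, which drops every element but still forwards completion. The following is only a sketch, assuming the System.Reactive package is referenced; IgnoreElementsDemo and the work delegate are hypothetical stand-ins for the JobCompletionWait call.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class IgnoreElementsDemo
{
    public static int CountWhere = -1;
    public static int CountIgnore = -1;

    public static void Main()
    {
        // Hypothetical stand-in for job.JobCompletionWait(...): finishes after a short delay.
        Func<Task<string>> work = async () =>
        {
            await Task.Delay(50);
            return "done";
        };

        // Original style: filter everything out, keep only the completion signal.
        IObservable<string> viaWhere = Observable.FromAsync(work).Where(_ => false);

        // Alternative: IgnoreElements expresses the same intent directly.
        IObservable<string> viaIgnore = Observable.FromAsync(work).IgnoreElements();

        // Count emits the number of elements once the source completes.
        CountWhere = viaWhere.Count().Wait();
        CountIgnore = viaIgnore.Count().Wait();
        Console.WriteLine($"Where: {CountWhere}, IgnoreElements: {CountIgnore}");
    }
}
```

Both sequences complete only after the task finishes and carry no elements, so either form can be concatenated after Observable.Return(job).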
To test this I launched LINQPad and wrote a dummy Job class:
public class Job
{
static readonly Random Rand = new Random();
public bool IsDone { get; private set; }
public long I { get; }
public Job(long i)
{
I = i;
$"{DateTime.Now:hh:mm:ss.ffff} Job {i} started".Dump();
Task.Delay(Rand.Next(2000)).ContinueWith(_ =>
{
$"{DateTime.Now:hh:mm:ss.ffff} Job {i} is done.".Dump();
IsDone = true;
});
}
}
and subscribed to the jobTrackingStream with:
jobTrackingStream
.Subscribe(job => $"{DateTime.Now:hh:mm:ss.ffff} Emited job {job.I}".Dump());
The result was:
08:01:34.8062 Job 0 started
08:01:34.8186 Emited job 0
08:01:36.3715 Job 0 is done.
08:01:37.4795 Job 1 started
08:01:37.4797 Emited job 1
08:01:37.6315 Job 1 is done.
08:01:38.7041 Job 2 started
08:01:38.7043 Emited job 2
08:01:39.7325 Job 2 is done.
08:01:40.8508 Job 3 started
08:01:40.8510 Emited job 3
08:01:42.3270 Job 3 is done.
08:01:43.4013 Job 4 started
08:01:43.4015 Emited job 4
08:01:44.2755 Job 4 is done.
08:01:45.3936 Job 5 started
08:01:45.3939 Emited job 5
08:01:45.8429 Job 5 is done.
08:01:46.9792 Job 6 started
08:01:46.9794 Emited job 6
08:01:47.7110 Job 6 is done.
....
This seems to be the expected behaviour. There may be a cleaner solution; this is just the first one that came to mind.
Edit: I found a task-less solution, without subjects:
var jobTrackingStream = jobRequestStream.Select(i => Observable.Defer(() =>
Observable.Generate(
new Job(i),
job => !job.IsDone,
job => job,
job => job,
_ => TimeSpan.FromMilliseconds(100)
)
.Distinct()
.Concat(Observable.Delay(Observable.Empty<Job>(), TimeSpan.FromSeconds(1)))
))
.Concat();
Upvotes: 1