I have a producer that downloads data in pages from a REST API and several consumers that process the pages (e.g. load them into a database).
I would like the producer and consumers to work in parallel, meaning that the producer should not wait for a page to be consumed before downloading the next one. Each consumer needs to process the pages sequentially.
When all pages have been downloaded, the main thread should wait for all consumers to complete their work (as consuming may take longer than producing).
My current approach is as follows:
I have created an observable that downloads the pages, which starts as soon as consumer subscribers are attached. I configured the subscribers to observe on their own threads for parallel execution.
Code in C#:
IEnumerable<Page> getPages = produce();
var observable = getPages.ToObservable().Publish();

observable
    .ObserveOn(NewThreadScheduler.Default)
    .Subscribe(page => consume1(page));

observable
    .ObserveOn(NewThreadScheduler.Default)
    .Subscribe(page => consume2(page));

observable.Connect();
The problem with this implementation is that the main thread may finish before all pages are processed, at which point the application stops.
How can I achieve this using Rx?
Thanks!
Edit:
I also tried the following approach (from an answer):
static void Main(string[] args)
{
    var getPages = Enumerable.Range(0, 10);

    var els1 = new EventLoopScheduler();
    var els2 = new EventLoopScheduler();

    var observable =
        getPages
            .ToObservable()
            .Publish(ps =>
                Observable
                    .Merge(
                        ps.Select(p => Observable.Start(() => consume1(p), els1)),
                        ps.Select(p => Observable.Start(() => consume2(p), els2))));

    observable.Wait();
}

public static void consume1(int p)
{
    Console.WriteLine($"1:{p}");
    Thread.Sleep(200);
}

public static void consume2(int p)
{
    Console.WriteLine($"2:{p}");
    Thread.Sleep(100);
}
observable.Wait() returns as soon as the underlying enumerable finishes yielding values, without waiting for the consumers. The output is:
1:0
2:0
To demonstrate this, if we replace getPages with:
var getPages = Enumerable.Range(0, 10)
    .Select(i =>
    {
        Console.WriteLine($"Produced {i}");
        Thread.Sleep(30);
        return i;
    });
then the output is:
Produced 0
Produced 1
1:0
2:0
Produced 2
Produced 3
Produced 4
2:1
Produced 5
Produced 6
Produced 7
1:1
2:2
Produced 8
Produced 9
Upvotes: 1
Reputation: 117064
I think this does what you want:
var els1 = new EventLoopScheduler();
var els2 = new EventLoopScheduler();

var observable =
    getPages
        .ToObservable()
        .Publish(ps =>
            Observable
                .Merge(
                    ps.SelectMany(p => Observable.Start(() => consume1(p), els1)),
                    ps.SelectMany(p => Observable.Start(() => consume2(p), els2))));
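The important detail in the query above is SelectMany. As a minimal sketch of why it matters (the Work method and class name below are hypothetical, and the System.Reactive package is assumed): Select only produces a sequence of inner observables and never subscribes to them, so the outer sequence completes as soon as the source does, whereas SelectMany subscribes to every inner observable and completes only after all of them have completed.

```csharp
using System;
using System.Linq;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

class SelectVersusSelectMany
{
    static void Main()
    {
        using (var els = new EventLoopScheduler())
        {
            var pages = Enumerable.Range(0, 3).ToObservable();

            // Select: one inner observable per page. Nothing subscribes to the
            // inner observables, so completion of 'nested' only reflects the
            // end of the source sequence, not the end of the work.
            IObservable<IObservable<Unit>> nested =
                pages.Select(p => Observable.Start(() => Work(p), els));

            // SelectMany: flattens the inner observables and completes only
            // after the source and every inner observable have completed.
            IObservable<Unit> flattened =
                pages.SelectMany(p => Observable.Start(() => Work(p), els));

            flattened.Wait(); // blocks until all three Work calls have returned
            Console.WriteLine("all pages consumed");
        }
    }

    // Hypothetical stand-in for a consumer such as consume1.
    static void Work(int p)
    {
        Thread.Sleep(100);
        Console.WriteLine($"consumed {p}");
    }
}
```

Because every Observable.Start call targets the same EventLoopScheduler, the work items are queued on a single thread and run one after another, which is what keeps each consumer sequential.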
I wrote this test code:
var getPages = Enumerable.Range(0, 10);

var els1 = new EventLoopScheduler();
var els2 = new EventLoopScheduler();

var observable =
    getPages
        .ToObservable()
        .Publish(ps =>
            Observable
                .Merge(
                    ps.SelectMany(p => Observable.Start(() => consume1(p), els1)),
                    ps.SelectMany(p => Observable.Start(() => consume2(p), els2))));

observable.Wait();

public void consume1(int p)
{
    Console.WriteLine($"1:{p}");
    Thread.Sleep(200);
}

public void consume2(int p)
{
    Console.WriteLine($"2:{p}");
    Thread.Sleep(100);
}
I got this output:
1:0 2:0 2:1 1:1 2:2 2:3 1:2 2:4 2:5 1:3 2:6 2:7 1:4 2:8 2:9 1:5 1:6 1:7 1:8 1:9
When you've finished with the EventLoopScheduler instances you should call .Dispose() on them to close the threads.
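One way to make sure the schedulers are always disposed is to wrap them in using statements, since EventLoopScheduler implements IDisposable. The following is only a sketch under that assumption: it reuses the test code from this answer, and the surrounding Program class and static modifiers are added just to make it a complete console program.

```csharp
using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

class Program
{
    static void Main()
    {
        var getPages = Enumerable.Range(0, 10);

        // Leaving the using block disposes both schedulers,
        // which lets their threads end.
        using (var els1 = new EventLoopScheduler())
        using (var els2 = new EventLoopScheduler())
        {
            getPages
                .ToObservable()
                .Publish(ps =>
                    Observable.Merge(
                        ps.SelectMany(p => Observable.Start(() => consume1(p), els1)),
                        ps.SelectMany(p => Observable.Start(() => consume2(p), els2))))
                .Wait(); // returns only after both consumers have handled every page
        }
    }

    static void consume1(int p)
    {
        Console.WriteLine($"1:{p}");
        Thread.Sleep(200);
    }

    static void consume2(int p)
    {
        Console.WriteLine($"2:{p}");
        Thread.Sleep(100);
    }
}
```

Wait() is called inside the using block on purpose: disposing a scheduler while work is still queued on it would cut the consumers short.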
Upvotes: 2