Reputation: 1711
The problem I want to solve: I want to kick off an asynchronous action from multiple threads, but only one execution of the action is permitted at a time.
The action is to communicate with a hardware device and the device can only handle one request at a time.
One of my ideas was to synchronize this with a System.Reactive.Subjects.Subject. Multiple threads could call OnNext, and the subject would execute one request after the other. I wrote this (probably very naive) code:
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

static void Main(string[] args)
{
    var source = new System.Reactive.Subjects.Subject<Func<Task<int>>>();
    source
        // from http://code.fitness/post/2016/11/rx-selectmany-deep-dive.html
        .SelectMany(async x => await x.Invoke())
        .Subscribe(result => Console.WriteLine($"Work of index {result} completed"));

    var noOfThreads = 3;
    for (var i = 0; i < noOfThreads; i++)
    {
        // Copy the loop variable so each thread captures its own index.
        var i1 = i;
        var t = new Thread(() => source.OnNext(() => doWork(i1)));
        t.Start();
    }
    Console.ReadKey();
}

static async Task<int> doWork(int index)
{
    Console.WriteLine($"Start work {index}");
    await Task.Delay(1000);
    Console.WriteLine($"Stop work {index}");
    return index;
}
My hope was an output like this:
Start work 2
Stop work 2
Work of index 2 completed
Start work 0
Stop work 0
Work of index 0 completed
Instead I get:
Start work 0
Start work 1
Start work 2
Stop work 1
Stop work 0
Work of index 1 completed
Work of index 0 completed
Stop work 2
Work of index 2 completed
This shows that all actions are started immediately, without waiting for the others to complete. I wonder whether Reactive is the right way to do this, or whether there is some other clever way to accomplish my task.
Edit: some more background on why I need this: the application communicates with a device. This device has a serial interface and can only handle one command at a time. So I have a thread that constantly polls for status updates, like this:
while (true)
{
    ReadPosition();
    ReadTemperatures();
    ReadErrors();
}
Then there is a UI where users can initiate actions on the device. My current solution is a queue onto which I enqueue my commands. That works, but I was wondering whether an event-based approach would work as well.
Upvotes: 0
Views: 263
Reputation: 117185
You're mixing Rx, Tasks, and threading. No wonder it's going off the rails. Pick one approach - Rx is the best, IMHO - and you should be fine.
Does this suffice:
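using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

static void Main(string[] args)
{
    var source = new Subject<Func<int>>();
    source
        .Synchronize()      // serialize OnNext calls arriving from different threads
        .Select(x => x())   // run the work synchronously inside the pipeline
        .Subscribe(result => Console.WriteLine($"Work of index {result} completed"));

    var noOfThreads = 3;
    for (var i = 0; i < noOfThreads; i++)
    {
        // Copy the loop variable so each thread captures its own index.
        var i1 = i;
        var t = new Thread(() => source.OnNext(() => doWork(i1)));
        t.Start();
    }
    Console.ReadLine();
}

static int doWork(int index)
{
    Console.WriteLine($"Start work {index}");
    Thread.Sleep(1000);
    Console.WriteLine($"Stop work {index}");
    return index;
}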
That gives:
Start work 0
Stop work 0
Work of index 0 completed
Start work 2
Stop work 2
Work of index 2 completed
Start work 1
Stop work 1
Work of index 1 completed
The key is to call .Synchronize() to bring all of the calling threads under the Rx contract.
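If the device calls have to stay Task-based, one possible variation (a sketch, not tested against real hardware) is to wrap each queued function lazily with Observable.FromAsync and flatten with Concat instead of SelectMany. SelectMany subscribes to every inner sequence as soon as it arrives, which is why the original code started all three requests at once; Concat subscribes to the next one only after the previous one has completed. The helper name RunOneAtATime and the DoWorkAsync stand-in are made up for this illustration.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

static class SerializedRequests
{
    // Hypothetical helper: runs the queued async functions strictly one after another.
    public static IObservable<T> RunOneAtATime<T>(this IObservable<Func<Task<T>>> source) =>
        source
            .Synchronize()                          // serialize OnNext calls from different threads
            .Select(f => Observable.FromAsync(f))   // wrap lazily; f is not invoked yet
            .Concat();                              // subscribe to one inner sequence at a time
}

static class Demo
{
    // Hypothetical stand-in for one asynchronous request to the device.
    static async Task<int> DoWorkAsync(int index)
    {
        Console.WriteLine($"Start work {index}");
        await Task.Delay(1000);
        Console.WriteLine($"Stop work {index}");
        return index;
    }

    static void Main()
    {
        var source = new Subject<Func<Task<int>>>();
        source
            .RunOneAtATime()
            .Subscribe(result => Console.WriteLine($"Work of index {result} completed"));

        for (var i = 0; i < 3; i++)
        {
            // Copy the loop variable so each thread captures its own index.
            var i1 = i;
            new Thread(() => source.OnNext(() => DoWorkAsync(i1))).Start();
        }
        Console.ReadLine();
    }
}
```

Each request should start only after the previous one has stopped. The order in which the three indices appear still depends on which thread reaches OnNext first.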
Upvotes: 4