ChristianMurschall

Reputation: 1711

How do I execute functions in a Subject synchronously?

The problem I want to solve is that I want to kick off an asynchronous action from multiple threads, but only one execution of the action is permitted at a time.

The action is to communicate with a hardware device and the device can only handle one request at a time.

One of my ideas was to synchronize this with a System.Reactive.Subjects.Subject. Multiple threads could call OnNext, and the subject would execute one request after the other. I wrote this (probably very naive) code:

static void Main(string[] args)
{
    var source = new System.Reactive.Subjects.Subject<Func<Task<int>>>();

    source
        // from http://code.fitness/post/2016/11/rx-selectmany-deep-dive.html
        .SelectMany(async x => await x.Invoke()) 
        .Subscribe(result => Console.WriteLine($"Work of index {result} completed"));

    var noOfThreads = 3;
    for (var i = 0; i < noOfThreads; i++)
    {
        var i1 = i;
        var t = new Thread(() => source.OnNext(() => doWork(i1)));
        t.Start();
    }

    Console.ReadKey();
}

static async Task<int> doWork(int index)
{
    Console.WriteLine($"Start work {index}");
    await Task.Delay(1000);
    Console.WriteLine($"Stop work {index}");
    return index;
}

My hope was an output like this:

Start work 2
Stop work 2
Work of index 2 completed
Start work 0
Stop work 0
Work of index 0 completed

Instead I get:

Start work 0
Start work 1
Start work 2
Stop work 1
Stop work 0
Work of index 1 completed
Work of index 0 completed
Stop work 2
Work of index 2 completed

This shows that all actions are started immediately and none of them waits for the others to complete. I wonder whether Reactive is the right way to do this, or whether there is some other clever way to accomplish my task.

Edit: to give more background on why I need this: the application communicates with a device. This device has a serial interface and can only handle one command at a time. So I have a thread that constantly gets status updates like:

while (true)
{
    ReadPosition();
    ReadTemperatures();
    ReadErrors();
}

Then there is a UI where users can initiate actions on the device. My current solution is a queue into which I enqueue my commands. That works, but I was wondering whether an event-based approach would work as well.

Upvotes: 0

Views: 263

Answers (1)

Enigmativity

Reputation: 117185

You're mixing Rx, Tasks, and threading. No wonder it's going off the rails. Pick one approach - Rx is the best, IMHO - and you should be fine.

Does this suffice?

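using System;
using System.Reactive.Linq;      // Synchronize, Select
using System.Reactive.Subjects;  // Subject<T>
using System.Threading;

static void Main(string[] args)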
{
    var source = new Subject<Func<int>>();

    source
        .Synchronize()
        .Select(x => x())
        .Subscribe(result => Console.WriteLine($"Work of index {result} completed"));

    var noOfThreads = 3;
    for (var i = 0; i < noOfThreads; i++)
    {
        var i1 = i;
        var t = new Thread(() => source.OnNext(() => doWork(i1)));
        t.Start();
    }

    Console.ReadLine();
}

static int doWork(int index)
{
    Console.WriteLine($"Start work {index}");
    Thread.Sleep(1000);
    Console.WriteLine($"Stop work {index}");
    return index;
}

That gives:

Start work 0
Stop work 0
Work of index 0 completed
Start work 2
Stop work 2
Work of index 2 completed
Start work 1
Stop work 1
Work of index 1 completed

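The key is to call .Synchronize(), which serializes the OnNext calls coming from the different threads and so brings them under the Rx contract (no concurrent notifications). Note that doWork is now synchronous, so each OnNext blocks its calling thread until that piece of work has finished.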
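If the work has to stay asynchronous, as in the question, one possible variation is to wrap each Func<Task<int>> with Observable.FromAsync and flatten with Concat rather than SelectMany, because Concat subscribes to only one inner sequence at a time. This is a minimal sketch, assuming the System.Reactive package is referenced. The names SequentialAsyncSketch, Run and Write, and the ManualResetEventSlim used to wait for completion, are illustrative additions and not part of the code above:

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks;

public static class SequentialAsyncSketch
{
    // Pushes `count` async jobs from separate threads and returns the log lines in order.
    public static List<string> Run(int count)
    {
        var log = new List<string>();
        void Write(string line)
        {
            lock (log) { log.Add(line); }
            Console.WriteLine(line);
        }

        // Same simulated work as in the question, kept asynchronous.
        async Task<int> DoWorkAsync(int index)
        {
            Write($"Start work {index}");
            await Task.Delay(1000);
            Write($"Stop work {index}");
            return index;
        }

        var source = new Subject<Func<Task<int>>>();
        var done = new ManualResetEventSlim();

        source
            .Synchronize()                               // serialize OnNext calls from all threads
            .Select(work => Observable.FromAsync(work))  // wrap each job without starting it yet
            .Concat()                                    // start the next job only after the previous one ends
            .Subscribe(
                result => Write($"Work of index {result} completed"),
                error => done.Set(),
                () => done.Set());

        var threads = Enumerable.Range(0, count)
            .Select(i => new Thread(() => source.OnNext(() => DoWorkAsync(i))))
            .ToList();
        threads.ForEach(t => t.Start());
        threads.ForEach(t => t.Join());

        // No more jobs will arrive; wait until the queued ones have drained.
        source.OnCompleted();
        done.Wait();

        lock (log) { return log.ToList(); }
    }

    public static void Main() => Run(3);
}
```

The order of the indices still depends on which thread reaches OnNext first, but each job should only start after the previous one has reported its result. In a long-running application the subject would normally stay open, so the OnCompleted call and the wait handle are only there to let this sample terminate.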

Upvotes: 4
