Justin Lang
Justin Lang

Reputation: 591

Multiple publishers sending concurrent messages to a single subscriber in Retlang?

I need to build an application where some number of instances of an object are generating "pulses", concurrently. (Essentially this just means that they are incrementing a counter.) I also need to track the total counters for each object. Also, whenever I perform a read on a counter, it needs to be reset to zero.

So I was talking to a guy at work, and he mentioned Retlang and message-based concurrency, which sounded super interesting. But obviously I am very new to the concept. So I've built a small prototype, and I get the expected results, which is awesome - but I'm not sure if I've potentially made some logical errors and left the software open to bugs, due to my inexperience with Retlang and concurrent programming in general.

First off, I have these classes:

public class Plc {

        private readonly IChannel<Pulse> _channel;
        private readonly IFiber _fiber;
        private readonly int _pulseInterval;
        private readonly int _plcId;

        public Plc(IChannel<Pulse> channel, int plcId, int pulseInterval) {
            _channel = channel;
            _pulseInterval = pulseInterval;
            _fiber = new PoolFiber();
            _plcId = plcId;
        }

        public void Start() {
            _fiber.Start();
            // Not sure if it's safe to pass in a delegate which will run in an infinite loop...
            // AND use a shared channel object...
            _fiber.Enqueue(() => {
                SendPulse();
            });
        }

        private void SendPulse() {
            while (true) {
                // Not sure if it's safe to use the same channel object in different
                // IFibers...
                _channel.Publish(new Pulse() { PlcId = _plcId });
                Thread.Sleep(_pulseInterval);
            }
        }
    }

    public class Pulse {
        public int PlcId { get; set; }
    }

The idea here is that I can instantiate multiple Plcs, pass each one the same IChannel, and then have them execute the SendPulse function concurrently, which would allow each one to publish to the same channel. But as you can see from my comments, I'm a little skeptical that what I'm doing is actually legit. I'm mostly worried about using the same IChannel object to Publish in the context of different IFibers, but I'm also worried about never returning from the delegate that was passed to Enqueue. I'm hoping some one can provide some insight as to how I should be handling this.

Also, here is the "subscriber" class:

public class PulseReceiver {

        private int[] _pulseTotals;
        private readonly IFiber _fiber;
        private readonly IChannel<Pulse> _channel;
        private object _pulseTotalsLock;

        public PulseReceiver(IChannel<Pulse> channel, int numberOfPlcs) {
            _pulseTotals = new int[numberOfPlcs];
            _channel = channel;
            _fiber = new PoolFiber();
            _pulseTotalsLock = new object();
        }

        public void Start() {
            _fiber.Start();
            _channel.Subscribe(_fiber, this.UpdatePulseTotals);
        }

        private void UpdatePulseTotals(Pulse pulse) {
            // This occurs in the execution context of the IFiber.
            // If we were just dealing with the the published Pulses from the channel, I think
            // we wouldn't need the lock, since I THINK the published messages would be taken
            // from a queue (i.e. each Plc is publishing concurrently, but Retlang enqueues
            // the messages).
            lock(_pulseTotalsLock) {

                _pulseTotals[pulse.PlcId - 1]++;
            }
        }

        public int GetTotalForPlc(int plcId) {
            // However, this access takes place in the application thread, not in the IFiber,
            // and I think there could potentially be a race condition here.  I.e. the array
            // is being updated from the IFiber, but I think I'm reading from it and resetting values
            // concurrently in a different thread.
            lock(_pulseTotalsLock) {

                if (plcId <= _pulseTotals.Length) {

                    int currentTotal = _pulseTotals[plcId - 1];
                    _pulseTotals[plcId - 1] = 0;
                    return currentTotal;
                }
            }

            return -1;
        }
    }

So here, I am reusing the same IChannel that was given to the Plc instances, but having a different IFiber subscribe to it. Ideally then I could receive the messages from each Plc, and update a single private field within my class, but in a thread safe way.

From what I understand (and I mentioned in my comments), I think that I would be safe to simply update the _pulseTotals array in the delegate which I gave to the Subscribe function, because I would receive each message from the Plcs serially.

However, I'm not sure how best to handle the bit where I need to read the totals and reset them. As you can see from the code and comments, I ended up wrapping a lock around any access to the _pulseTotals array. But I'm not sure if this is necessary, and I would love to know a) if it is in fact necessary to do this, and why, or b) the correct way to implement something similar.

And finally for good measure, here's my main function:

static void Main(string[] args) {

        Channel<Pulse> pulseChannel = new Channel<Pulse>();

        PulseReceiver pulseReceiver = new PulseReceiver(pulseChannel, 3);
        pulseReceiver.Start();

        List<Plc> plcs = new List<Plc>() {
            new Plc(pulseChannel, 1, 500),
            new Plc(pulseChannel, 2, 250),
            new Plc(pulseChannel, 3, 1000)
        };

        plcs.ForEach(plc => plc.Start());

        while (true) {
            Thread.Sleep(10000);
            Console.WriteLine(string.Format("Plc 1: {0}\nPlc 2: {1}\nPlc 3: {2}\n", pulseReceiver.GetTotalForPlc(1), pulseReceiver.GetTotalForPlc(2), pulseReceiver.GetTotalForPlc(3)));
        }
    }

I instantiate one single IChannel, pass it to everything, where internally the Receiver subscribes with an IFiber, and where the Plcs use IFibers to "enqueue" a non-returning method which continually publishes to the channel.

Again, the console output looks exactly like I would expect it to look, i.e. I see 20 "pulses" for Plc 1 after waiting 10 seconds. And the resetting of the counters after a read also seems to work, i.e. Plc 1 has 20 "pulses" after each 10 second increment. But that doesn't reassure me that I haven't overlooked something important.

I'm really excited to learn a bit more about Retlang and concurrent programming techniques, so hopefuly someone has the time to sift through my code and offer some suggestions for my specific concerns, or else even a different design based on my requirements!

Upvotes: 1

Views: 451

Answers (0)

Related Questions