Reputation: 23
I use Node.js. I have an MQTT message event handler
index.js
client.on('message', function (topic, message) {
// calls another function
my_function(topic,message);
})
which calls another function my_function
on receiving messages.
async function my_function(topic,message) {
const value = await dataFromPLC();
///processes the value together with message
}
The function dataFromPLC
exported from another file using exports.dataFromPLC = dataFromPLC
and imported into my main function looks like this
PLCfunctions.js
let client = new S7Client(plcSettings);
client.on('error', console.error);
async function dataFromPLC (){
try {
await client.connect();
} catch (err){
console.error(err);
}
try {
// Read DB
const res = await client.readDB(dbNr, dbVars);
return res;
} catch (err) {
console.error(err);
} finally {
client.disconnect();
}
}
There is no issue when I receive a single MQTT message or there is sufficient delay between messages. However when I receive two MQTT messages, both of them calls my_function
and subsequently dataFromPLC
without much delay in between. I receive an error as there is not sufficient time for the PLC connection to close before the second message tries to use the connection again. I have looked at different options and am not quite sure about how to solve the problem. Can I get some help please?
Upvotes: 2
Views: 519
Reputation: 215049
You'll have to set up a queue of messages, so that onMessage
only places the input in the queue and defers its processing until later on. For example, you could make the queue a Promise with then
as enqueue operation. This way it's guaranteed that no processing starts until all previous ones are completed.
Here's a small demo, a click on the button simulates an incoming message:
let QUEUE = Promise.resolve()
function onMessage(msg) {
console.log('GOT MESSAGE', msg)
QUEUE = QUEUE.then(() => process(msg))
}
let pause = n => new Promise(r => setTimeout(r, n));
async function process(msg) {
console.log('BEGIN', msg)
await pause(200); console.log('busy', msg)
await pause(200); console.log('busy', msg)
await pause(200); console.log('busy', msg)
await pause(200); console.log('busy', msg)
await pause(200); console.log('busy', msg)
console.log('END', msg)
}
msg = 0
document.querySelector('button').addEventListener('click', () => onMessage(++msg))
<button>message</button>
Upvotes: 4