libzz
libzz

Reputation: 640

How to merge data from multiple payloads in Node-RED?

I want to merge data (from HTTP msg.payload) from 3 different sources.

However, these HTTP requests may be called multiple times so data from the same source can be received more than once.

enter image description here

[{"id":"7ed13b41.131b14","type":"join","z":"246eac57.42ec74","name":"","mode":"auto","build":"string","property":"payload","propertyType":"msg","key":"index","joiner":"","joinerType":"str","accumulate":false,"timeout":"","count":"3","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":1370,"y":1160,"wires":[["d941ca6e.0e1aa8"]]},{"id":"d941ca6e.0e1aa8","type":"debug","z":"246eac57.42ec74","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","x":1490,"y":1120,"wires":[]},{"id":"b04a312.d6c40d","type":"function","z":"246eac57.42ec74","name":"part 1","func":"msg.parts = {};\nmsg.parts.id = 12345;\nmsg.parts.index = 0;\nmsg.parts.count = 3;\nreturn msg;","outputs":1,"noerr":0,"x":1210,"y":1120,"wires":[["7ed13b41.131b14"]]},{"id":"30cec12.e2fc13e","type":"function","z":"246eac57.42ec74","name":"part 2","func":"msg.parts = {};\nmsg.parts.id = 12345;\nmsg.parts.index = 1;\nmsg.parts.count = 3;\nreturn msg;","outputs":1,"noerr":0,"x":1210,"y":1160,"wires":[["7ed13b41.131b14"]]},{"id":"8902f2d5.ea688","type":"function","z":"246eac57.42ec74","name":"part 3","func":"msg.parts = {};\nmsg.parts.id = 12345;\nmsg.parts.index = 2;\nmsg.parts.count = 3;\nreturn msg;","outputs":1,"noerr":0,"x":1210,"y":1200,"wires":[["7ed13b41.131b14"]]},{"id":"814f25b6.dd3958","type":"http in","z":"246eac57.42ec74","name":"source 1","url":"/source1","method":"get","upload":false,"swaggerDoc":"","x":1060,"y":1120,"wires":[["b04a312.d6c40d"]]},{"id":"cab634ac.5d9df8","type":"http in","z":"246eac57.42ec74","name":"source 2","url":"/source 2","method":"get","upload":false,"swaggerDoc":"","x":1060,"y":1160,"wires":[["30cec12.e2fc13e"]]},{"id":"98f89b04.9b5bb8","type":"http in","z":"246eac57.42ec74","name":"source 3","url":"/source3","method":"get","upload":false,"swaggerDoc":"","x":1060,"y":1200,"wires":[["8902f2d5.ea688"]]}]

What happens in this flow is when Join Node receives 3 messages from source 1, it considers the msg.parts complete. The behavior I want to achieve is that only when data from the 3 sources are received will the flow proceed. And if data is received from the same source, it will only overwrite the previous data.

Is there a way to achieve this within Node-RED?

Upvotes: 3

Views: 20120

Answers (1)

SteveR
SteveR

Reputation: 1111

One way is to use a different msg.topic for each source, and set up your join node to manually combine all inputs into one object keyed on the topic:

Join node manual mode

[{"id":"7f9b1fbb.58ff","type":"join","z":"f9a2eec9.c2e26","name":"","mode":"custom","build":"object","property":"payload","propertyType":"msg","key":"topic","joiner":"","joinerType":"str","accumulate":true,"timeout":"","count":"3","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":770,"y":2040,"wires":[["42ac1f9e.eac62"]]},{"id":"42ac1f9e.eac62","type":"debug","z":"f9a2eec9.c2e26","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","x":930,"y":2040,"wires":[]},{"id":"37be742c.72a96c","type":"http in","z":"f9a2eec9.c2e26","name":"source 1","url":"/source1/:value","method":"get","upload":false,"swaggerDoc":"","x":180,"y":2020,"wires":[["8ca164f9.828898"]]},{"id":"51684d03.091ea4","type":"http in","z":"f9a2eec9.c2e26","name":"source 2","url":"/source2/:value","method":"get","upload":false,"swaggerDoc":"","x":180,"y":2060,"wires":[["2bb81dc1.351562"]]},{"id":"1a7ad806.dbb598","type":"http in","z":"f9a2eec9.c2e26","name":"source 3","url":"/source3/:value","method":"get","upload":false,"swaggerDoc":"","x":180,"y":2100,"wires":[["c7da5f3a.95b71"]]},{"id":"9a85c694.2a37a8","type":"http response","z":"f9a2eec9.c2e26","name":"","statusCode":"204","headers":{},"x":780,"y":2080,"wires":[]},{"id":"2a0579a6.612c66","type":"change","z":"f9a2eec9.c2e26","name":"set payload value","rules":[{"t":"set","p":"payload","pt":"msg","to":"req.params.value","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":570,"y":2060,"wires":[["9a85c694.2a37a8","7f9b1fbb.58ff"]]},{"id":"8ca164f9.828898","type":"change","z":"f9a2eec9.c2e26","name":"set topic 1","rules":[{"t":"set","p":"topic","pt":"msg","to":"source1","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":350,"y":2020,"wires":[["2a0579a6.612c66"]]},{"id":"2bb81dc1.351562","type":"change","z":"f9a2eec9.c2e26","name":"set topic 2","rules":[{"t":"set","p":"topic","pt":"msg","to":"source2","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":350,"y":2060,"wires":[["2a0579a6.612c66"]]},{"id":"c7da5f3a.95b71","type":"change","z":"f9a2eec9.c2e26","name":"set topic 3","rules":[{"t":"set","p":"topic","pt":"msg","to":"source3","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":350,"y":2100,"wires":[["2a0579a6.612c66"]]}]

As mentioned in the comments, you need to return something from all of your flows that begin with http in nodes... normally, I like to use http POST endpoints (for receiving input values) and return status code 204 (no content) to let the caller know that the value was accepted. Since your original flow uses GET nodes, I parameterized the URLs (e.g /source1/:value) and used the change node to copy the value to the payload. Your mileage may vary...

Note that I set the "number of message parts" to 3, meaning that you will get no output from the join node until it receives 3 different topic values. After that, every input msg will output the latest combined msg object (because of the "every subsequent message" checkbox). By tweaking those two options, you can get whatever behavior you need.

Upvotes: 7

Related Questions