Reputation: 640
I want to merge data (from HTTP msg.payload) from 3 different sources.
However, these HTTP requests may be called multiple times so data from the same source can be received more than once.
[{"id":"7ed13b41.131b14","type":"join","z":"246eac57.42ec74","name":"","mode":"auto","build":"string","property":"payload","propertyType":"msg","key":"index","joiner":"","joinerType":"str","accumulate":false,"timeout":"","count":"3","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":1370,"y":1160,"wires":[["d941ca6e.0e1aa8"]]},{"id":"d941ca6e.0e1aa8","type":"debug","z":"246eac57.42ec74","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","x":1490,"y":1120,"wires":[]},{"id":"b04a312.d6c40d","type":"function","z":"246eac57.42ec74","name":"part 1","func":"msg.parts = {};\nmsg.parts.id = 12345;\nmsg.parts.index = 0;\nmsg.parts.count = 3;\nreturn msg;","outputs":1,"noerr":0,"x":1210,"y":1120,"wires":[["7ed13b41.131b14"]]},{"id":"30cec12.e2fc13e","type":"function","z":"246eac57.42ec74","name":"part 2","func":"msg.parts = {};\nmsg.parts.id = 12345;\nmsg.parts.index = 1;\nmsg.parts.count = 3;\nreturn msg;","outputs":1,"noerr":0,"x":1210,"y":1160,"wires":[["7ed13b41.131b14"]]},{"id":"8902f2d5.ea688","type":"function","z":"246eac57.42ec74","name":"part 3","func":"msg.parts = {};\nmsg.parts.id = 12345;\nmsg.parts.index = 2;\nmsg.parts.count = 3;\nreturn msg;","outputs":1,"noerr":0,"x":1210,"y":1200,"wires":[["7ed13b41.131b14"]]},{"id":"814f25b6.dd3958","type":"http in","z":"246eac57.42ec74","name":"source 1","url":"/source1","method":"get","upload":false,"swaggerDoc":"","x":1060,"y":1120,"wires":[["b04a312.d6c40d"]]},{"id":"cab634ac.5d9df8","type":"http in","z":"246eac57.42ec74","name":"source 2","url":"/source 2","method":"get","upload":false,"swaggerDoc":"","x":1060,"y":1160,"wires":[["30cec12.e2fc13e"]]},{"id":"98f89b04.9b5bb8","type":"http in","z":"246eac57.42ec74","name":"source 3","url":"/source3","method":"get","upload":false,"swaggerDoc":"","x":1060,"y":1200,"wires":[["8902f2d5.ea688"]]}]
What happens in this flow is when Join Node receives 3 messages from source 1, it considers the msg.parts
complete. The behavior I want to achieve is that only when data from the 3 sources are received will the flow proceed. And if data is received from the same source, it will only overwrite the previous data.
Is there a way to achieve this within Node-RED?
Upvotes: 3
Views: 20120
Reputation: 1111
One way is to use a different msg.topic
for each source, and set up your join
node to manually combine all inputs into one object keyed on the topic:
[{"id":"7f9b1fbb.58ff","type":"join","z":"f9a2eec9.c2e26","name":"","mode":"custom","build":"object","property":"payload","propertyType":"msg","key":"topic","joiner":"","joinerType":"str","accumulate":true,"timeout":"","count":"3","reduceRight":false,"reduceExp":"","reduceInit":"","reduceInitType":"","reduceFixup":"","x":770,"y":2040,"wires":[["42ac1f9e.eac62"]]},{"id":"42ac1f9e.eac62","type":"debug","z":"f9a2eec9.c2e26","name":"","active":true,"tosidebar":true,"console":false,"tostatus":false,"complete":"true","x":930,"y":2040,"wires":[]},{"id":"37be742c.72a96c","type":"http in","z":"f9a2eec9.c2e26","name":"source 1","url":"/source1/:value","method":"get","upload":false,"swaggerDoc":"","x":180,"y":2020,"wires":[["8ca164f9.828898"]]},{"id":"51684d03.091ea4","type":"http in","z":"f9a2eec9.c2e26","name":"source 2","url":"/source2/:value","method":"get","upload":false,"swaggerDoc":"","x":180,"y":2060,"wires":[["2bb81dc1.351562"]]},{"id":"1a7ad806.dbb598","type":"http in","z":"f9a2eec9.c2e26","name":"source 3","url":"/source3/:value","method":"get","upload":false,"swaggerDoc":"","x":180,"y":2100,"wires":[["c7da5f3a.95b71"]]},{"id":"9a85c694.2a37a8","type":"http response","z":"f9a2eec9.c2e26","name":"","statusCode":"204","headers":{},"x":780,"y":2080,"wires":[]},{"id":"2a0579a6.612c66","type":"change","z":"f9a2eec9.c2e26","name":"set payload value","rules":[{"t":"set","p":"payload","pt":"msg","to":"req.params.value","tot":"msg"}],"action":"","property":"","from":"","to":"","reg":false,"x":570,"y":2060,"wires":[["9a85c694.2a37a8","7f9b1fbb.58ff"]]},{"id":"8ca164f9.828898","type":"change","z":"f9a2eec9.c2e26","name":"set topic 1","rules":[{"t":"set","p":"topic","pt":"msg","to":"source1","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":350,"y":2020,"wires":[["2a0579a6.612c66"]]},{"id":"2bb81dc1.351562","type":"change","z":"f9a2eec9.c2e26","name":"set topic 2","rules":[{"t":"set","p":"topic","pt":"msg","to":"source2","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":350,"y":2060,"wires":[["2a0579a6.612c66"]]},{"id":"c7da5f3a.95b71","type":"change","z":"f9a2eec9.c2e26","name":"set topic 3","rules":[{"t":"set","p":"topic","pt":"msg","to":"source3","tot":"str"}],"action":"","property":"","from":"","to":"","reg":false,"x":350,"y":2100,"wires":[["2a0579a6.612c66"]]}]
As mentioned in the comments, you need to return something from all of your flows that begin with http in
nodes... normally, I like to use http POST endpoints (for receiving input values) and return status code 204 (no content) to let the caller know that the value was accepted. Since your original flow uses GET nodes, I parameterized the URLs (e.g /source1/:value
) and used the change
node to copy the value to the payload. Your mileage may vary...
Note that I set the "number of message parts" to 3, meaning that you will get no output from the join
node until it receives 3 different topic values. After that, every input msg will output the latest combined msg object (because of the "every subsequent message" checkbox). By tweaking those two options, you can get whatever behavior you need.
Upvotes: 7