Reputation: 43
I am gathering data from an external source (Bluetooth Low Energy) using NodeJS (noble module). I am streaming them to a JSON array in an external file. Let's call it fromSource.json. It looks like this:
fromSource.json
[
    {
        "id": 1,
        "name": "foo",
        "value": 123
    },
    {
        "id": 2,
        "name": "foo2",
        "value": 456
    },
    {
        "id": 3,
        "name": "foo3",
        "value": 789
    }
]
On the other hand, I am going to process these objects in real time and store the new values in a CSV file. Let's call it toDestination.csv. It looks like this:
toDestination.csv
id,name,convertedValue
1,"foo",123000
2,"foo2",456000
3,"foo3",789000
Every second, I will receive a new value (a new JSON object) from the source, push it into a writable stream (the JSON array file), then read it back through a readable stream, apply a transformation, and write the result to its final destination, the CSV file.
My questions are: Are NodeJS streams suited to handling JSON objects? Should I stringify them before using them? Should I use a Duplex or a Transform stream in my case?
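For context, the per-second flow I have in mind looks roughly like this (source and the event name are placeholders, not my actual noble code):

// `source` stands in for my BLE reader (noble); assume it already gives
// me parsed measurement objects about once per second.
source.on('measurement', (measurement) => {
    // measurement looks like { id: 1, name: "foo", value: 123 }
    // 1) append it to fromSource.json (the JSON array file)
    // 2) transform value -> convertedValue
    // 3) append a row to toDestination.csv
});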
Upvotes: 1
Views: 1360
Reputation: 2658
Based on your question, I think a Transform is all you need. With a Duplex you would need to implement both the readable and the writable side, which is not necessary in your case.
The code would look like this:
const { Transform } = require('stream');
const fs = require('fs');

// `measurementStream`, `initialTransforms`, `someMoreTransforms`,
// `stringifyToCSV` and `someArrayifyStreamToJSON` are placeholders for
// your source stream, transform functions and JSON-array writer.
// objectMode is needed because the chunks are plain JS objects.
const intermediate = measurementStream.pipe(
    new Transform({ objectMode: true, transform: initialTransforms })
);

intermediate.pipe(someArrayifyStreamToJSON);

intermediate.pipe(
    new Transform({ objectMode: true, transform: someMoreTransforms })
).pipe(
    new Transform({ objectMode: true, transform: stringifyToCSV })
).pipe(
    fs.createWriteStream('some_path.csv')
);
I would also recommend taking a look at scramjet, a framework I've created and support. It is meant to deal with cases like yours and would make your code much simpler:
const { DataStream } = require('scramjet');

// Pass your original stream here; this could also be an
// AsyncGenerator in Node v10 and up.
DataStream.from(measurementStream)
    // you can map with any async or sync operation on the data
    .map(async item => {
        const newData = await doSomeTransforms(item);
        return newData;
    })
    // tee creates a separate stream at this point in the transforms
    .tee(stream => stream
        .toJSONArray()
        .pipe(fs.createWriteStream('your-intermediate.json'))
    )
    // then you can add some more transforms
    .map(someMapper)
    .filter(someFilter)
    .CSVStringify()
    .pipe(fs.createWriteStream('your-final.csv'));
If you choose the first path anyway, I'd recommend a couple of modules that would make your life easier: JSONStream and papaparse, both available on NPM.
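For example, JSONStream could take care of keeping the JSON array file valid as objects arrive. A rough sketch, reusing the intermediate stream from the snippet above:

const JSONStream = require('JSONStream');

// JSONStream.stringify() turns a stream of objects into a JSON array
// (the "[", "," and "]" delimiters are written for you), so the
// intermediate file stays valid JSON as items are appended.
intermediate
    .pipe(JSONStream.stringify())
    .pipe(fs.createWriteStream('fromSource.json'));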
Upvotes: 2