ahmedkhabkhab

Reputation: 43

Using transform/duplex stream in NodeJS

I am gathering data from an external source (Bluetooth Low Energy) using NodeJS (the noble module). I am streaming the data into a JSON array in an external file. Let's call it fromSource.json. It looks like this:

fromSource.json

[
     {
         "id": 1,
         "name": "foo"
         "value": 123
     },
     {
         "id": 2,
         "name": "foo2",
         "value": 123
     },
     {
         "id": 3,
         "name": "foo3",
         "value": 789
     }
]

On the other hand, I am going to process these objects in real time and store the new values in a CSV file. Let's call it toDestination.csv. It looks like this:

toDestination.csv

id,name,convertedValue
1,"foo",123000
2,"foo2",456000
3,"foo3",789000

Every second, I receive a new value (a new JSON object) from the source, push it into a writable stream (the JSON array file), then read it into a readable stream, apply a transformation, and write it to its final destination, the CSV file.

My questions are: Are NodeJS streams suited to handling JSON objects? Should I stringify them before using them? Should I use a Duplex or a Transform stream in my case?

Upvotes: 1

Views: 1360

Answers (1)

Michał Karpacki

Reputation: 2658

Based on your question, I think Transform is all you need. With a Duplex you'd need to implement both reading and writing, which is not necessary in your case.

The code would look like this:

const { Transform } = require('stream');
const fs = require('fs');

// objectMode lets the streams pass plain JS objects instead of strings/Buffers
const intermediate = measurementStream.pipe(
    new Transform({objectMode: true, transform: initialTransforms})
);
intermediate.pipe(someArrayifyStreamToJSON);
intermediate.pipe(
    new Transform({objectMode: true, transform: someMoreTransforms})
).pipe(
    new Transform({objectMode: true, transform: stringifyToCSV})
).pipe(
    fs.createWriteStream('some_path.csv')
);
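
For reference, here is a minimal sketch of what two of the placeholder transform functions above could look like, assuming object-mode streams and a conversion that simply multiplies value by 1000 as in the sample data (the function bodies and field names are illustrative, not part of the original answer):

// transform function: convert the raw measurement (assumed ×1000 conversion)
function initialTransforms(chunk, encoding, callback) {
    // chunk is one measurement object, e.g. { id: 1, name: 'foo', value: 123 }
    callback(null, {
        id: chunk.id,
        name: chunk.name,
        convertedValue: chunk.value * 1000
    });
}

// transform function: serialize each object as one CSV row
function stringifyToCSV(chunk, encoding, callback) {
    callback(null, `${chunk.id},"${chunk.name}",${chunk.convertedValue}\n`);
}

With objectMode enabled on the Transform instances, the JSON objects can flow through as-is, so there is no need to stringify them until the final CSV step.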

I would also recommend taking a look at scramjet, the framework I've created and support. It is meant to deal with cases like yours and would make your code much simpler:

const {DataStream} = require('scramjet');
const fs = require('fs');

// Pass your original stream here, this could also be an
// AsyncGenerator in node v10 and up.
DataStream.from(measurementStream)
    // you can map with any async or sync operation on the data
    .map(async item => {
        const newData = await doSomeTransforms(item);
        return newData;
    })
    // tee creates a separate branch of the stream at this point in the chain
    .tee(stream => stream
        .toJSONArray()
        .pipe(fs.createWriteStream('your-intermediate.json'))
    )
    // then you can add some more transforms
    .map(someMapper)
    .filter(someFilter)
    .CSVStringify()
    .pipe(fs.createWriteStream('your-final.csv'));

If you choose the first path anyway, I'd recommend a couple of modules that will make your life easier: JSONStream and papaparse, both available on npm.
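
For example, a rough sketch of that first path with those two modules might look like this (the file names are taken from the question; the ×1000 conversion is an assumption for illustration):

const fs = require('fs');
const { Transform } = require('stream');
const JSONStream = require('JSONStream');
const Papa = require('papaparse');

fs.createReadStream('fromSource.json')
    // emit each element of the top-level JSON array as a separate object
    .pipe(JSONStream.parse('*'))
    .pipe(new Transform({
        objectMode: true,
        transform(item, encoding, callback) {
            // apply the conversion and serialize one CSV row per object
            const row = { id: item.id, name: item.name, convertedValue: item.value * 1000 };
            callback(null, Papa.unparse([row], { header: false }) + '\n');
        }
    }))
    .pipe(fs.createWriteStream('toDestination.csv'));

JSONStream.parse('*') handles the incremental parsing of the array and Papa.unparse takes care of quoting, so the Transform only has to map fields; you would still need to write the id,name,convertedValue header line yourself.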

Upvotes: 2
