Julian Wilson

Reputation: 2470

Data not being transformed Node.js Transform streams

I'm trying to build a transform stream pipeline that takes data from socket.io, converts it to JSON, and sends it to stdout. I'm perplexed as to why the data seems to pass straight through without any transformation. I'm using the through2 library. Here is my code:

getStreamNames().then(streamNames => {
    const socket = io(SOCKETIO_URL);
    socket.on('connect', () => {
        socket.emit('Subscribe', {subs: streamNames});
    });

    const stream = through2.obj(function (chunk, enc, callback) {
        callback(null, parseString(chunk));
    }).pipe(through2.obj(function (chunk, enc, callback) {
        callback(null, JSON.stringify(chunk));
    })).pipe(process.stdout);

    socket.on('m', data => stream.write(data));
});

getStreamNames returns a promise that resolves to an array of stream names (I'm calling an external socket.io API), and parseString takes a string returned from the API and converts it to a plain object so it's manageable.

What I expect is for the console to print the stringified JSON: the data is parsed with parseString and then made writable to stdout with JSON.stringify. What actually happens is that the data goes right through the stream with no transformation.

For reference, the data coming from the API is in a weird format, something like

field1~field2~0x23~fieldn

and so that's why I need the parseString method.

I must be missing something. Any ideas?

EDIT:

parseString:

function(value) {
    var valuesArray = value.split("~");
    var valuesArrayLength = valuesArray.length;
    var mask = valuesArray[valuesArrayLength - 1];
    var maskInt = parseInt(mask, 16);
    var unpackedCurrent = {};
    var currentField = 0;
    for (var property in this.FIELDS) {
        if (this.FIELDS[property] === 0) {
            unpackedCurrent[property] = valuesArray[currentField];
            currentField++;
        }
        else if (maskInt & this.FIELDS[property]) {
            if (property === 'LASTMARKET') {
                unpackedCurrent[property] = valuesArray[currentField];
            }
            else {
                unpackedCurrent[property] = parseFloat(valuesArray[currentField]);
            }
            currentField++;
        }
    }

    return unpackedCurrent;
};
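
To make the mask logic concrete, here is a minimal sketch of the same unpacking run against a made-up message. The FIELDS table, its bit values, and the sample string are all assumptions for illustration, not the real API's definitions. Because the function reads this.FIELDS, the sketch attaches it to an object so that `this` is bound when it is called.

```javascript
// Hypothetical parser object: the FIELDS names and bit values are assumptions.
const parser = {
    FIELDS: { TYPE: 0x0, MARKET: 0x0, PRICE: 0x1, LASTVOLUME: 0x2, LASTMARKET: 0x4 },

    parseString(value) {
        const valuesArray = value.split('~');
        // The last field is a hex bitmask saying which optional fields are present.
        const maskInt = parseInt(valuesArray[valuesArray.length - 1], 16);
        const unpacked = {};
        let currentField = 0;
        for (const property in this.FIELDS) {
            if (this.FIELDS[property] === 0) {
                // Fields with value 0 are always present.
                unpacked[property] = valuesArray[currentField++];
            } else if (maskInt & this.FIELDS[property]) {
                // Optional field: consumed only when its bit is set in the mask.
                const raw = valuesArray[currentField++];
                unpacked[property] = property === 'LASTMARKET' ? raw : parseFloat(raw);
            }
        }
        return unpacked;
    }
};

// Mask "5" (binary 101) marks PRICE and LASTMARKET as present, LASTVOLUME as absent.
const parsed = parser.parseString('5~CCCAGG~6500.5~Bitfinex~5');
console.log(parsed);
```

With this sample, LASTVOLUME is skipped because its bit is not set, so the field after the price is read as LASTMARKET.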

Thanks

Upvotes: 2

Views: 825

Answers (1)

Marcos Casagrande

Reputation: 40394

The issue is that the stream you're writing to is actually process.stdout. .pipe() returns the destination stream so that calls can be chained, which means the value of the whole chain is the last destination, in your case process.stdout.

const x = stream.pipe(stream2).pipe(stream3).pipe(process.stdout);
x === process.stdout // true

So all you were doing was calling process.stdout.write(data), which bypasses the pipeline entirely.
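
You can check this without through2 or socket.io. The following is a minimal sketch using Node's built-in PassThrough streams as stand-ins; the variable names are just for illustration.

```javascript
const { PassThrough } = require('stream');

const first = new PassThrough({ objectMode: true });
const second = new PassThrough({ objectMode: true });
const last = new PassThrough({ objectMode: true });

// .pipe() returns its argument (the destination), not the stream it was called on,
// so a chain of pipes evaluates to the final destination.
const chained = first.pipe(second).pipe(last);

const chainedIsLast = chained === last;
const chainedIsFirst = chained === first;
console.log(chainedIsLast, chainedIsFirst);
```

Writing to `chained` here would therefore write to `last` directly and skip `first` and `second`.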

What you need to do is assign your first through2 stream to the stream variable, and then call .pipe on that stream.

const stream = through2.obj((chunk, enc, callback) => {
    callback(null, parseString(chunk))
});

stream
    .pipe(through2.obj((chunk, enc, callback) => {
        callback(null, JSON.stringify(chunk));
    }))
    .pipe(process.stdout);

socket.on('m', data => stream.write(data));
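
If you want to try the corrected wiring in isolation, here is a self-contained sketch. It makes several assumptions: it uses Node's built-in Transform in place of through2, a hypothetical parseFields helper in place of your parseString, and an in-memory Writable in place of process.stdout so the result can be inspected.

```javascript
const { Transform, Writable } = require('stream');

// Hypothetical stand-in for parseString: it only splits on "~".
const parseFields = (line) => ({ fields: line.split('~') });

function runPipeline(lines) {
    return new Promise((resolve, reject) => {
        const output = [];

        // Keep a reference to the head of the pipeline; this is the stream to write to.
        const head = new Transform({
            objectMode: true,
            transform(chunk, enc, callback) {
                callback(null, parseFields(chunk));
            }
        });

        const toJson = new Transform({
            objectMode: true,
            transform(chunk, enc, callback) {
                callback(null, JSON.stringify(chunk));
            }
        });

        // Collects results in memory; stands in for process.stdout.
        const sink = new Writable({
            objectMode: true,
            write(chunk, enc, callback) {
                output.push(chunk);
                callback();
            }
        });

        head.pipe(toJson).pipe(sink);
        sink.on('finish', () => resolve(output));
        for (const s of [head, toJson, sink]) s.on('error', reject);

        // Write to the head, not to the return value of the pipe chain.
        lines.forEach((line) => head.write(line));
        head.end();
    });
}

runPipeline(['a~b~c']).then((out) => console.log(out));
```

Every chunk collected by the sink has passed through both transforms, because the writes go to the head of the pipeline.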

Upvotes: 3
