Miguel Coder

Reputation: 1950

How to convert a large stream to a gzipped base64 string

I'm building an analytics platform and I want to compress the output of my ETL (Extract, Transform, Load) jobs before storing it in my database. Before I start writing the code, I was wondering if someone with experience could tell me how to do it properly. I want to gzip the data and then convert it to a base64 string. Do I simply gzip and then convert to base64, or will that not work?

This is the process I'm currently using for these large datasets.

var streamObj = athenaClient.execute('my query').toStream()
var data = [];

redis.set('Some Dashboard Data', '[')

streamObj.on('data', function(record) {
    // TODO gzip record then convert to base64
    if (data.length === 500) {
        let tempData = JSON.stringify(data);
        data = []
        redis.append('Some Dashboard Data', tempData.slice(1, tempData.length - 1) + ',')
    }
    data.push(record);
})

If this is not possible, is there a way to store the gzipped string instead?
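
For reference, the naive version of what I have in mind looks like the snippet below (just gzipping a JSON string and base64-encoding the resulting buffer; the records variable is a placeholder for whatever the stream produces). I'm not sure this approach holds up once the stream gets large:

const zlib = require('zlib')

// Naive sketch: gzip a JSON string, then base64-encode the resulting buffer.
const records = [{ id: 1 }, { id: 2 }]
const gzipped = zlib.gzipSync(JSON.stringify(records))
const base64 = gzipped.toString('base64')

// Decoding works in reverse: base64 -> buffer -> gunzip -> JSON.
const restored = JSON.parse(zlib.gunzipSync(Buffer.from(base64, 'base64')).toString())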

Upvotes: 1

Views: 1416

Answers (2)

Miguel Coder

Reputation: 1950

Just to elaborate further on Žilvinas' answer: here is how I got it to work.

const athena = require('./athena')
const redis = require('./redis')
const zlib = require('zlib')
const Stream = require('stream')

exports.persistStream = (config, query, name, transform) => {
    return new Promise((resolve, reject) => {
        let recordCount = 0

        // Turn each record into a slice of one big JSON array: the first record
        // opens the array, every subsequent record is prefixed with a comma.
        var transformStream = new Stream.Transform({ writableObjectMode: true, readableObjectMode: true })
        transformStream._transform = function (chunk, encoding, done) {
            recordCount++

            if (transform) chunk = transform(chunk)

            let jsonChunk = JSON.stringify([chunk])

            if (recordCount === 1) {
                // "[{...}" – keep the opening bracket, drop the closing one
                jsonChunk = jsonChunk.slice(0, jsonChunk.length - 1)
            } else {
                // ",{...}" – drop both brackets and prepend a comma
                jsonChunk = ',' + jsonChunk.slice(1, jsonChunk.length - 1)
            }
            this.push(jsonChunk)
            done()
        }

        // Close the JSON array once the source stream ends.
        transformStream._final = function (done) {
            this.push(']')
            done()
        }

        const gzip = zlib.createGzip()

        let buffers = []

        var stream = athena.execute(query)
            .toStream()
            .pipe(transformStream)
            .pipe(gzip)

        // Collect the gzipped output, then base64-encode it in one go.
        gzip.on('data', (chunk) => {
            buffers.push(chunk)
        })

        gzip.on('end', function () {
            let buffer = Buffer.concat(buffers)
            redis.set(name, buffer.toString('base64'), (err, response) => {
                if (err) {
                    console.log(err)
                    return reject(err)
                }
                // The config is small, so it can be gzipped in one shot.
                zlib.gzip(config, (err, buff) => {
                    if (err) {
                        console.log(err)
                        return reject(err)
                    }
                    redis.set(name + ' Config', buff.toString('base64'), (err, response) => {
                        if (err) {
                            console.log(err)
                            reject(err)
                        } else {
                            console.log(name + ' succeeded')
                            resolve()
                        }
                    })
                })
            })
        })

        stream.on('error', (err) => {
            console.log(err)
            reject(err)
        })
    })
}
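
And this is roughly how I call it (the module path, config string, query and Redis key below are just placeholders for my own setup):

const { persistStream } = require('./persistStream')

// Hypothetical invocation – arguments are placeholders.
persistStream(
    JSON.stringify({ columns: ['date', 'clicks'] }), // config is gzipped as a string
    'SELECT * FROM analytics.events',
    'Some Dashboard Data',
    record => record                                 // optional per-record transform
)
    .then(() => console.log('done'))
    .catch(err => console.error(err))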

Upvotes: 1

Žilvinas Jocius

Reputation: 161

Let the Node.js environment manage memory for you by using the backpressure that streams provide.

I would consider this solution:

inputStream
    .pipe(zlib.createGzip())
    .pipe(transformToBase64Stream)
    .pipe(redisCli);

zlib is native, so it should not cause any problems. To convert to base64 you can write a transform stream or use an external library. To pipe the results into Redis as a stream, you could spawn a redis-cli child process in pipe mode. As the Redis mass insertion and redis-cli articles mention, that is the suggested approach for big data, but you have to handle the Redis protocol yourself. Read those articles and let me know if they help solve your problem.
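
A base64 transform stream along these lines should work (only a sketch; the class name is mine, and input is buffered to multiples of 3 bytes so the concatenated output stays valid base64):

const { Transform } = require('stream')

// Sketch of a base64-encoding Transform stream. Each pushed piece encodes a
// multiple of 3 bytes, so only the final piece ever carries padding.
class Base64Encode extends Transform {
    constructor(options) {
        super(options)
        this.remainder = Buffer.alloc(0)
    }

    _transform(chunk, encoding, callback) {
        const data = Buffer.concat([this.remainder, chunk])
        const usable = data.length - (data.length % 3)
        this.remainder = data.slice(usable)
        if (usable > 0) this.push(data.slice(0, usable).toString('base64'))
        callback()
    }

    _flush(callback) {
        if (this.remainder.length > 0) this.push(this.remainder.toString('base64'))
        callback()
    }
}

// It would slot in as transformToBase64Stream above:
// inputStream.pipe(zlib.createGzip()).pipe(new Base64Encode()).pipe(redisCli)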

Upvotes: 1
