Trang D

Reputation: 327

How to bulk insert data into a PostgreSQL db from a CSV file?

I have to insert more than 100 records from a CSV file into a PostgreSQL db. I tried the code below: it reads the data from the file but fails to insert it into the PostgreSQL table. Is there another way to do this, such as csvtojson?

const csv = require('csv');
var csvParser = require('csv-parse');

Controller.uploadCsv = async(data) => {
    fs.createReadStream(data.path)
        .pipe(csvParser({
            delimiter: '\t', 
            endLine: '\n', 
            escapeChar: '"', 
            enclosedChar: '"'
        }))
        .on('data', function(data) {
             console.log(data)// returning in console mentioned below
             console.log(data.name) // is undefined 

             const add = {
                name: data.name,
                address: data.address,
                phoneNo: data.phoneNumber,
                email: data.email,
                created_at: new Date(),
                updated_at: new Date()
            };
            const result = await models.table.create(add);
        })
        .on('end', function(data) {
             console.log('reading finished')
        })
}

router.js

router.post('/file', upload.single('file'),(req, res, next) => {
    Controller.uploadCsv(req.file)
        .then((result) => res.json(result))
        .catch(next)
})

console data

    [ 'name',
      'address',
      'phoneNumber',
      'email',
      'created_at',
      'updated_at']
    [ 'aaa',
      'delhi',
      '1102558888',
      '[email protected]',
      '2017-10-08T06:17:09.922Z',
      '2018-10-08T06:17:09.922Z' ]
    [ 'Oreo',
      'bgl',
      '1112589633',
      '[email protected]',
      '2017-10-08T06:17:09.922Z',
      '2018-10-08T06:17:09.922Z' ]

Upvotes: 1

Views: 4612

Answers (2)

Lucas Ponzo

Reputation: 104

Add the async keyword to the 'data' handler function. Keep in mind that execution is not sequential, so the records may be inserted in a completely different order from one run of the program to the next.

Replace:

.on('data', function(data) {

With:

.on('data', async function(data) {
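
To see why the order can vary, here is a minimal sketch that uses only Node's built-in stream module. `fakeInsert` and the `delay` values are hypothetical stand-ins for `models.table.create` and its varying latency. The stream does not wait for an async handler, so the inserts complete in whatever order they happen to finish:

```javascript
const { Readable } = require('stream');

// Hypothetical stand-in for models.table.create: resolves after `ms` milliseconds.
const fakeInsert = (value, ms) =>
    new Promise(resolve => setTimeout(() => resolve(value), ms));

// Returns a promise for the order in which the simulated inserts completed.
function completionOrder(rows) {
    return new Promise(resolve => {
        const completed = [];
        Readable.from(rows).on('data', async function (row) {
            // The stream does not wait here; it keeps emitting 'data' events.
            await fakeInsert(row.name, row.delay);
            completed.push(row.name);
            if (completed.length === rows.length) resolve(completed);
        });
    });
}

completionOrder([
    { name: 'aaa', delay: 60 },
    { name: 'Oreo', delay: 20 }
]).then(order => console.log(order)); // the slower first row finishes last
```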

Upvotes: 1

Michał Karpacki

Reputation: 2658

TL;DR: Your code has a minor error that may be causing your problem: you use await, and for that to run you need to put async before the function in the data handler. That may work for small files, but please read on, because it's not the right solution. I added one of the proper ways below.

async/await is a language construct (added in ES2017, not ES6) that lets you await the resolution of a Promise and then continue executing the code in an async function. Your code does have an async function declaration, but you added await in a non-async function. To clarify: the await keyword is only allowed when the closest enclosing function is async, and in your case it's not.
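
A small check of that rule, as a sketch: `new Function` only parses its argument, so it can show whether a handler is even syntactically valid. The `save` name inside the strings is hypothetical and is never called.

```javascript
// Returns true if `source` parses as a function body, false on a SyntaxError.
function parses(source) {
    try {
        new Function(source); // only parses the code; nothing is executed
        return true;
    } catch (err) {
        if (err instanceof SyntaxError) return false;
        throw err;
    }
}

// Same handler twice: once as a plain function, once as an async function.
const plainHandler = 'return function (row) { const r = await save(row); return r; };';
const asyncHandler = 'return async function (row) { const r = await save(row); return r; };';

console.log(parses(plainHandler)); // false
console.log(parses(asyncHandler)); // true
```

The outer async on `Controller.uploadCsv` does not help here, because what counts is the function that directly contains the await.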

I actually don't think your code would even compile, and after some changes you'd fall straight into the problem mentioned in this question. This is because you're trying to run an asynchronous operation in a synchronous event handler in Node. The asynchronous insert into the database will run, but the end event will fire before the operations are completed.
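
The following sketch records the event order to illustrate this. It uses an in-memory stream instead of a file, and `fakeInsert` is a hypothetical stand-in for the database call (assumed here to take 20 ms):

```javascript
const { Readable } = require('stream');

// Hypothetical stand-in for the database insert; resolves after 20 ms.
const fakeInsert = row =>
    new Promise(resolve => setTimeout(() => resolve(row), 20));

// Records the order of events when an async handler is attached to 'data'.
// Note: this demo expects at least one row, otherwise it never resolves.
function traceEvents(rows) {
    return new Promise(resolve => {
        const events = [];
        let pending = rows.length;
        Readable.from(rows)
            .on('data', async row => {
                await fakeInsert(row);
                events.push('inserted ' + row);
                if (--pending === 0) resolve(events);
            })
            .on('end', () => events.push('end'));
    });
}

// 'end' is recorded before any 'inserted ...' entry.
traceEvents(['aaa', 'Oreo']).then(events => console.log(events));
```

So a "reading finished" log in the end handler says nothing about whether the rows have reached the table yet.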

In order to do this correctly, you could use a transform stream, or abandon streaming altogether and simply use an array from the CSV (there are more than enough good modules for that). I am, however, the author of the scramjet framework, and I also think this should simply work as you wrote it, or maybe even simpler.

Here's code that will do what you want:
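
As a rough sketch of the "abandon streaming" option: read the whole file into rows first, then await each insert. `parseTsv` below is a hypothetical helper that only handles simple tab-separated text without quoted fields (a real CSV module would cover those), and `insertRow` is a placeholder for something like `row => models.table.create(row)`. Keying each row by the header line is also what makes `row.name` defined, unlike the plain arrays shown in the question's console output.

```javascript
// Parses simple tab-separated text into objects keyed by the header row.
// Assumes no quoted fields or embedded tabs/newlines.
function parseTsv(text) {
    const lines = text.split(/\r?\n/).filter(line => line.length > 0);
    if (lines.length === 0) return [];
    const header = lines[0].split('\t');
    return lines.slice(1).map(line => {
        const cells = line.split('\t');
        const row = {};
        header.forEach((name, i) => { row[name] = cells[i]; });
        return row;
    });
}

// Inserts rows one at a time and resolves only after the last insert.
// `insertRow` is a hypothetical async function supplied by the caller.
async function insertAll(rows, insertRow) {
    const results = [];
    for (const row of rows) {
        results.push(await insertRow(row));
    }
    return results;
}

const rows = parseTsv('name\taddress\naaa\tdelhi\nOreo\tbgl\n');
insertAll(rows, async row => row.name).then(names => console.log(names));
```

For about a hundred records, holding everything in memory is unlikely to matter; streaming pays off for much larger files.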

const fs = require('fs');
const {StringStream} = require('scramjet');

Controller.uploadCsv = async(data) => 
    fs.createReadStream(data.path)
        .pipe(new StringStream('utf-8'))
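        .CSVParse({
            delimiter: '\t', 
            newline: '\n', 
            escapeChar: '"', 
            quoteChar: '"',
            // assumption: options are passed to papaparse, where header: true
            // turns each row into an object keyed by the header line
            header: true
        })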
        .map(data => ({
            name: data.name,
            address: data.address,
            phoneNo: data.phoneNumber,
            email: data.email,
            created_at: new Date(),
            updated_at: new Date()
        }))
        .each(async entry => await models.table.create(entry))
        .each(result => console.log(result)) // if it's worth logging
        .run();

Scramjet simply uses streams underneath (all of its classes extend built-in node.js streams), but exposes an interface similar to the synchronous ones on Array etc. You can run your async operations, and the run operation returns a Promise.
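
For comparison, the same idea (async work per row plus one Promise for completion) can be sketched with built-in streams only. This is not how scramjet is implemented and it uses a Writable sink instead of a transform stream; `insertRow` is a hypothetical stand-in for `models.table.create`.

```javascript
const { Readable, Writable, pipeline } = require('stream');
const { promisify } = require('util');

const pipelineAsync = promisify(pipeline);

// Pipes `rows` into a sink that waits for `insertRow` on every row.
async function insertFromStream(rows, insertRow) {
    let count = 0;
    const sink = new Writable({
        objectMode: true,
        write(row, _encoding, callback) {
            // Calling back only after the insert settles makes the stream wait.
            insertRow(row).then(() => { count += 1; callback(); }, callback);
        }
    });
    await pipelineAsync(Readable.from(rows), sink);
    return count; // reached only after every insert has completed
}

insertFromStream(['aaa', 'Oreo'], async row => row)
    .then(n => console.log(n + ' rows inserted'));
```

Because the sink calls back only after each insert, rows are written one after another, and a failed insert rejects the returned Promise.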

Upvotes: 1
