PirateApp

Reputation: 6202

How do I run an update while streaming using pg-query-stream and pg-promise?

Here is my existing code:

const QueryStream = require('pg-query-stream')
const JSONStream = require('JSONStream')

function prepareText(title, content, summary) {
  let description
  if (content && content.length) {
    description = content
  } else if (summary && summary.length) {
    description = summary
  } else {
    description = ''
  }
  return title.toLowerCase() + ' ' + description.toLowerCase()
}

async function tagAll({ db, logger, tagger }) {
  // you can also use pgp.as.format(query, values, options)
  // to format queries properly, via pg-promise;
  const qs = new QueryStream(
    'SELECT feed_item_id,title,summary,content FROM feed_items ORDER BY pubdate DESC, feed_item_id DESC'
  )
  try {
    const result = await db.stream(qs, (s) => {
      // initiate streaming into the console:
      s.pipe(JSONStream.stringify())
      s.on('data', async (item) => {
        try {
          s.pause()
          // eslint-disable-next-line camelcase
          const { feed_item_id, title, summary, content } = item

          // Process text to be tagged
          const text = prepareText(title, content, summary)
          const tags = tagger.tag(text)

          // Update tags per post
          await db.query(
            'UPDATE feed_items SET tags=$1 WHERE feed_item_id=$2',
            // eslint-disable-next-line camelcase
            [tags, feed_item_id]
          )
        } catch (error) {
          logger.error(error)
        } finally {
          s.resume()
        }
      })
    })
    logger.info(
      'Total rows processed:',
      result.processed,
      'Duration in milliseconds:',
      result.duration
    )
  } catch (error) {
    logger.error(error)
  }
}

module.exports = tagAll

Upvotes: 1

Views: 2287

Answers (2)

PirateApp

Reputation: 6202

This is the best I could come up with to batch the queries inside the stream, so that we don't need to load all the data in memory or run too many queries. If anyone knows a better way to batch, especially with t.sequence, feel free to add another answer (a rough sketch of such an approach follows the code below).

const QueryStream = require('pg-query-stream')
const JSONStream = require('JSONStream')

// prepareText is the same helper defined in the question
const BATCH_SIZE = 5000
async function batchInsert({ db, pgp, logger, data }) {
  try {
    // https://vitaly-t.github.io/pg-promise/helpers.ColumnSet.html
    const cs = new pgp.helpers.ColumnSet(
      [
        { name: 'feed_item_id', cast: 'uuid' },
        { name: 'tags', cast: 'varchar(64)[]' },
      ],
      {
        table: 'feed_items',
      }
    )
    const query =
      pgp.helpers.update(data, cs) + ' WHERE v.feed_item_id=t.feed_item_id'
    await db.none(query)
  } catch (error) {
    logger.error(error)
  }
}

async function tagAll({ db, pgp, logger, tagger }) {
  // you can also use pgp.as.format(query, values, options)
  // to format queries properly, via pg-promise;
  const qs = new QueryStream(
    'SELECT feed_item_id,title,summary,content FROM feed_items ORDER BY pubdate DESC, feed_item_id DESC'
  )
  try {
    const queryValues = []
    const result = await db.stream(qs, (s) => {
      // initiate streaming into the console:
      s.pipe(JSONStream.stringify())
      s.on('data', async (item) => {
        try {
          s.pause()
          // eslint-disable-next-line camelcase
          const { feed_item_id, title, summary, content } = item

          // Process text to be tagged
          const text = prepareText(title, content, summary)
          const tags = tagger.tag(text)
          queryValues.push({ feed_item_id, tags })

          if (queryValues.length >= BATCH_SIZE) {
            const data = queryValues.splice(0, queryValues.length)
            await batchInsert({ db, pgp, logger, data })
          }
        } catch (error) {
          logger.error(error)
        } finally {
          s.resume()
        }
      })
    })
    await batchInsert({ db, pgp, logger, data: queryValues })
    return result
  } catch (error) {
    logger.error(error)
  }
}
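
For reference, here is a rough sketch of what the t.sequence alternative mentioned above might look like, loosely following the paged pattern from the pg-promise data-import documentation. The PAGE_SIZE constant, the OFFSET-based paging query, and the reuse of prepareText from the question are illustrative assumptions, not tested code:

const PAGE_SIZE = 5000

// Paged tagging inside a single transaction, driven by t.sequence:
// each step fetches one page, tags it, and writes one batched UPDATE.
function tagAllSequenced({ db, pgp, logger, tagger }) {
  const cs = new pgp.helpers.ColumnSet(
    [
      { name: 'feed_item_id', cast: 'uuid' },
      { name: 'tags', cast: 'varchar(64)[]' },
    ],
    { table: 'feed_items' }
  )

  return db
    .tx('tag-all-feed-items', (t) =>
      t.sequence((index) =>
        t
          .any(
            'SELECT feed_item_id,title,summary,content FROM feed_items ORDER BY feed_item_id LIMIT $1 OFFSET $2',
            [PAGE_SIZE, index * PAGE_SIZE]
          )
          .then((rows) => {
            if (!rows.length) {
              // resolving with undefined ends the sequence
              return undefined
            }
            const data = rows.map((row) => ({
              // eslint-disable-next-line camelcase
              feed_item_id: row.feed_item_id,
              tags: tagger.tag(prepareText(row.title, row.content, row.summary)),
            }))
            const update =
              pgp.helpers.update(data, cs) +
              ' WHERE v.feed_item_id=t.feed_item_id'
            return t.none(update)
          })
      )
    )
    .catch((error) => logger.error(error))
}

Whether OFFSET paging or keyset paging (WHERE feed_item_id > the last id seen) is preferable depends on the table size; the point is only to show where t.sequence would slot in.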

Upvotes: 0

autra

Reputation: 962

If you can do everything with one SQL statement, you should! Here you're paying the price of a round trip between Node and your DB for every row of your table, and that is where most of your query's time goes.

Your update can be implemented in pure SQL:

update feed_items set tags=case
    when (content = '') is false then lower(title) || ' ' || lower(content)
    when (summary = '') is false then lower(title) || ' ' || lower(summary)
    else lower(title) end;

This statement will update your whole table at once. I'm sure it'd be an order of magnitude faster than your method. On my machine, with a table containing 100000 rows, the update takes about 600 ms.
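
If you go this route with pg-promise, a minimal sketch of issuing that single statement (reusing the db and logger from the question, and assuming the statement above fits your tags column type) could be:

async function tagAllInSql({ db, logger }) {
  try {
    // one round trip: the whole table is updated by a single statement
    await db.none(
      `update feed_items set tags=case
          when (content = '') is false then lower(title) || ' ' || lower(content)
          when (summary = '') is false then lower(title) || ' ' || lower(summary)
          else lower(title) end`
    )
  } catch (error) {
    logger.error(error)
  }
}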

Some remarks:

  • you don't need an ORDER BY to update. As ordering is quite slow, it's better not to.
  • I guess the LIMIT part was there because it was too slow? If that's the case, you can drop it; 50000 rows is not a big table for Postgres.
  • I bet this pg-query-stream thing does not really stream data out of the DB, it only lets you use a stream-like API over results it has already gathered... No problem with that, but I thought maybe there was a misconception here.

Upvotes: 1
