Reputation: 6202
Here is my existing code
const QueryStream = require('pg-query-stream')
const JSONStream = require('JSONStream')
/**
 * Builds the lower-cased text that is handed to the tagger for one feed item.
 *
 * The description part is `content` when it is non-empty, otherwise
 * `summary` when that is non-empty, otherwise the empty string.
 *
 * @param {?string} title - Item title; a missing title is treated as ''.
 * @param {?string} content - Preferred source of the description.
 * @param {?string} summary - Fallback source of the description.
 * @returns {string} Lower-cased title, a single space, lower-cased description.
 */
function prepareText(title, content, summary) {
  // A null/undefined title would otherwise throw a TypeError on .toLowerCase().
  const heading = title || ''
  let description = ''
  if (content && content.length) {
    description = content
  } else if (summary && summary.length) {
    description = summary
  }
  return heading.toLowerCase() + ' ' + description.toLowerCase()
}
/**
 * Streams every row of feed_items, tags its text and writes the tags back
 * with one UPDATE per row.
 *
 * Errors for a single row are logged and the stream continues; an error from
 * the stream itself is logged and the function resolves without a value.
 *
 * @param {object} deps
 * @param {object} deps.db - Database object providing `stream` and `query`.
 * @param {object} deps.logger - Logger providing `info` and `error`.
 * @param {object} deps.tagger - Object whose `tag(text)` returns the tags.
 * @returns {Promise<void>}
 */
async function tagAll({ db, logger, tagger }) {
  // you can also use pgp.as.format(query, values, options)
  // to format queries properly, via pg-promise;
  const qs = new QueryStream(
    'SELECT feed_item_id,title,summary,content FROM feed_items ORDER BY pubdate DESC, feed_item_id DESC'
  )
  try {
    // NOTE: the rows used to be piped into JSONStream.stringify() as well, but
    // nothing consumed that stream, so every row was serialised for no reason.
    const result = await db.stream(qs, (s) => {
      s.on('data', async (item) => {
        // Hold the stream while this row's UPDATE is in flight so rows are
        // handled one at a time.
        s.pause()
        try {
          // eslint-disable-next-line camelcase
          const { feed_item_id, title, summary, content } = item
          // Process text to be tagged. Argument order must match
          // prepareText(title, content, summary) so content wins over summary.
          const text = prepareText(title, content, summary)
          const tags = tagger.tag(text)
          // Update tags per post
          await db.query(
            'UPDATE feed_items SET tags=$1 WHERE feed_item_id=$2',
            // eslint-disable-next-line camelcase
            [tags, feed_item_id]
          )
        } catch (error) {
          logger.error(error)
        } finally {
          s.resume()
        }
      })
    })
    logger.info(
      'Total rows processed:',
      result.processed,
      'Duration in milliseconds:',
      result.duration
    )
  } catch (error) {
    logger.error(error)
  }
}
module.exports = tagAll
Upvotes: 1
Views: 2287
Reputation: 6202
This is the best I could come up with to batch the queries inside the stream, so that we don't need to load all the data into memory or run too many queries. If anyone knows a better way to batch, especially with t.sequence, feel free to add another answer.
// Number of tagged rows to accumulate before they are written back in one batched UPDATE.
const BATCH_SIZE = 5000
/**
 * Writes the tags of many feed items back in a single multi-row UPDATE.
 *
 * Despite its name this function updates existing rows; it does not insert.
 * Failures are logged through `logger` and not re-thrown.
 *
 * @param {object} args
 * @param {object} args.db - Database object providing `none`.
 * @param {object} args.pgp - pg-promise root object (for `helpers`).
 * @param {object} args.logger - Logger providing `error`.
 * @param {Array<{feed_item_id: string, tags: string[]}>} args.data - Rows to write.
 * @returns {Promise<void>}
 */
async function batchInsert({ db, pgp, logger, data }) {
  // helpers.update cannot build a statement from an empty array; without this
  // guard an empty final batch would be reported as an error.
  if (!data || data.length === 0) {
    return
  }
  try {
    // https://vitaly-t.github.io/pg-promise/helpers.ColumnSet.html
    const cs = new pgp.helpers.ColumnSet(
      [
        // cnd: the key is only used in the WHERE condition, not in SET
        { name: 'feed_item_id', cast: 'uuid', cnd: true },
        { name: 'tags', cast: 'varchar(64)[]' },
      ],
      {
        table: 'feed_items',
      }
    )
    // In the generated statement `t` aliases the table and `v` the VALUES list.
    const query =
      pgp.helpers.update(data, cs) + ' WHERE v.feed_item_id=t.feed_item_id'
    await db.none(query)
  } catch (error) {
    logger.error(error)
  }
}
/**
 * Streams every row of feed_items, tags its text and writes the tags back in
 * batches of BATCH_SIZE rows via batchInsert.
 *
 * Errors for a single row or batch are logged and the stream continues; an
 * error from the stream itself is logged and the function resolves to
 * undefined.
 *
 * @param {object} deps
 * @param {object} deps.db - Database object providing `stream`.
 * @param {object} deps.pgp - pg-promise root object, forwarded to batchInsert.
 * @param {object} deps.logger - Logger providing `error`.
 * @param {object} deps.tagger - Object whose `tag(text)` returns the tags.
 * @returns {Promise<object|undefined>} The value `db.stream` resolved with.
 */
async function tagAll({ db, pgp, logger, tagger }) {
  // you can also use pgp.as.format(query, values, options)
  // to format queries properly, via pg-promise;
  const qs = new QueryStream(
    'SELECT feed_item_id,title,summary,content FROM feed_items ORDER BY pubdate DESC, feed_item_id DESC'
  )
  try {
    // Rows that are tagged but not yet written back.
    const queryValues = []
    // NOTE: the rows used to be piped into JSONStream.stringify() as well, but
    // nothing consumed that stream, so every row was serialised for no reason.
    const result = await db.stream(qs, (s) => {
      s.on('data', async (item) => {
        // Hold the stream while a batch may be in flight so the queue cannot
        // grow beyond BATCH_SIZE.
        s.pause()
        try {
          // eslint-disable-next-line camelcase
          const { feed_item_id, title, summary, content } = item
          // Process text to be tagged. Argument order must match
          // prepareText(title, content, summary) so content wins over summary.
          const text = prepareText(title, content, summary)
          const tags = tagger.tag(text)
          queryValues.push({ feed_item_id, tags })
          if (queryValues.length >= BATCH_SIZE) {
            // splice empties the queue and hands its contents to the batch
            const data = queryValues.splice(0, queryValues.length)
            await batchInsert({ db, pgp, logger, data })
          }
        } catch (error) {
          logger.error(error)
        } finally {
          s.resume()
        }
      })
    })
    // Flush the remainder; skip when the last full batch left nothing behind.
    if (queryValues.length > 0) {
      await batchInsert({ db, pgp, logger, data: queryValues })
    }
    return result
  } catch (error) {
    logger.error(error)
  }
}
Upvotes: 0
Reputation: 962
If you can do everything with one SQL statement, you should! Here you're paying the price of a round trip between Node and your DB for each row of your table, which will take up most of the time of your query.
Your request can be implemented in pure sql:
-- Single-statement version of the text preparation: lower-cased title, a
-- space, then content if present, else summary, else nothing.
update feed_items set tags=case
-- `(x = '') is false` holds only when x is neither NULL nor the empty string
when (content = '') is false then lower(title) || ' ' || lower(content)
when (summary = '') is false then lower(title) || ' ' || lower(summary)
-- fallback kept consistent with the branches above: lower-cased title plus separator
else lower(title) || ' ' end;
This request will update your whole table at once. I'm sure it would be some orders of magnitude faster than your method. On my machine, with a table containing 100000 rows, the update takes about 600ms.
Some remarks:
- Was the `limit` part there because it is too slow? If that is the case, you can drop it; 50000 rows is not a big table for Postgres.
- The `pg-stream` thing does not really stream data out of the DB; it only lets you use a stream-like API over the results it gathered earlier. No problem with that, but I thought there might be a misconception here.
Upvotes: 1