Reputation: 4675
Is there a way of doing a bulk upsert in Sequelize? Also, can I specify which keys to use for checking for duplicates?
I tried the following, but it didn't work:
// Attempted bulk upsert: updateOnDuplicate is passed a boolean here.
// As reported right after this snippet, this call always creates new rows.
Employee.bulkCreate(data, {
updateOnDuplicate: true
});
Bulk creation works fine though. Above statement always creates new entries in the DB.
Upvotes: 67
Views: 89190
Reputation: 1
Based on @François Dispaux's answer, I have improved the bulkUpsert function.
This should work with Sequelize and Postgres.
Note: I have taken some lines of code from the source of the Sequelize library.
// Version: 6.17.0
// yarn add sequelize@6.17.0
//
const _ = require('lodash');
const { Sequelize, Model, Utils, QueryTypes, QueryError } = require('sequelize');
// --------------------------------------------------------------
// --------------------------------------------------------------
// Transpiler-style helpers that emulate object spread (`{ ...a, ...b }`).
const __defProp = Object.defineProperty;
const __defProps = Object.defineProperties;
const __getOwnPropDescs = Object.getOwnPropertyDescriptors;
const __getOwnPropSymbols = Object.getOwnPropertySymbols;
const __hasOwnProp = Object.prototype.hasOwnProperty;
const __propIsEnum = Object.prototype.propertyIsEnumerable;

// Sets `key` on `obj`: redefines it as a plain data property when the key is
// already visible on the object (own or inherited), otherwise assigns it.
const __defNormalProp = (obj, key, value) => {
  if (key in obj) {
    return __defProp(obj, key, { enumerable: true, configurable: true, writable: true, value });
  }
  return (obj[key] = value);
};

// Copies the own enumerable string keys and own enumerable symbol keys of `b`
// onto `a`, then returns `a`. A falsy `b` is treated as an empty object.
const __spreadValues = (a, b) => {
  const source = b || {};
  for (const name in source) {
    if (__hasOwnProp.call(source, name)) {
      __defNormalProp(a, name, source[name]);
    }
  }
  if (__getOwnPropSymbols) {
    for (const symbolKey of __getOwnPropSymbols(source)) {
      if (__propIsEnum.call(source, symbolKey)) {
        __defNormalProp(a, symbolKey, source[symbolKey]);
      }
    }
  }
  return a;
};

// Defines every own property descriptor of `b` on `a` and returns `a`.
const __spreadProps = (a, b) => {
  const descriptors = __getOwnPropDescs(b);
  return __defProps(a, descriptors);
};
// --------------------------------------------------------------
// --------------------------------------------------------------
/**
 * Builds the SQL SELECT statement that `model.findAll(options)` would run,
 * without executing it. The steps mirror the option preparation done by the
 * `findAll` function of the Model class, including the `beforeFind*` hooks.
 *
 * @param {Model} model Sequelize model to query.
 * @param {Object} [options] Same shape as the options of `findAll`.
 * @param {Object} [settings] Extra settings for this helper.
 * @param {Boolean} [settings.removeSemicolon=false] Strip the trailing
 *   semicolon from the query. Useful when the query is combined into a
 *   larger one, e.g. with UNION ALL.
 * @returns {Promise<String>} SQL SELECT query
 * @throws {QueryError} When `options` is not a plain object, or
 *   `options.attributes` is neither an array nor a plain object.
 */
async function buildFindAllSQL(model, options, { removeSemicolon = false } = {}) {
  if (options !== void 0 && !_.isPlainObject(options)) {
    throw new QueryError("The argument passed to findAll must be an options object, use findByPk if you wish to pass a single primary key value");
  }
  if (options !== void 0 && options.attributes) {
    if (!Array.isArray(options.attributes) && !_.isPlainObject(options.attributes)) {
      throw new QueryError("The attributes option must be an array of column names or an object");
    }
  }
  model.warnOnInvalidOptions(options, Object.keys(model.rawAttributes));
  const tableNames = {};
  tableNames[model.getTableName(options)] = true;
  // Work on a copy so the caller's options object is never mutated.
  options = Utils.cloneDeep(options);
  _.defaults(options, { hooks: true });
  options.rejectOnEmpty = Object.prototype.hasOwnProperty.call(options, "rejectOnEmpty") ? options.rejectOnEmpty : model.options.rejectOnEmpty;
  model._injectScope(options);
  if (options.hooks) {
    await model.runHooks("beforeFind", options);
  }
  model._conformIncludes(options, model);
  model._expandAttributes(options);
  model._expandIncludeAll(options);
  if (options.hooks) {
    await model.runHooks("beforeFindAfterExpandIncludeAll", options);
  }
  options.originalAttributes = model._injectDependentVirtualAttributes(options.attributes);
  if (options.include) {
    options.hasJoin = true;
    model._validateIncludedElements(options, tableNames);
    if (options.attributes && !options.raw && model.primaryKeyAttribute && !options.attributes.includes(model.primaryKeyAttribute) && (!options.group || !options.hasSingleAssociation || options.hasMultiAssociation)) {
      options.attributes = [model.primaryKeyAttribute].concat(options.attributes);
    }
  }
  if (!options.attributes) {
    options.attributes = Object.keys(model.rawAttributes);
    options.originalAttributes = model._injectDependentVirtualAttributes(options.attributes);
  }
  model.options.whereCollection = options.where || null;
  Utils.mapFinderOptions(options, model);
  options = model._paranoidClause(model, options);
  if (options.hooks) {
    await model.runHooks("beforeFindAfterOptions", options);
  }
  const selectOptions = { ...options, tableNames: Object.keys(tableNames) };
  // findAll delegates to queryInterface.select, which builds the statement
  // through the query generator; call the generator directly to get the SQL
  // string instead of running it.
  const sql = model.sequelize.queryInterface.queryGenerator.selectQuery(
    model.getTableName(selectOptions),
    { ...selectOptions, type: QueryTypes.SELECT, model },
    model,
  );
  if (removeSemicolon && sql.endsWith(';')) {
    return sql.slice(0, -1);
  }
  return sql;
}
/**
 * Turns a list of data objects into escaped SQL value tuples, one
 * "(v1,v2,...)" string per item, with values in the order given by `fields`.
 *
 * @param {Array<Object>} items List of data objects to convert. The input is deep-cloned first.
 * @param {Model} model Sequelize model
 * @param {Array<String>} fields List of column names
 * @returns {Array<String>} One parenthesised, comma-separated tuple per item
 */
function mapValues(items, { model, fields }) {
  const rows = _.cloneDeep(items);

  // Look-up from column name (the attribute's `field`, or its own name) to
  // the attribute definition, handed to the escaper alongside each value.
  const attributeByColumn = {};
  for (const attrName in model.tableAttributes) {
    const definition = model.rawAttributes[attrName];
    attributeByColumn[definition.field || attrName] = definition;
  }

  return rows.map((row) => {
    const columnValues = Utils.mapValueFieldNames(row, fields, model);
    // Drop any virtual attribute that ended up in the mapped values.
    for (const virtualName of model._virtualAttributes) {
      delete columnValues[virtualName];
    }
    const escaped = fields.map((column) =>
      model.sequelize.queryInterface.queryGenerator.escape(
        columnValues[column],
        attributeByColumn[column],
        { context: 'INSERT' },
      ));
    return `(${escaped.join(',')})`;
  });
}
/**
 * Builds a PostgreSQL `INSERT ... ON CONFLICT` (bulk upsert) statement.
 * Column names are taken from the keys of the first item.
 *
 * @param {Array<Object>} items List of data objects to insert / update
 * @param {Object} settings
 * @param {Model} settings.model Sequelize model; its `tableName` is the target table
 * @param {Array<String>} [settings.conflictKeys] Columns of the ON CONFLICT target.
 *   When empty, a plain INSERT is produced.
 * @param {Array<String>} [settings.excludeFromUpdate] Columns that must not be
 *   overwritten on conflict (conflict keys are always excluded).
 * @param {Array<String>} [settings.conflictWhere] Raw SQL conditions for the
 *   `DO UPDATE ... WHERE` clause; they are combined with AND.
 * @param {Boolean} [settings.returning=false] Append a RETURNING clause with all inserted columns
 * @param {Function|Boolean} [settings.logging=false] When a function, it is called with the generated query
 * @returns {Promise<String|null>} SQL INSERT query, or null when `items` is empty
 */
async function buildBulkUpsertSQL(items = [], {
  model,
  conflictKeys = [],
  excludeFromUpdate = [],
  conflictWhere = [],
  returning = false,
  logging = false,
}) {
  if (!items.length) {
    return null;
  }
  const { tableName } = model;
  const fields = Object.keys(items[0]);
  const quote = (identifier) => `"${identifier}"`;
  const columnList = fields.map(quote).join(',');

  // Quote both sides of the assignment: an unquoted EXCLUDED.<name> is folded
  // to lower case by Postgres and breaks for mixed-case column names.
  const skipOnUpdate = new Set([...excludeFromUpdate, ...conflictKeys]);
  const updateFields = fields
    .filter((field) => !skipOnUpdate.has(field))
    .map((field) => `${quote(field)}=EXCLUDED.${quote(field)}`)
    .join(', ');

  const values = mapValues(items, { model, fields }).join(',');

  let query = `INSERT INTO "${tableName}" (${columnList}) VALUES ${values}`;
  if (conflictKeys.length > 0) {
    const conflictTarget = `ON CONFLICT (${conflictKeys.map(quote).join(',')})`;
    if (updateFields) {
      query += ` ${conflictTarget} DO UPDATE SET ${updateFields}`;
      if (conflictWhere.length > 0) {
        // Conditions are boolean expressions, so they are AND-ed, not comma-joined.
        query += ` WHERE ${conflictWhere.join(' AND ')}`;
      }
    } else {
      // Nothing left to update: an empty SET list would be a syntax error.
      query += ` ${conflictTarget} DO NOTHING`;
    }
  }
  if (returning === true) {
    query += ` RETURNING ${columnList}`;
  }
  query += ';';
  if (typeof logging === 'function') {
    logging('---------------------------------------');
    logging(query);
    logging('---------------------------------------');
  }
  return query;
}
/**
 * Builds a bulk upsert statement with buildBulkUpsertSQL and runs it through
 * `model.sequelize.query`.
 *
 * @param {Array<Object>} items List of data objects to insert / update
 * @param {Object} settings
 * @param {Model} settings.model Sequelize model
 * @param {Array<String>} [settings.conflictKeys] Columns of the ON CONFLICT target
 * @param {Array<String>} [settings.excludeFromUpdate] Columns that must not be overwritten on conflict
 * @param {Array<String>} [settings.conflictWhere] Raw SQL conditions for the update's WHERE clause
 * @param {Object|null} [settings.transaction=null] Transaction to run the query in
 * @param {Function|Boolean} [settings.logging=false] When a function, receives the generated query
 * @returns {Promise<Array>} Result of sequelize.query, or `[0, 0]` when there is nothing to write
 */
async function bulkUpsert(items = [], {
  model,
  conflictKeys = [],
  excludeFromUpdate = [],
  conflictWhere = [],
  transaction = null,
  logging = false,
}) {
  if (!items.length) {
    return [0, 0];
  }
  const query = await buildBulkUpsertSQL(items, { model, conflictKeys, excludeFromUpdate, conflictWhere, logging });
  if (!query) {
    return [0, 0];
  }
  const { sequelize } = model;
  const options = {
    type: sequelize.QueryTypes.INSERT,
  };
  if (transaction) {
    // Must be the literal `transaction` key; using the transaction object as
    // a computed key would hide it from sequelize.query.
    options.transaction = transaction;
  }
  return sequelize.query(query, options);
}
// --------------------------------------------------------------
// Public API: the two SQL builders, the upsert runner and the value-tuple helper.
module.exports = {
buildFindAllSQL,
buildBulkUpsertSQL,
bulkUpsert,
mapValues,
};
Upvotes: 0
Reputation: 1531
For anyone using Sequelize with sequelize-typescript: if updateOnDuplicate is not working for a unique column, try moving your unique column up when declaring your model:
Before:
// "Before" layout: the @Unique `hash` column is declared last, after the
// primary key and every other column.
@Table({ modelName: 'transaction' })
export class TransactionModel extends Model {
// Auto-incrementing integer primary key.
@Column({
type: DataType.INTEGER,
allowNull: false,
autoIncrement: true,
unique: true,
primaryKey: true,
})
override id: number;
@Column(DataType.DATE)
override createdAt: string;
@Column(DataType.STRING)
contractAddress: string;
@Column(DataType.INTEGER)
cumulativeGasUsed: number;
@Column(DataType.STRING)
from: string;
@Column(DataType.INTEGER)
gasUsed: number;
@Column(DataType.INTEGER)
effectiveGasPrice: number;
@Column(DataType.TEXT)
logsBloom: string;
@Column(DataType.BOOLEAN)
status: boolean;
@Column(DataType.STRING)
to: string;
@Unique
@Column(DataType.STRING)
hash: string; // <---------------------- Required Unique column
}
After:
// "After" layout: same columns as the "Before" class, but the @Unique `hash`
// column is declared first, ahead of the primary key.
@Table({ modelName: 'transaction' })
export class TransactionModel extends Model {
@Unique
@Column(DataType.STRING)
hash: string; // <---------------------- Required Unique column
// Auto-incrementing integer primary key.
@Column({
type: DataType.INTEGER,
allowNull: false,
autoIncrement: true,
unique: true,
primaryKey: true,
})
override id: number;
@Column(DataType.DATE)
override createdAt: string;
@Column(DataType.STRING)
contractAddress: string;
@Column(DataType.INTEGER)
cumulativeGasUsed: number;
@Column(DataType.STRING)
from: string;
@Column(DataType.INTEGER)
gasUsed: number;
@Column(DataType.INTEGER)
effectiveGasPrice: number;
@Column(DataType.TEXT)
logsBloom: string;
@Column(DataType.BOOLEAN)
status: boolean;
@Column(DataType.STRING)
to: string;
}
Upvotes: 0
Reputation: 53
For those using queryInterface.bulkInsert
, you will need to add an upsertKeys
array in the options.
Example:
// Upsert keyed on "id": on a duplicate, only "name" is listed for update.
// NOTE(review): the accompanying text refers to queryInterface.bulkInsert,
// while this example calls Model.bulkCreate — confirm which API honours upsertKeys.
Employee.bulkCreate(dataArray,
{
upsertKeys:["id"] ,
updateOnDuplicate: ["name"]
} )
Upvotes: 1
Reputation: 8571
Update: Sequelize 6.x added UPSERT support for all dialects, so @followtest52's answer is now valid for PostgreSQL too.
Original answer: Since PostgreSQL was not supported by that answer, the "best" alternative using Sequelize was to run a manual query with the ON CONFLICT statement. Example (TypeScript):
// Each inner array is one row, in column order (id, name, color, flavor).
const values: Array<Array<number | string>> = [
[1, 'Apple', 'Red', 'Yummy'],
[2, 'Kiwi', 'Green', 'Yuck'],
]
// One "(?)" placeholder group is emitted per row; the rows themselves are
// passed separately as `values` rather than concatenated into the SQL text.
const query = 'INSERT INTO fruits (id, name, color, flavor) VALUES ' +
values.map(_ => { return '(?)' }).join(',') +
' ON CONFLICT (id) DO UPDATE SET flavor = excluded.flavor;'
sequelize.query({ query, values }, { type: sequelize.QueryTypes.INSERT })
This would build a query like:
INSERT INTO
fruits (id, name, color, flavor)
VALUES
(1, 'Apple', 'Red', 'Yummy'),
(2, 'Kiwi', 'Green', 'Yuck')
ON CONFLICT (id) DO UPDATE SET
flavor = excluded.flavor;
Suffice to say, this is not an ideal solution to have to manually build queries, since it defeats the purpose of using sequelize, but if it's one-off query that you don't desperately need, you could use this method.
Upvotes: 15
Reputation: 5791
A modified version. Which would do the job.
/**
 * Upserts many rows with a single SQL Server MERGE statement. Rows are
 * matched on the model's primary key attributes: matched rows are updated,
 * unmatched rows are inserted.
 *
 * @param {Array<Object>} data Raw JSON rows.
 * @param {*} model Sequelize model. Its `name` is used as the target table and
 *   its `primaryKeyAttributes` as the match condition.
 * @param {Array<String>} [fields] Columns that need to be inserted/updated. If
 *   none are passed, they are taken from the keys of the first row.
 * @returns `[0, 0]` when `data` is empty; otherwise the result of
 *   `sequelize.query`, which reports the action (INSERT/UPDATE) performed for
 *   each record via the OUTPUT clause.
 * @throws {Error} When no `sequelize` instance is available.
 */
export const bulkUpert = (data, model, fields = undefined) => {
  console.log("****Bulk insertion started****");
  if (!data.length) {
    return [0, 0];
  }
  const { name, primaryKeyAttributes } = model;
  console.log(name, primaryKeyAttributes, fields);
  // NOTE(review): `sequelize` is not defined in this snippet; it is assumed to
  // be an instance available in the enclosing module — confirm.
  if (!sequelize) {
    throw new Error(`Sequalize not initialized on ${name}`);
  }
  const extractFields = fields ? fields : Object.keys(data[0]);
  const createFields = extractFields.join(", ");

  // Build each tuple from the selected fields, in that order, so the VALUES
  // columns always line up with the column list even when a row has extra
  // keys or a different key order.
  const toSql = valueToSQL();
  const values = data
    .map((row) => `(${extractFields.map((field) => toSql(row[field])).join(",")})`)
    .join(", ");

  // The match columns are equal on both sides by definition, so leave them out
  // of the SET list; omit the whole clause when nothing else is left.
  const updatableFields = extractFields.filter(
    (field) => !primaryKeyAttributes.includes(field),
  );
  const whenMatched = updatableFields.length > 0
    ? `WHEN MATCHED THEN
UPDATE SET
${getUpdateFieldsString(updatableFields)}`
    : "";

  const query = `MERGE INTO
[${name}]
WITH(HOLDLOCK)
AS [targetTable]
USING (
VALUES ${values}
)
AS [sourceTable]
(
${createFields}
) ON
${getPrimaryQueryString(primaryKeyAttributes)}
${whenMatched}
WHEN NOT MATCHED THEN
INSERT (
${createFields}
)
VALUES
(
${getInsertValuesString(extractFields)}
)
OUTPUT $action, INSERTED.*;`;
  return sequelize.query(query);
};
/**
 * Returns a converter that renders one JavaScript value as a SQL literal.
 * - `null` becomes the keyword `null`.
 * - Primitives (including booleans) and Dates are escaped by `sequelize.escape`.
 *   Booleans are deliberately not hard-coded as `true`/`false`: the MERGE
 *   statement this helper feeds targets SQL Server, which has no such
 *   literals, so the dialect-aware escaper decides the representation.
 * - Any other object is serialised to JSON and escaped as a string.
 *
 * NOTE(review): relies on a `sequelize` instance from the enclosing module,
 * which is not defined in this snippet — confirm.
 *
 * @returns {(value: *) => String} Converter from a value to its SQL literal
 */
const valueToSQL = () => (value) => {
  if (value === null) {
    return "null";
  }
  if (typeof value !== "object" || value instanceof Date) {
    // String() keeps the return type stable if the escaper yields a number.
    return String(sequelize.escape(value));
  }
  return String(sequelize.escape(JSON.stringify(value)));
};
/**
 * Builds the MERGE match condition: one
 * `[targetTable].[key] = [sourceTable].[key]` comparison per primary key
 * attribute, combined with ` AND ` (spaces on both sides, so the keyword is
 * never fused with the following identifier).
 *
 * @param {Array<String>} primaryKeyAttributes Primary key column names
 * @returns {String} The match condition; empty string when no keys are given
 */
const getPrimaryQueryString = (primaryKeyAttributes) =>
  Array.from(
    primaryKeyAttributes,
    (key) => `[targetTable].[${key}] = [sourceTable].[${key}]`,
  ).join(" AND ");
/**
 * Builds the SET list of the MERGE update branch: one
 * `[targetTable].[field] = [sourceTable].[field]` assignment per field,
 * separated by ", ".
 *
 * @param {Array<String>} fields Column names to assign
 * @returns {String} The assignment list; empty string when no fields are given
 */
const getUpdateFieldsString = (fields) => {
  const assignments = Array.from(
    fields,
    (column) => `[targetTable].[${column}] = [sourceTable].[${column}]`,
  );
  return assignments.join(", ");
};
/**
 * Builds the VALUES list of the MERGE insert branch: one
 * `[sourceTable].[field]` reference per field, separated by ", ".
 *
 * @param {Array<String>} fields Column names to read from the source rows
 * @returns {String} The reference list; empty string when no fields are given
 */
const getInsertValuesString = (fields) => {
  const references = Array.from(fields, (column) => `[sourceTable].[${column}]`);
  return references.join(", ");
};
/**
 * Returns a converter that renders one data object as a SQL value tuple,
 * "(v1,v2,...)", taking the object's values in their own key order and
 * converting each with valueToSQL.
 *
 * @returns {(data: Object) => String} Converter from a row object to a tuple
 */
const dataToSql = () => (data) => {
  const cells = Object.values(data).map(valueToSQL());
  return `(${cells.join(",")})`;
};
Upvotes: 0
Reputation: 198
2021 September update
Bulk upserting with unique compound indexes now just works in Sequelize v6.4.4.
https://github.com/sequelize/sequelize/pull/13345
Upvotes: 0
Reputation: 746
2020 November 2nd update
Based on @Yedhin answer, here is a more generic solution (typescript):
/**
 * Generic PostgreSQL bulk upsert built as a raw `INSERT ... ON CONFLICT` query.
 * Column names are taken from the keys of the first item.
 *
 * @param items Rows to insert or update.
 * @param model Sequelize model; its `tableName` is the target table.
 * @param conflictKeys Columns of the ON CONFLICT target.
 * @param excludeFromUpdate Columns that must not be overwritten on conflict
 *   (conflict keys are always excluded).
 * @returns The result of `sequelize.query`, or `[0, 0]` when `items` is empty.
 * @throws Error when the model has no Sequelize instance attached.
 */
export const bulkUpsert = async <T extends Model<T>, K extends keyof T>(
  items: Partial<T>[],
  model: ModelCtor<T>,
  conflictKeys: K[],
  excludeFromUpdate: K[] = [],
): Promise<[number, number]> => {
  if (!items.length) {
    return [0, 0];
  }
  const { tableName, sequelize, name } = model;
  if (!sequelize) {
    throw new Error(`Sequelize not initialized on ${name}?`);
  }
  const sample = items[0];
  const fields = Object.keys(sample) as K[];
  const createFields = `("${fields.join(`","`)}")`;
  const updateFields = fields
    .filter((field) => ![...excludeFromUpdate, ...conflictKeys].includes(field))
    .map((field) => `"${field}"=EXCLUDED."${field}"`)
    .join(', ');
  const values = items.map(dataToSql(sequelize)).join(',');
  const onConflict = `ON CONFLICT ("${conflictKeys.join(`","`)}")`;
  // With nothing left to update, an empty SET list would be a syntax error.
  const conflictAction = updateFields ? `DO UPDATE SET ${updateFields}` : 'DO NOTHING';
  const returning = `"${fields.join('","')}"`;
  const query = `INSERT INTO "${tableName}" ${createFields} VALUES ${values} ${onConflict} ${conflictAction} RETURNING ${returning};`;
  // Every value is already escaped and inlined above, so no replacements are
  // passed: supplying them would make any "?" inside the data be treated as a
  // placeholder and substituted.
  return sequelize.query(query, {
    type: QueryTypes.INSERT,
  });
};
/**
 * Returns a converter that renders one value as a SQL literal:
 * `null` and booleans become bare keywords, primitives and Dates are escaped
 * by `sequelize.escape`, and any other object is serialised to JSON and
 * escaped as a string.
 */
const valueToSql = (sequelize: Sequelize) => (
  value: string | number | boolean | null | Date | string[] | Record<string, unknown>,
): string => {
  if (value === null) {
    return 'null';
  }
  switch (typeof value) {
    case 'boolean':
      return value ? 'true' : 'false';
    case 'object':
      return value instanceof Date
        ? sequelize.escape(value)
        : sequelize.escape(JSON.stringify(value));
    default:
      return sequelize.escape(value);
  }
};
/**
 * Returns a converter that renders one row object as a SQL value tuple,
 * "(v1,v2,...)", taking the object's values in their own key order.
 * The type parameter is constrained to the Sequelize `Model`, matching
 * bulkUpsert.
 */
const dataToSql = <T extends Model<T>>(sequelize: Sequelize) => (data: Partial<T>): string =>
  `(${Object.values(data).map(valueToSql(sequelize)).join(',')})`;
Upvotes: 2
Reputation: 3159
2020 October 1st Update
Sequelize Version: ^6.3.5
The issue still persists: we still can't bulk upsert with unique composite indexes. bulkCreate with updateOnDuplicate doesn't yet work with unique composite indexes. There are PRs still waiting to be merged that may fix this issue:
https://github.com/sequelize/sequelize/pull/12516
https://github.com/sequelize/sequelize/pull/12547
For the time being, if anyone wants a quick workaround, then the following raw query based wrapper can be used by modifying with your own tables attributes, names and data:-
/**
 * Raw-query bulk upsert for "Table": inserts every row of
 * `bulkUpsertableData` and, on a conflict on ("non_id_attr1","non_id_attr2"),
 * overwrites the listed columns with the incoming values. Failures are logged
 * and rethrown.
 *
 * @param {Object} params
 * @param {Array<Array>} params.bulkUpsertableData One inner array per row, in column order
 * @returns The result of models.sequelize.query
 */
const bulkUpsertIntoTable = async ({ bulkUpsertableData }) => {
  try {
    // One "(?)" group per row; each group is filled from the matching inner array.
    const rowPlaceholders = bulkUpsertableData.map(() => "(?)").join(",");
    /* eslint-disable */
    // id column will automatically be incremented if you have set it to auto-increment
    const query = `INSERT INTO "Table" ("non_id_attr1", "non_id_attr2", "non_id_attr3","createdAt", "updatedAt") VALUES ${rowPlaceholders} ON CONFLICT ("non_id_attr1","non_id_attr2") DO UPDATE SET "non_id_attr1"=excluded."non_id_attr1", "non_id_attr2"=excluded."non_id_attr2", "non_id_attr3"=excluded."non_id_attr3", "updatedAt"=excluded."updatedAt" RETURNING "id","non_id_attr1","non_id_attr2","non_id_attr3","createdAt","updatedAt";`;
    /* eslint-enable */
    const upserted = await models.sequelize.query(query, {
      replacements: bulkUpsertableData, // the rows that fill the placeholders
      type: models.Sequelize.QueryTypes.INSERT,
      // transaction: t -----> add when the upsert must run inside a transaction
    });
    return upserted;
  } catch (error) {
    console.error("Bulk Upserting into Table:", error);
    throw error;
  }
};
Important point is creating the bulkUpsertableData
, where it should be Array<Array> ie:- [[]]
. Example creation:-
// With reference to the wrapper function above: build one inner array per row,
// with values in the same order as the INSERT column list.
const bulkUpsertableData = Object.keys(myObjectData).map(type => [
myObjectData[type],// -----> non_id_attr1
type, // -----> non_id_attr2
someOtherRandomValue, // -----> non_id_attr3
new Date(), // -----> createdAt
new Date(), // -----> updatedAt
]);
// The response will have all the raw attributes mentioned in the RETURNING clause.
const upsertedTableResponse = await bulkUpsertIntoTable({ bulkUpsertableData });
Upvotes: 6
Reputation: 6194
2019 Update
Works for all dialects provided a certain minimum version is matched
HERE is the reference to the source code for the same
Note that individual options may or may not work across all dialects For example, updateOnDuplicate will work only on MySQL, MariaDB, SQLite and Postgres
ignoreDuplicates option will NOT work on MSSQL
Also check this BLOCK of code in the source
if (Array.isArray(options.updateOnDuplicate) && options.updateOnDuplicate.length) {
options.updateOnDuplicate = _.intersection(
_.without(Object.keys(model.tableAttributes), createdAtAttr),
options.updateOnDuplicate
);
} else {
return Promise.reject(new Error('updateOnDuplicate option only supports non-empty array.'));
}
updateOnDuplicate has to be an Array, cannot be true or false
So going with the above points, your code should be something like this
// updateOnDuplicate is given as a non-empty array of field names, as the
// source excerpt above requires.
Employee.bulkCreate(data, {
updateOnDuplicate: ['employeeName', 'employeeAge'],
});
UPDATE:
Since someone mentioned it is not working, try this
// Alternative offered for when updateOnDuplicate does not work: use
// ignoreDuplicates instead (noted above as not working on MSSQL).
models.Employee.bulkCreate(items, {
returning: ['employeeId'],
ignoreDuplicates: true
})
Upvotes: 7
Reputation: 1176
From the official sequelizejs reference.
It can be done using bulkCreate
with the updateOnDuplicate
option.
Like this for example :
// Writes id, name and address for each row; when a row already exists, only
// "name" is listed for update.
Employee.bulkCreate(dataArray,
{
fields:["id", "name", "address"] ,
updateOnDuplicate: ["name"]
} )
updateOnDuplicate is an array of fields that will be updated when the primary key (or possibly a unique key) matches an existing row. Make sure you have at least one unique field (let's say id) in both your model and the dataArray for the upsert to work.
Upvotes: 116