Ashutosh
Ashutosh

Reputation: 4675

Nodejs sequelize bulk upsert

Is there a way to do a bulk upsert in Sequelize? Also, can I specify which keys are used to check for duplicates?

I tried following but it didn't work:

// The question's original attempt. NOTE: the answers below point out that
// updateOnDuplicate expects an array of field names, not `true`.
Employee.bulkCreate(data, {
    updateOnDuplicate: true
});

Bulk creation itself works fine, but the statement above always creates new entries in the DB instead of updating existing ones.

Upvotes: 67

Views: 89190

Answers (10)

Ngoc Dam
Ngoc Dam

Reputation: 1

Based on @François Dispaux's answer, I have improved the bulkUpsert function.

This should work with Sequelize and Postgres.

Note: some lines of the code below are taken from the source of the Sequelize library.

// Version: 6.17.0
// yarn add sequelize@6.17.0
//

const _ = require('lodash');
const { Sequelize, Model, Utils, QueryTypes, QueryError } = require('sequelize');

// --------------------------------------------------------------
// --------------------------------------------------------------
// Object-spread helpers in the shape a transpiler emits, copied together with the
// Sequelize internals below. Only __spreadValues and __spreadProps are called here.
const __defProp = Object.defineProperty;
const __defProps = Object.defineProperties;
const __getOwnPropDescs = Object.getOwnPropertyDescriptors;
const __getOwnPropSymbols = Object.getOwnPropertySymbols;
const __hasOwnProp = Object.prototype.hasOwnProperty;
const __propIsEnum = Object.prototype.propertyIsEnumerable;

// Sets obj[key] = value. If `key` is already reachable on obj (own or inherited),
// an own writable/enumerable/configurable data property is defined instead, so an
// inherited setter is never triggered.
const __defNormalProp = (obj, key, value) => {
  if (!(key in obj)) {
    return (obj[key] = value);
  }
  return __defProp(obj, key, { enumerable: true, configurable: true, writable: true, value });
};

// Copies b's own enumerable string keys, then its own enumerable symbol keys, onto a.
// A falsy b is treated as an empty object. Returns a.
const __spreadValues = (a, b) => {
  const source = b || {};
  for (const key in source) {
    if (__hasOwnProp.call(source, key)) {
      __defNormalProp(a, key, source[key]);
    }
  }
  if (__getOwnPropSymbols) {
    for (const sym of __getOwnPropSymbols(source)) {
      if (__propIsEnum.call(source, sym)) {
        __defNormalProp(a, sym, source[sym]);
      }
    }
  }
  return a;
};

// Copies every own property descriptor of b onto a and returns a.
const __spreadProps = (a, b) => __defProps(a, __getOwnPropDescs(b));
// --------------------------------------------------------------
// --------------------------------------------------------------

/**
 * Builds the SELECT statement that `model.findAll(options)` would execute,
 * without running it. Option processing (scopes, hooks, include expansion,
 * paranoid clause) follows Sequelize's own findAll before the dialect's query
 * generator renders the SQL.
 *
 * @param {Model} model Sequelize model class
 * @param {Object} [options] Same shape as the options of `Model.findAll`
 * @param {Object} [extra] Formatting options; may be omitted entirely
 * @param {Boolean} [extra.removeSemicolon=false] Strip the trailing semicolon,
 *   e.g. when the query is embedded in a UNION ALL
 * @returns {Promise<String>} SQL SELECT query
 * @throws {QueryError} If `options` is not a plain object, or `options.attributes`
 *   is neither an array nor a plain object
 */
async function buildFindAllSQL(model, options, { removeSemicolon = false } = {}) {
  if (options !== void 0 && !_.isPlainObject(options)) {
    throw new QueryError("The argument passed to findAll must be an options object, use findByPk if you wish to pass a single primary key value");
  }
  if (options !== void 0 && options.attributes) {
    if (!Array.isArray(options.attributes) && !_.isPlainObject(options.attributes)) {
      throw new QueryError("The attributes option must be an array of column names or an object");
    }
  }
  model.warnOnInvalidOptions(options, Object.keys(model.rawAttributes));
  const tableNames = {};
  tableNames[model.getTableName(options)] = true;
  // Work on a copy so the hooks and steps below never touch the caller's object.
  options = Utils.cloneDeep(options);
  _.defaults(options, { hooks: true });
  options.rejectOnEmpty = Object.prototype.hasOwnProperty.call(options, "rejectOnEmpty") ? options.rejectOnEmpty : model.options.rejectOnEmpty;
  model._injectScope(options);
  if (options.hooks) {
    await model.runHooks("beforeFind", options);
  }
  model._conformIncludes(options, model);
  model._expandAttributes(options);
  model._expandIncludeAll(options);
  if (options.hooks) {
    await model.runHooks("beforeFindAfterExpandIncludeAll", options);
  }
  options.originalAttributes = model._injectDependentVirtualAttributes(options.attributes);
  if (options.include) {
    options.hasJoin = true;
    model._validateIncludedElements(options, tableNames);
    if (options.attributes && !options.raw && model.primaryKeyAttribute && !options.attributes.includes(model.primaryKeyAttribute) && (!options.group || !options.hasSingleAssociation || options.hasMultiAssociation)) {
      options.attributes = [model.primaryKeyAttribute].concat(options.attributes);
    }
  }
  if (!options.attributes) {
    options.attributes = Object.keys(model.rawAttributes);
    options.originalAttributes = model._injectDependentVirtualAttributes(options.attributes);
  }
  // Same side effect as Sequelize's findAll: the current where clause is stored on the model options.
  model.options.whereCollection = options.where || null;
  Utils.mapFinderOptions(options, model);
  options = model._paranoidClause(model, options);
  if (options.hooks) {
    await model.runHooks("beforeFindAfterOptions", options);
  }
  const selectOptions = __spreadProps(__spreadValues({}, options), { tableNames: Object.keys(tableNames) });

  // This function based-on the code from findAll function of the Model class.
  // In the findAll function, the model.queryInterface.select function will be called.
  // Inside the select function of the QueryInterface class will define the way to build SELECT query.
  const sql = model.sequelize.queryInterface.queryGenerator.selectQuery(model.getTableName(selectOptions), { ...selectOptions, type: QueryTypes.SELECT, model }, model);

  if (removeSemicolon && sql.endsWith(';')) {
    return sql.slice(0, -1);
  }

  return sql;
}

/**
 * Renders every item as an escaped SQL value tuple such as `(1,'foo',NULL)`.
 *
 * Values appear in the order of `fields`. An entry of `fields` may be a model
 * attribute name or a column name. `Utils.mapValueFieldNames` keys its output by
 * column (the attribute's `field`), so each name is first resolved to its column
 * before the value and its attribute definition are looked up.
 *
 * @param {Array<Object>} items Rows to render (deep-cloned first; never mutated)
 * @param {Object} opts
 * @param {Model} opts.model Sequelize model class the rows belong to
 * @param {Array<String>} opts.fields Attribute or column names, in output order
 * @returns {Array<String>} One parenthesized, comma-separated tuple per item
 */
function mapValues(items, { model, fields }) {
  const records = _.cloneDeep(items);

  // Column name -> attribute definition, passed to the escaper for each value.
  const fieldMappedAttributes = {};
  for (const attr in model.tableAttributes) {
    fieldMappedAttributes[model.rawAttributes[attr].field || attr] = model.rawAttributes[attr];
  }

  // Column each requested name is stored under (the name itself unless it is a mapped attribute).
  const columns = fields.map((name) => {
    const attribute = model.rawAttributes[name];
    return (attribute && attribute.field) || name;
  });

  return records.map((values) => {
    const fieldValueHash = Utils.mapValueFieldNames(values, fields, model);
    for (const key of model._virtualAttributes) {
      delete fieldValueHash[key];
    }
    const escaped = columns.map((column) => model.sequelize.queryInterface.queryGenerator.escape(
      fieldValueHash[column],
      fieldMappedAttributes[column],
      { context: 'INSERT' },
    ));
    return `(${escaped.join(',')})`;
  });
}

/**
 * Builds a PostgreSQL `INSERT ... ON CONFLICT` statement for a batch of rows.
 *
 * The keys of the first item are the column list (in order) for every row; the
 * values are rendered by `mapValues`. Without `conflictKeys` a plain INSERT is built.
 *
 * @param {Array<Object>} items Rows to insert / update
 * @param {Object} opts
 * @param {Model} opts.model Sequelize model class; its `tableName` is the target table
 * @param {Array<String>} [opts.conflictKeys=[]] Columns of the ON CONFLICT target
 * @param {Array<String>} [opts.excludeFromUpdate=[]] Columns never overwritten on conflict
 * @param {Array<String>|String} [opts.conflictWhere=[]] SQL condition(s) for the
 *   `DO UPDATE ... WHERE` clause; several conditions are combined with AND
 * @param {Boolean} [opts.returning=false] Append `RETURNING` with the inserted columns
 * @param {Function|false} [opts.logging=false] Called with the generated SQL when a function
 * @returns {Promise<String|null>} SQL INSERT query, or null when `items` is empty
 */
async function buildBulkUpsertSQL(items = [], {
  model,
  conflictKeys = [],
  excludeFromUpdate = [],
  conflictWhere = [],
  returning = false,
  logging = false,
}) {

  if (!items.length) {
    return null;
  }

  const { tableName } = model;

  const fields = Object.keys(items[0]);
  const columnList = `"${fields.join('","')}"`;

  // Conflict-target columns and explicitly excluded columns keep their stored values.
  const keptColumns = new Set([...excludeFromUpdate, ...conflictKeys]);
  // EXCLUDED."col" must be quoted as well: PostgreSQL lower-cases unquoted identifiers,
  // which breaks camelCase columns such as "updatedAt".
  const updateFields = fields
    .filter((field) => !keptColumns.has(field))
    .map((field) => `"${field}"=EXCLUDED."${field}"`)
    .join(', ');

  // mapValues deep-clones the items itself.
  const values = mapValues(items, { model, fields }).join(',');

  let query = `INSERT INTO "${tableName}" (${columnList}) VALUES ${values}`;

  if (conflictKeys.length > 0) {
    const onConflict = `ON CONFLICT ("${conflictKeys.join('","')}")`;
    if (updateFields) {
      const conditions = [].concat(conflictWhere || []);
      const updateWhere = conditions.length > 0 ? ` WHERE ${conditions.join(' AND ')}` : '';
      query = `${query} ${onConflict} DO UPDATE SET ${updateFields}${updateWhere}`;
    } else {
      // Every column is kept, and "DO UPDATE SET" with an empty list is invalid SQL.
      query = `${query} ${onConflict} DO NOTHING`;
    }
  }

  if (returning === true) {
    query = `${query} RETURNING ${columnList}`;
  }

  query += ';';

  if (typeof logging === 'function') {
    logging('---------------------------------------');
    logging(query);
    logging('---------------------------------------');
  }

  return query;
}

/**
 * Builds a bulk upsert with `buildBulkUpsertSQL` and runs it through `sequelize.query`.
 *
 * @param {Array<Object>} items Rows to insert / update
 * @param {Object} opts
 * @param {Model} opts.model Sequelize model class
 * @param {Array<String>} [opts.conflictKeys=[]] Columns of the ON CONFLICT target
 * @param {Array<String>} [opts.excludeFromUpdate=[]] Columns never overwritten on conflict
 * @param {Array<String>|String} [opts.conflictWhere=[]] Condition(s) for `DO UPDATE ... WHERE`
 * @param {Transaction|null} [opts.transaction=null] Transaction the query runs in
 * @param {Function|false} [opts.logging=false] Receives the generated SQL when a function
 * @param {Boolean} [opts.returning=false] Append a RETURNING clause with the inserted columns
 * @returns {Promise<Array>} Result of `sequelize.query`, or `[0, 0]` when there is nothing to write
 */
async function bulkUpsert(items = [], {
  model,
  conflictKeys = [],
  excludeFromUpdate = [],
  conflictWhere = [],
  transaction = null,
  logging = false,
  returning = false,
}) {

  if (!items.length) {
    return [0, 0];
  }

  const query = await buildBulkUpsertSQL(items, {
    model,
    conflictKeys,
    excludeFromUpdate,
    conflictWhere,
    returning,
    logging,
  });

  if (!query) {
    return [0, 0];
  }

  const { sequelize } = model;

  const options = {
    type: sequelize.QueryTypes.INSERT,
  };

  if (transaction) {
    // Sequelize reads the transaction from the literal `transaction` option key.
    options.transaction = transaction;
  }

  return sequelize.query(query, options);
}

// --------------------------------------------------------------

// Public API of this helper module. buildFindAllSQL and buildBulkUpsertSQL return SQL
// strings and mapValues returns value tuples; only bulkUpsert executes a query
// (via sequelize.query).
module.exports = {
  buildFindAllSQL,
  buildBulkUpsertSQL,
  bulkUpsert,
  mapValues,
};

Upvotes: 0

Rahmat Ali
Rahmat Ali

Reputation: 1531

If you are using Sequelize with sequelize-typescript and updateOnDuplicate is not working for a unique column, try moving that unique column to the top of your model declaration:

Before:


// "Before": the unique `hash` column is declared last. According to this answer,
// updateOnDuplicate did not use `hash` as the conflict column with this ordering.
@Table({ modelName: 'transaction' })
export class TransactionModel extends Model {


  // Auto-increment primary key. NOTE(review): `unique: true` is redundant next to
  // `primaryKey: true` at the SQL level — confirm whether it is needed here.
  @Column({
    type: DataType.INTEGER,
    allowNull: false,
    autoIncrement: true,
    unique: true,
    primaryKey: true,
  })
  override id: number;

  // NOTE(review): typed as string although the column is DataType.DATE — confirm the intended TS type.
  @Column(DataType.DATE)
  override createdAt: string;

  @Column(DataType.STRING)
  contractAddress: string;

  @Column(DataType.INTEGER)
  cumulativeGasUsed: number;

  @Column(DataType.STRING)
  from: string;

  @Column(DataType.INTEGER)
  gasUsed: number;

  @Column(DataType.INTEGER)
  effectiveGasPrice: number;


  @Column(DataType.TEXT)
  logsBloom: string;

  @Column(DataType.BOOLEAN)
  status: boolean;

  @Column(DataType.STRING)
  to: string;

  // Unique column intended as the upsert key; declared last in this version.
  @Unique
  @Column(DataType.STRING)
  hash: string;           // <---------------------- Required Unique column
}

After:

// "After": the same model with the unique `hash` column moved to the top of the class,
// which is the change this answer reports as making updateOnDuplicate work.
@Table({ modelName: 'transaction' })
export class TransactionModel extends Model {

  // Unique column intended as the upsert key; now declared first.
  @Unique
  @Column(DataType.STRING)
  hash: string;           // <---------------------- Required Unique column

  // Auto-increment primary key. NOTE(review): `unique: true` is redundant next to
  // `primaryKey: true` at the SQL level — confirm whether it is needed here.
  @Column({
    type: DataType.INTEGER,
    allowNull: false,
    autoIncrement: true,
    unique: true,
    primaryKey: true,
  })
  override id: number;

  // NOTE(review): typed as string although the column is DataType.DATE — confirm the intended TS type.
  @Column(DataType.DATE)
  override createdAt: string;

  @Column(DataType.STRING)
  contractAddress: string;

  @Column(DataType.INTEGER)
  cumulativeGasUsed: number;

  @Column(DataType.STRING)
  from: string;

  @Column(DataType.INTEGER)
  gasUsed: number;

  @Column(DataType.INTEGER)
  effectiveGasPrice: number;


  @Column(DataType.TEXT)
  logsBloom: string;

  @Column(DataType.BOOLEAN)
  status: boolean;

  @Column(DataType.STRING)
  to: string;
}

Upvotes: 0

Olawale Isaac
Olawale Isaac

Reputation: 53

For those using queryInterface.bulkInsert, you will need to add an upsertKeys array in the options. Example:

// queryInterface.bulkInsert takes the ON CONFLICT target from `upsertKeys`
// (Model.bulkCreate computes its own conflict keys, so the option belongs here).
// NOTE: bulkInsert works at the table level, so these names (and the keys of the
// rows in dataArray) are presumably column names, not model attribute names —
// confirm for attributes with a custom `field`.
Employee.sequelize.getQueryInterface().bulkInsert(
  Employee.getTableName(),
  dataArray,
  {
    upsertKeys: ['id'],
    updateOnDuplicate: ['name'],
  },
);

Upvotes: 1

Can
Can

Reputation: 8571

Update (Sequelize >= 6)

Sequelize 6.x added UPSERT support on all dialects, so @followtest52's answer is valid for PostgreSQL too.

Original (Sequelize < 6)

Since PostgreSQL is not supported by that answer, the "best" alternative using Sequelize is a manual query with an ON CONFLICT clause. Example (TypeScript):

// Rows to upsert, in the column order of the INSERT below.
const values: Array<Array<number | string>> = [
  [1, 'Apple', 'Red', 'Yummy'],
  [2, 'Kiwi', 'Green', 'Yuck'],
];

// One "(?)" per row; each `?` is replaced with that row's values.
const placeholders = values.map(() => '(?)').join(',');
const query = `INSERT INTO fruits (id, name, color, flavor) VALUES ${placeholders} ON CONFLICT (id) DO UPDATE SET flavor = excluded.flavor;`;

// The { query, values } form makes Sequelize use `values` as the replacements.
sequelize.query({ query, values }, { type: sequelize.QueryTypes.INSERT });

This would build a query like:

-- Statement produced for the two example rows: on an id collision only
-- `flavor` is overwritten with the incoming (EXCLUDED) value.
INSERT INTO 
    fruits (id, name, color, flavor)
VALUES 
    (1, 'Apple', 'Red', 'Yummy'),
    (2, 'Kiwi', 'Green', 'Yuck')
ON CONFLICT (id) DO UPDATE SET 
    flavor = excluded.flavor;

Needless to say, having to build queries manually is not ideal, since it defeats the purpose of using Sequelize, but for a one-off query where that trade-off is acceptable, you could use this method.

Upvotes: 15

Naveen Raju
Naveen Raju

Reputation: 5791

A modified version for MSSQL (it builds a MERGE statement), which does the job:

/**
 * Bulk upsert for MSSQL using a single MERGE statement.
 *
 * Rows are matched on the model's primary key: matched rows get their non-key
 * columns updated, unmatched rows are inserted. Relies on a module-level
 * `sequelize` instance for escaping and execution.
 *
 * @param {Array<Object>} data Raw JSON rows
 * @param {*} model Sequelize model (uses `tableName`, `primaryKeyAttributes`, `name`)
 * @param {Array<String>} [fields] Columns to insert/update. If none are passed, the keys
 *   of the first row are used. Every row is rendered in this column order; a column
 *   missing from a row is written as null.
 * @returns {Promise<Array>} Result of `sequelize.query` (the OUTPUT rows: `$action` plus
 *   `INSERTED.*` per record), or `[0, 0]` when `data` is empty.
 */
export const bulkUpert = async (data, model, fields = undefined) => {
  if (!data.length) {
    return [0, 0];
  }
  const { name, tableName, primaryKeyAttributes } = model;

  if (!sequelize) {
    throw new Error(`Sequalize not initialized on ${name}`);
  }

  const extractFields = fields ? fields : Object.keys(data[0]);
  const createFields = extractFields.map((field) => `[${field}]`).join(", ");
  // Render every row in extractFields order, not in the row object's own key order.
  const values = data.map(dataToSql(extractFields)).join(", ");

  // Key columns identify the row; rewriting them is redundant and fails for IDENTITY columns.
  const updateFields = extractFields.filter((field) => !primaryKeyAttributes.includes(field));
  const whenMatched = updateFields.length
    ? `WHEN MATCHED THEN
        UPDATE SET
            ${getUpdateFieldsString(updateFields)}`
    : "";

  const query = `MERGE INTO
    [${tableName || name}]
    WITH(HOLDLOCK)
    AS [targetTable]
    USING (
        VALUES ${values}
    )
    AS [sourceTable]
    (
      ${createFields}
    ) ON
    ${getPrimaryQueryString(primaryKeyAttributes)}
    ${whenMatched}
    WHEN NOT MATCHED THEN
        INSERT (
              ${createFields}
            )
        VALUES
            (
                ${getInsertValuesString(extractFields)}
            )
    OUTPUT $action, INSERTED.*;`;
  return sequelize.query(query);
};

// Renders one JS value as a T-SQL literal.
const valueToSQL = () => (value) => {
  if (value === null || value === undefined) {
    return "null";
  }

  // T-SQL has no TRUE/FALSE literals; BIT columns take 1/0.
  if (typeof value === "boolean") {
    return value ? "1" : "0";
  }

  if (typeof value !== "object" || value instanceof Date) {
    return sequelize.escape(value);
  }

  return sequelize.escape(JSON.stringify(value));
};

// ON condition matching target and source rows on every primary-key column.
const getPrimaryQueryString = (primaryKeyAttributes) =>
  primaryKeyAttributes
    .map((key) => `[targetTable].[${key}] = [sourceTable].[${key}]`)
    .join(" AND ");

// SET list copying each field from the source row into the target row.
const getUpdateFieldsString = (fields) =>
  fields
    .map((field) => `[targetTable].[${field}] = [sourceTable].[${field}]`)
    .join(", ");

// Source-column list used as the VALUES of the INSERT branch.
const getInsertValuesString = (fields) =>
  fields.map((field) => `[sourceTable].[${field}]`).join(", ");

// Renders one row as a value tuple, in `fields` order when given, else in the row's key order.
const dataToSql = (fields) => (data) => {
  const columns = fields || Object.keys(data);
  return `(${columns.map((column) => data[column]).map(valueToSQL()).join(",")})`;
};

Upvotes: 0

Michiel De Mey
Michiel De Mey

Reputation: 198

2021 September update

Bulk upserting with unique compound indexes now just works in Sequelize v6.4.4.

https://github.com/sequelize/sequelize/pull/13345

Upvotes: 0

François Dispaux
François Dispaux

Reputation: 746

2020 November 2nd update

Based on @Yedhin's answer, here is a more generic solution (TypeScript):

/**
 * Bulk upsert for PostgreSQL using `INSERT ... ON CONFLICT (...)`.
 *
 * The keys of the first item define the column list, and every item is rendered
 * in that column order (keys missing from an item become NULL). Columns listed in
 * `conflictKeys` or `excludeFromUpdate` keep their stored value on conflict; if no
 * column is left to update, the statement uses `DO NOTHING`.
 *
 * @param items Rows to upsert
 * @param model Target model (its `tableName` and `sequelize` are used)
 * @param conflictKeys Columns of the ON CONFLICT target (must match a unique index/constraint)
 * @param excludeFromUpdate Columns never overwritten on conflict
 * @returns Result of `sequelize.query`, or `[0, 0]` when `items` is empty
 */
export const bulkUpsert = async <T extends Model<T>, K extends keyof T>(
  items: Partial<T>[],
  model: ModelCtor<T>,
  conflictKeys: K[],
  excludeFromUpdate: K[] = [],
): Promise<[number, number]> => {
  if (!items.length) {
    return [0, 0];
  }

  const { tableName, sequelize, name } = model;
  if (!sequelize) {
    throw new Error(`Sequelize not initialized on ${name}?`);
  }

  const fields = Object.keys(items[0]) as K[];
  const columns = fields.map(String);
  const kept = new Set<K>([...excludeFromUpdate, ...conflictKeys]);
  const updateFields = fields
    .filter((field) => !kept.has(field))
    .map((field) => `"${String(field)}"=EXCLUDED."${String(field)}"`)
    .join(', ');

  const createFields = `("${columns.join('","')}")`;
  const values = items.map(dataToSql<T>(sequelize, columns)).join(',');
  const onConflict = `ON CONFLICT ("${conflictKeys.map(String).join('","')}")`;
  // "DO UPDATE SET" with an empty list is invalid SQL.
  const conflictAction = updateFields ? `DO UPDATE SET ${updateFields}` : 'DO NOTHING';
  const returning = `"${columns.join('","')}"`;

  const query = `INSERT INTO "${tableName}" ${createFields} VALUES ${values} ${onConflict} ${conflictAction} RETURNING ${returning};`;

  // Values are escaped and inlined above, so no `replacements` are passed: they would be
  // unused, and depending on the Sequelize version `?` characters inside the inlined
  // strings could be mistaken for replacement markers.
  return sequelize.query(query, {
    type: QueryTypes.INSERT,
  });
};

type SqlValue = string | number | boolean | null | undefined | Date | string[] | Record<string, unknown>;

// Renders one JS value as a PostgreSQL literal.
const valueToSql = (sequelize: Sequelize) => (value: SqlValue): string => {
  // A key missing from an item is written as NULL, like an explicit null.
  if (value === null || value === undefined) {
    return 'null';
  }

  if (typeof value === 'boolean') {
    return value ? 'true' : 'false';
  }

  if (typeof value !== 'object' || value instanceof Date) {
    return sequelize.escape(value);
  }

  return sequelize.escape(JSON.stringify(value));
};

// Renders one item as a value tuple, in `fields` order when given, else in the item's key order.
const dataToSql = <T extends Model<T>>(sequelize: Sequelize, fields?: string[]) => (
  data: Partial<T>,
): string => {
  const row = data as unknown as Record<string, SqlValue>;
  const keys = fields ?? Object.keys(row);
  return `(${keys.map((key) => row[key]).map(valueToSql(sequelize)).join(',')})`;
};

Upvotes: 2

Yedhin
Yedhin

Reputation: 3159

2020 October 1st Update
Sequelize Version: ^6.3.5

The issue still persists: we still can't bulk upsert with unique composite indexes, because bulkCreate with updateOnDuplicate doesn't yet work with them. There are PRs still awaiting merge which may fix this issue:
https://github.com/sequelize/sequelize/pull/12516
https://github.com/sequelize/sequelize/pull/12547

Workaround

For the time being, if anyone wants a quick workaround, the following raw-query-based wrapper can be used after adapting it to your own table's name, attributes and data:

/**
 * Raw-query bulk upsert (PostgreSQL `INSERT ... ON CONFLICT`). Replace the table
 * and column names with your own.
 *
 * @param {Object} args
 * @param {Array<Array<*>>} args.bulkUpsertableData One inner array per row, in the
 *   column order of the INSERT ("non_id_attr1", "non_id_attr2", "non_id_attr3",
 *   "createdAt", "updatedAt").
 * @returns {Promise<*>} Whatever `models.sequelize.query` resolves to.
 * @throws Re-throws any error after logging it with console.error.
 */
const bulkUpsertIntoTable = async ({ bulkUpsertableData }) => {
  try {
    // One "(?)" group per row; each is filled from the matching replacement array.
    const rowPlaceholders = bulkUpsertableData.map(() => "(?)").join(",");

    /* eslint-disable */
    // id column will automatically be incremented if you have set it to auto-increment
    const query = `INSERT INTO "Table" ("non_id_attr1", "non_id_attr2", "non_id_attr3","createdAt", "updatedAt") VALUES ${rowPlaceholders} ON CONFLICT ("non_id_attr1","non_id_attr2") DO UPDATE SET "non_id_attr1"=excluded."non_id_attr1", "non_id_attr2"=excluded."non_id_attr2", "non_id_attr3"=excluded."non_id_attr3",  "updatedAt"=excluded."updatedAt" RETURNING "id","non_id_attr1","non_id_attr2","non_id_attr3","createdAt","updatedAt";`;
    /* eslint-enable */

    return await models.sequelize.query(query, {
      replacements: bulkUpsertableData, // ------> dont forget to pass your data here
      type: models.Sequelize.QueryTypes.INSERT,
      // transaction: t -----> if required to be done in transaction
    });
  } catch (error) {
    console.error("Bulk Upserting into Table:", error);
    throw error;
  }
};

The important point is building bulkUpsertableData, which must be an Array<Array>, i.e. [[...], [...]] with one inner array per row. Example:

// with reference to above wrapper function
const bulkUpsertableData = Object.keys(myObjectData).map(type => [
      myObjectData[type],// -----> non_id_attr1
      type, // -----> non_id_attr2
      someOtherRandomValue, // -----> non_id_attr3
      new Date(), // -----> created_at
      new Date(), // -----> updated_at
]);

// response will have all the raw attributes mentioned in RETURNING clause
const upsertedTableResponse = await bulkUpsertIntoTable({ bulkUpsertableData });

Upvotes: 6

PirateApp
PirateApp

Reputation: 6194

2019 Update

Works for all dialects provided a certain minimum version is matched

HERE is the reference to the source code for the same

  • Note that individual options may or may not work across all dialects. For example, updateOnDuplicate will work only on MySQL, MariaDB, SQLite and Postgres.

  • ignoreDuplicates option will NOT work on MSSQL

Also check this BLOCK of code in the source

// Excerpt quoted from Sequelize's bulkCreate (not runnable on its own): a non-empty
// updateOnDuplicate array is narrowed to the model's table attributes minus the
// createdAt attribute; any value that is not a non-empty array (e.g. `true`) is rejected.
if (Array.isArray(options.updateOnDuplicate) && options.updateOnDuplicate.length) {
    options.updateOnDuplicate = _.intersection(
        _.without(Object.keys(model.tableAttributes), createdAtAttr),
        options.updateOnDuplicate
    );
} else {
    return Promise.reject(new Error('updateOnDuplicate option only supports non-empty array.'));
}

updateOnDuplicate has to be an array; it cannot be true or false.

So going with the above points, your code should be something like this

// On a key conflict, only the listed columns are overwritten with the incoming values.
Employee.bulkCreate(data, { updateOnDuplicate: ['employeeName', 'employeeAge'] });

UPDATE:

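Since someone mentioned it is not working, try this instead. Note that ignoreDuplicates only skips rows that conflict with existing ones — it does not update them, so it is not a true upsert: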

// ignoreDuplicates skips conflicting rows rather than updating them;
// `returning` requests the employeeId column back.
models.Employee.bulkCreate(items, {
  returning: ['employeeId'],
  ignoreDuplicates: true,
});

Upvotes: 7

followtest52
followtest52

Reputation: 1176

From the official sequelizejs reference.

It can be done using bulkCreate with the updateOnDuplicate option.

Like this for example :

// Inserts id, name and address; when a row matches on its key, only `name` is updated.
Employee.bulkCreate(dataArray, {
  fields: ['id', 'name', 'address'],
  updateOnDuplicate: ['name'],
});

updateOnDuplicate is an array of fields that will be updated when the primary key (or possibly a unique key) matches an existing row. For the upsert to work, make sure at least one unique field (say, id) is present both in your model and in dataArray.

Upvotes: 116

Related Questions