HOOOPLA

Reputation: 57

Best way to perform bulk insert SQLAlchemy

I have a table called products, which has the following columns: id, product_id, data, activity_id.

What I am essentially trying to do is copy a batch of existing products, update their activity_id, and insert the copies as new entries in the products table. Example:

I already have 70 existing entries in products with activity_id 2

Now I want to create another 70 entries with same data except for updated activity_id

I could have thousands of existing entries that I'd like to copy, setting the copied entries' activity_id to a new id.

products = self.session.query(model.Products).filter(filter1, filter2).all()

This returns all the existing products for a filter.

Then I iterate through products, clone each existing product, and update only the activity_id field.

for product in products:
    product.activity_id = new_id

self.uow.skus.bulk_save_objects(simulation_skus)
self.uow.flush()
self.uow.commit()

What is the best/fastest way to do these bulk inserts so that they take as little time as possible? Performance is OK as of now, but is there a better solution?

Upvotes: 0

Views: 2070

Answers (1)

Martijn Pieters

Reputation: 1124558

You don't need to load these objects locally; all you really want is for the database to create these rows.

You essentially want to run a query that creates the rows from the existing rows:

INSERT INTO products (product_id, data, activity_id)
SELECT product_id, data, 2  -- the new activity_id value
FROM products
WHERE activity_id = old_id

The above query would run entirely on the database server; this is far preferable to loading the existing rows into Python objects and then sending all of that data back to the server to populate an INSERT statement for each new row.
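As a rough illustration of that server-side copy, here is a minimal sketch using only the standard-library sqlite3 module. The in-memory database, the sample rows, and the values of old_id and new_id are assumptions made up for the demonstration; only the table layout comes from the question.

```python
import sqlite3

# Throwaway in-memory database with the table layout from the question.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE products ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "product_id INTEGER, data TEXT, activity_id INTEGER)"
)

# 70 made-up rows that all belong to the old activity.
conn.executemany(
    "INSERT INTO products (product_id, data, activity_id) VALUES (?, ?, ?)",
    [(100 + i, f"payload-{i}", 2) for i in range(70)],
)

old_id, new_id = 2, 3  # hypothetical ids for the demonstration

# One statement copies every matching row; no row data passes through Python.
conn.execute(
    "INSERT INTO products (product_id, data, activity_id) "
    "SELECT product_id, data, ? FROM products WHERE activity_id = ?",
    (new_id, old_id),
)
conn.commit()

copied = conn.execute(
    "SELECT COUNT(*) FROM products WHERE activity_id = ?", (new_id,)
).fetchone()[0]
total = conn.execute("SELECT COUNT(*) FROM products").fetchone()[0]
print(copied, total)
```

The id column is left out of both column lists so the database assigns fresh primary keys to the copies.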

Queries like that are something you could do with SQLAlchemy Core, the half of the API that deals with generating SQL statements. However, you can use a query built from a declarative ORM model as a starting point. You'd need to:

  1. Access the Table instance for the model, as that then lets you create an INSERT statement via the Table.insert() method.
    You could also get the same object from the models.Product query; more on that later.
  2. Access the statement that would normally fetch the data for your Python instances for your filtered models.Product query; you can do so via the Query.statement property.
  3. Update the statement to replace the included activity_id column with your new value, and remove the primary key (I'm assuming that you have an auto-incrementing primary key column).
  4. Apply that updated statement to the Insert object for the table via Insert.from_select().
  5. Execute the generated INSERT INTO ... FROM ... query.

Step 1 can be achieved by using the SQLAlchemy introspection API; the inspect() function, applied to a model class, gives you a Mapper instance, which in turn has a Mapper.local_table attribute.
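A minimal sketch of that lookup, assuming SQLAlchemy 1.4 or later (for the declarative_base import location) and a hypothetical Product model that mirrors the columns from the question:

```python
from sqlalchemy import Column, Integer, String, inspect
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class Product(Base):
    # Hypothetical model; only the column names come from the question.
    __tablename__ = "products"

    id = Column(Integer, primary_key=True)
    product_id = Column(Integer)
    data = Column(String)
    activity_id = Column(Integer)


# inspect() on a mapped class returns its Mapper ...
mapper = inspect(Product)
# ... and the Mapper exposes the Core Table the class is mapped to.
table = mapper.local_table

# The columns an INSERT would need to supply when the primary key is generated.
non_pk_columns = [c.name for c in table.columns if not c.primary_key]
print(table.name, non_pk_columns)
```

For a simple declarative model like this one, the same Table is also reachable as Product.__table__; going through inspect() avoids relying on that attribute.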

Steps 2 and 3 require a little juggling with the Select.with_only_columns() method to produce a new SELECT statement with the column swapped out. You can't easily remove a column from a select statement, but we can use a loop over the existing columns in the query to 'copy' them across to the new SELECT, and at the same time make our replacement.

Step 4 is then straightforward: Insert.from_select() needs the columns that are inserted and the SELECT query. We have both, as the SELECT object gives us its columns too.

Here is the code for generating your INSERT; the **replace keyword arguments are the columns you want to replace when inserting:

from sqlalchemy import inspect, literal
from sqlalchemy.sql import ClauseElement

def insert_from_query(model, query, **replace):
    # The SQLAlchemy core definition of the table
    table = inspect(model).local_table
    # and the underlying core select statement to source new rows from
    select = query.statement

    # validate assumptions: make sure the query produces rows from the above table
    assert table in select.froms, f"{query!r} must produce rows from {model!r}"
    assert all(c.name in select.columns for c in table.columns), f"{query!r} must include all {model!r} columns"

    # updated select, replacing the indicated columns
    as_clause = lambda v: literal(v) if not isinstance(v, ClauseElement) else v
    replacements = {name: as_clause(value).label(name) for name, value in replace.items()}
    from_select = select.with_only_columns([
        replacements.get(c.name, c)
        for c in table.columns
        if not c.primary_key
    ])
        
    return table.insert().from_select(from_select.columns, from_select)

I included a few assertions about the model and query relationship, and the code accepts arbitrary column clauses as replacements, not just literal values. You could use func.max(models.Product.activity_id) + 1 as a replacement value (wrapped as a subselect), for example.
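In plain SQL terms, a computed replacement like that amounts to a scalar subquery in the SELECT list. The sketch below shows the shape of such a statement with the standard-library sqlite3 module; it is hand-written SQL rather than the output of insert_from_query(), and the sample rows are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE products ("
    "id INTEGER PRIMARY KEY AUTOINCREMENT, "
    "product_id INTEGER, data TEXT, activity_id INTEGER)"
)
# Three made-up rows for the existing activity.
conn.executemany(
    "INSERT INTO products (product_id, data, activity_id) VALUES (?, ?, ?)",
    [(i, f"payload-{i}", 2) for i in range(3)],
)

# The new activity_id is computed by the database as MAX(activity_id) + 1,
# instead of being passed in as a literal from Python.
conn.execute(
    "INSERT INTO products (product_id, data, activity_id) "
    "SELECT product_id, data, (SELECT MAX(activity_id) + 1 FROM products) "
    "FROM products WHERE activity_id = 2"
)
conn.commit()

# Row counts per activity_id after the copy.
counts = conn.execute(
    "SELECT activity_id, COUNT(*) FROM products "
    "GROUP BY activity_id ORDER BY activity_id"
).fetchall()
print(counts)
```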

The above function executes steps 1-4, producing the desired INSERT SQL statement when printed (I created a products model and query that I thought might be representative):

>>> print(insert_from_query(models.Product, products, activity_id=2))
INSERT INTO products (product_id, data, activity_id) SELECT products.product_id, products.data, :param_1 AS activity_id
FROM products
WHERE products.activity_id != :activity_id_1

All you have to do is execute it:

insert_stmt = insert_from_query(models.Product, products, activity_id=2)
self.session.execute(insert_stmt)
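For an end-to-end check, here is a self-contained sketch that builds a comparable INSERT ... FROM SELECT directly from Core constructs and runs it against an in-memory SQLite database. It assumes SQLAlchemy 1.4 or later (positional select() arguments, Session as a context manager); the Product model, the sample rows, and the new id 7 are hypothetical. It is a simplified variant of the idea above, not a drop-in replacement for insert_from_query().

```python
from sqlalchemy import Column, Integer, String, create_engine, insert, literal, select
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class Product(Base):
    # Hypothetical model mirroring the columns from the question.
    __tablename__ = "products"

    id = Column(Integer, primary_key=True)
    product_id = Column(Integer)
    data = Column(String)
    activity_id = Column(Integer)


engine = create_engine("sqlite://")  # in-memory database
Base.metadata.create_all(engine)

old_id, new_id = 2, 7  # made-up ids for the demonstration

with Session(engine) as session:
    session.add_all(
        [Product(product_id=i, data=f"payload-{i}", activity_id=old_id) for i in range(3)]
    )
    session.commit()

    table = Product.__table__
    # SELECT the non-key columns, swapping activity_id for the new literal value.
    source = select(
        table.c.product_id,
        table.c.data,
        literal(new_id).label("activity_id"),
    ).where(table.c.activity_id == old_id)

    # INSERT INTO products (...) SELECT ...; the rows never leave the database.
    stmt = insert(table).from_select(["product_id", "data", "activity_id"], source)
    session.execute(stmt)
    session.commit()  # executing the statement alone does not persist it

    originals = session.query(Product).filter(Product.activity_id == old_id).count()
    copied = session.query(Product).filter(Product.activity_id == new_id).count()

print(originals, copied)
```

Note the explicit commit after execute(); without it the inserted rows are rolled back when the session closes.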

Upvotes: 3
