s5s

Reputation: 12184

SQLAlchemy + alembic: create schema migration

I'm not sure how to define a "create schema foo" migration. My model looks like this (I'm using Flask-Migrate):

class MyTable(db.Model):
    __tablename__ = "my_table"
    __table_args__ = {"schema": "foo"}

    id = ColumnPrimKey()
    name = Column(Text, unique=True, nullable=False)

When I execute manage db upgrade, I get a failure because the schema "foo" does not exist. How can I add a migration for the schema with SQLAlchemy and Alembic?

Upvotes: 22

Views: 19851

Answers (5)

Jan Katins

Reputation: 2319

Here is a version that creates schemas automatically when they first appear in a migration (add this to env.py, or at least import it from there):

import logging
from collections.abc import Iterable
from typing import Any

import alembic
import sqlalchemy.sql.base
from alembic.autogenerate.api import AutogenContext
from alembic.operations.ops import (
    CreateTableOp,
    ExecuteSQLOp,
    UpgradeOps,
)

_logger = logging.getLogger(f"alembic.{__name__}")


class ExecuteArbitraryDDLOp(ExecuteSQLOp):
    def __init__(
        self,
        ddl: sqlalchemy.sql.base.Executable | str,
        reverse_ddl: sqlalchemy.sql.base.Executable | str,
        *,
        execution_options: dict[str, Any] | None = None,
    ) -> None:
        """A DDL Operation with both upgrade and downgrade commands."""
        super().__init__(ddl, execution_options=execution_options)
        self.reverse_ddl = reverse_ddl

    def reverse(self) -> "ExecuteArbitraryDDLOp":
        """Return the reverse of this ExecuteArbitraryDDLOp (used for downgrades)."""
        return ExecuteArbitraryDDLOp(
            ddl=self.reverse_ddl, reverse_ddl=self.sqltext, execution_options=self.execution_options
        )


@alembic.autogenerate.comparators.dispatch_for("schema")
def create_missing_schemas(
    autogen_context: AutogenContext, upgrade_ops: UpgradeOps, schema_names: Iterable[str | None]
) -> None:
    """Creates missing schemas.

    This depends on sqla/alembic to give us all existing
    schemas in the schema_names argument.
    """
    used_schemas = set()
    for operations_group in upgrade_ops.ops:
        # We only care about Tables at the top level, so this is enough for us.
        if isinstance(operations_group, CreateTableOp) and operations_group.schema:
            used_schemas.add(operations_group.schema)

    existing_schemas = set(schema_names)
    missing_schemas = used_schemas - existing_schemas
    if missing_schemas:
        for schema in missing_schemas:
            _logger.info("Add migration ops for schema: %s", schema)
            upgrade_ops.ops.insert(
                0,
                ExecuteArbitraryDDLOp(
                    ddl=f"CREATE SCHEMA {schema}",
                    reverse_ddl=f"DROP SCHEMA {schema}",
                ),
            )

Source: https://www.katzien.de/en/posts/2023-11-16-create-missing-schemas-in-alembic/ (disclaimer: my own blog)

Upvotes: 0

HeyMan

Reputation: 1855

Since the alembic_version table is created inside the new schema, none of the other approaches worked for me with the mssql+pyodbc dialect.

This one does:

with context.begin_transaction():
    context.execute(
        f"""
        IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = '{schema}')
        BEGIN 
          EXEC('CREATE SCHEMA {schema}') 
        END
        """
    )
    context.run_migrations()
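The snippet above interpolates the schema name directly into the SQL text, so the value should come from trusted configuration. Below is a minimal sketch of one way to guard it, assuming schema names are restricted to plain letters, digits, and underscores. The helper name create_schema_if_missing_sql and the whitelist pattern are hypothetical and not part of Alembic.

```python
import re

# Conservative whitelist (an assumption): letters, digits, underscore,
# and the name must not start with a digit.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def create_schema_if_missing_sql(schema: str) -> str:
    """Return a T-SQL batch that creates `schema` unless it already exists."""
    if not _IDENTIFIER.fullmatch(schema):
        # Refuse anything that could break out of the quoted SQL strings.
        raise ValueError(f"unsafe schema name: {schema!r}")
    return (
        f"IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = '{schema}')\n"
        "BEGIN\n"
        f"  EXEC('CREATE SCHEMA {schema}')\n"
        "END"
    )
```

The returned string can be passed to context.execute() in place of the inline f-string.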

Upvotes: 1

swimmer

Reputation: 3333

A potential issue with the accepted answer is that, for an initial migration, Alembic might have trouble finding a place to create the alembic_version table. This is because op.execute("create schema foo") is only executed after Alembic attempts to find its alembic_version table. The error pops up as:

sqlalchemy.exc.ProgrammingError: (psycopg2.errors.InvalidSchemaName) schema "foo" does not exist

The easy way out would be to have the alembic_version table live in another schema by passing version_table_schema to context.configure() (docs).
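A minimal sketch of that first option, assuming a PostgreSQL database whose default "public" schema already exists. The function shape mirrors the run_migrations_online() found in a typical env.py, and the version_schema parameter is a hypothetical addition for illustration.

```python
def run_migrations_online(connectable, target_metadata, version_schema="public"):
    """Run migrations with the alembic_version table kept in `version_schema`."""
    # Imported inside the function so this sketch can be imported on its own;
    # in a real env.py this import normally sits at the top of the file.
    from alembic import context

    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
            # Keep the version table out of "foo", which does not exist
            # until the first migration has run.
            version_table_schema=version_schema,
        )
        with context.begin_transaction():
            context.run_migrations()
```

With this in place, Alembic looks for alembic_version in the existing schema first, so an op.execute("create schema foo") inside the first migration runs before anything touches "foo".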

However, in many cases one might want to simultaneously (i) create a schema from scratch as part of an initial migration (e.g. to set up a testing environment) and (ii) have the alembic_version table in the target schema. In these cases, an alternative approach is to delegate the schema creation to env.py. Example:

# env.py
from sqlalchemy import create_engine

from bar import create_database_with_schema_if_not_exists

SQLALCHEMY_DATABASE_URI = ...
schema = "foo"
engine = create_engine(SQLALCHEMY_DATABASE_URI)
create_database_with_schema_if_not_exists(engine, schema)
...

# bar.py
import sqlalchemy
from sqlalchemy_utils import create_database, database_exists

def create_database_with_schema_if_not_exists(engine, schema):
    if not database_exists(engine.url):
        create_database(engine.url)
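    # Use an explicit connection: Engine.execute() was removed in SQLAlchemy 2.0.
    with engine.begin() as connection:
        if not connection.dialect.has_schema(connection, schema):
            connection.execute(sqlalchemy.schema.CreateSchema(schema))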

Upvotes: 5

auxsvr

Reputation: 306

Another option is to add the following function to modify the MigrationScript directives in env.py:

from alembic import operations

def process_revision_directives(context, revision, directives):
    """Modify the MigrationScript directives to create schemata as required.
    """
    script = directives[0]
    # Skip tables without an explicit schema (their .schema is None).
    schemas = frozenset(i.schema for i in target_metadata.tables.values() if i.schema)
    for schema in schemas:
        script.upgrade_ops.ops.insert(
            0, operations.ops.ExecuteSQLOp(f"CREATE SCHEMA IF NOT EXISTS {schema}"))
        script.downgrade_ops.ops.append(
            operations.ops.ExecuteSQLOp(f"DROP SCHEMA IF EXISTS {schema} RESTRICT"))

Then pass process_revision_directives=process_revision_directives to context.configure().

Upvotes: 1

buck

Reputation: 1532

I accomplished this by modifying the migration upgrade command to first run:

op.execute("create schema foo")

And in the downgrade function:

op.execute("drop schema foo")

So the whole migration file looks something like:

from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = '6c82c972c61e'
down_revision = '553260b3e828'
branch_labels = None
depends_on = None


def upgrade():
    op.execute("create schema foo")
    ...

def downgrade():
    ...
    op.execute("drop schema foo")

Upvotes: 34
