Reputation: 12184
I'm not sure how to define a "create schema foo" migration. My model looks like this (I'm using Flask-Migrate):
class MyTable(db.Model):
    __tablename__ = "my_table"
    __table_args__ = {"schema": "foo"}

    id = ColumnPrimKey()
    name = Column(Text, unique=True, nullable=False)
When I execute manage db upgrade
I get a failure because the schema "foo" does not exist. How can I add a migration for the schema with SQLAlchemy and Alembic?
Upvotes: 22
Views: 19851
Reputation: 2319
Here is a version which creates schemas automatically when they first appear in a migration (add this to env.py, or at least import it from there):
import logging
from collections.abc import Iterable
from typing import Any
import alembic
import sqlalchemy.sql.base
from alembic.autogenerate.api import AutogenContext
from alembic.operations.ops import (
    CreateTableOp,
    ExecuteSQLOp,
    UpgradeOps,
)
_logger = logging.getLogger(f"alembic.{__name__}")
class ExecuteArbitraryDDLOp(ExecuteSQLOp):
    def __init__(
        self,
        ddl: sqlalchemy.sql.base.Executable | str,
        reverse_ddl: sqlalchemy.sql.base.Executable | str,
        *,
        execution_options: dict[str, Any] | None = None,
    ) -> None:
        """A DDL operation with both upgrade and downgrade commands."""
        super().__init__(ddl, execution_options=execution_options)
        self.reverse_ddl = reverse_ddl

    def reverse(self) -> "ExecuteArbitraryDDLOp":
        """Return the reverse of this ExecuteArbitraryDDLOp (used for downgrades)."""
        return ExecuteArbitraryDDLOp(
            ddl=self.reverse_ddl,
            reverse_ddl=self.sqltext,
            execution_options=self.execution_options,
        )
@alembic.autogenerate.comparators.dispatch_for("schema")
def create_missing_schemas(
    autogen_context: AutogenContext,
    upgrade_ops: UpgradeOps,
    schema_names: Iterable[str | None],
) -> None:
    """Create missing schemas.

    This relies on SQLAlchemy/Alembic to pass all existing
    schemas in the schema_names argument.
    """
    used_schemas = set()
    for operations_group in upgrade_ops.ops:
        # We only care about tables at the top level, so this is enough for us.
        if isinstance(operations_group, CreateTableOp) and operations_group.schema:
            used_schemas.add(operations_group.schema)

    existing_schemas = set(schema_names)
    missing_schemas = used_schemas - existing_schemas
    for schema in missing_schemas:
        _logger.info("Add migration ops for schema: %s", schema)
        upgrade_ops.ops.insert(
            0,
            ExecuteArbitraryDDLOp(
                ddl=f"CREATE SCHEMA {schema}",
                reverse_ddl=f"DROP SCHEMA {schema}",
            ),
        )
Source: https://www.katzien.de/en/posts/2023-11-16-create-missing-schemas-in-alembic/ (disclaimer: my own blog)
Upvotes: 0
Reputation: 1855
Since the alembic_version
table is created inside the new schema none of the other approaches worked for me using mssql+pyodbc
dialect.
This one does:
# "schema" holds the target schema name, defined earlier in env.py
with context.begin_transaction():
    context.execute(
        f"""
        IF NOT EXISTS (SELECT * FROM sys.schemas WHERE name = '{schema}')
        BEGIN
            EXEC('CREATE SCHEMA {schema}')
        END
        """
    )
    context.run_migrations()
Upvotes: 1
Reputation: 3333
A potential issue with the accepted answer is that, for an initial migration, Alembic may have no place to create its alembic_version table. This is because op.execute("create schema foo") only runs after Alembic has tried to locate the alembic_version table. The error pops up as:
sqlalchemy.exc.ProgrammingError: (psycopg2.errors.InvalidSchemaName) schema "foo" does not exist
The easy way out would be to have the alembic_version table live in another schema, by passing version_table_schema to context.configure() (docs).
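A minimal sketch of that configuration, assuming the version table should live in the already-existing public schema (the rest of env.py is unchanged; "public" is just a placeholder):

```python
# env.py (fragment) -- keep alembic_version out of the not-yet-created schema
context.configure(
    connection=connection,
    target_metadata=target_metadata,
    # assumption: "public" already exists; any existing schema works here
    version_table_schema="public",
    include_schemas=True,
)
```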
However, in many cases one might want to simultaneously (i) create a schema from scratch as part of an initial migration (e.g. to set up a testing environment) and (ii) keep the alembic_version table in the target schema. In these cases, an alternative approach is to delegate the schema creation to env.py. Example:
# env.py
from sqlalchemy import create_engine

from bar import create_database_with_schema_if_not_exists

SQLALCHEMY_DATABASE_URI = ...
schema = "foo"

engine = create_engine(SQLALCHEMY_DATABASE_URI)
create_database_with_schema_if_not_exists(engine, schema)
...

# bar.py
import sqlalchemy
from sqlalchemy_utils import create_database, database_exists


def create_database_with_schema_if_not_exists(engine, schema):
    if not database_exists(engine.url):
        create_database(engine.url)
    # Note: engine.execute() and has_schema(engine, ...) are SQLAlchemy 1.x
    # idioms; under 2.0 both calls take a Connection instead of an Engine.
    if not engine.dialect.has_schema(engine, schema):
        engine.execute(sqlalchemy.schema.CreateSchema(schema))
Upvotes: 5
Reputation: 306
Another option is to add the following function to env.py to modify the MigrationScript directives:
from alembic import operations


def process_revision_directives(context, revision, directives):
    """Modify the MigrationScript directives to create schemata as required."""
    script = directives[0]
    # Filter out None so tables in the default schema don't produce
    # a bogus "CREATE SCHEMA None" statement.
    for schema in frozenset(i.schema for i in target_metadata.tables.values() if i.schema):
        script.upgrade_ops.ops.insert(
            0, operations.ops.ExecuteSQLOp(f"CREATE SCHEMA IF NOT EXISTS {schema}"))
        script.downgrade_ops.ops.append(
            operations.ops.ExecuteSQLOp(f"DROP SCHEMA IF EXISTS {schema} RESTRICT"))
then pass process_revision_directives=process_revision_directives to context.configure().
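For reference, a sketch of what the context.configure() call in env.py might look like with the hook wired up (connection and target_metadata are assumed to come from the surrounding env.py):

```python
# env.py (fragment) -- wiring up the directive hook
context.configure(
    connection=connection,
    target_metadata=target_metadata,
    process_revision_directives=process_revision_directives,
)
```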
Upvotes: 1
Reputation: 1532
I accomplished this by modifying the migration's upgrade function to first run:
op.execute("create schema foo")
And in the downgrade function:
op.execute("drop schema foo")
So the whole migration file looks something like:
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision = '6c82c972c61e'
down_revision = '553260b3e828'
branch_labels = None
depends_on = None
def upgrade():
    op.execute("create schema foo")
    ...


def downgrade():
    ...
    op.execute("drop schema foo")
Upvotes: 34