Reputation: 24625
This is my declarative model:
import datetime
from sqlalchemy import Column, Integer, DateTime
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class Test(Base):
    __tablename__ = 'test'
    id = Column(Integer, primary_key=True)
    created_date = DateTime(default=datetime.datetime.utcnow)
However, when I try to import this module, I get this error:
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "orm/models2.py", line 37, in <module>
class Test(Base):
File "orm/models2.py", line 41, in Test
created_date = sqlalchemy.DateTime(default=datetime.datetime.utcnow)
TypeError: __init__() got an unexpected keyword argument 'default'
If I use an Integer type, I can set a default value. What's going on?
Upvotes: 325
Views: 485846
Reputation: 3817
None of the posted answers result in having a database-maintained updated_at. This is slightly complex to do in PostgreSQL (vs. MySQL) because you must maintain the ON UPDATE trigger yourself.
The below code works when using either SQLAlchemy's create_all or alembic's --autogenerate. It supports upgrade and downgrade migration actions, but does not handle other situations such as renaming or deleting a table. (You won't see the triggers in the migration generated by alembic, but the on_* events are executed while the migration runs.)
Example usage is below:
from sqlalchemy.orm import Mapped, mapped_column
from my_db_lib.timestamps import BaseWithStamps
class Base(BaseWithStamps):
    pass

class MyTable(Base):
    __tablename__ = 'my_table'
    id: Mapped[int] = mapped_column(primary_key=True)
    # other table columns
    # BaseWithStamps includes `created_at` and `updated_at` by default
The supporting files for provisioning the above example follow:
alembic.ini:
[alembic]
script_location = %(here)s/alembic
file_template = %%(year)d_%%(month).2d_%%(day).2d_%%(hour).2d%%(minute).2d-%%(rev)s_%%(slug)s
prepend_sys_path = .
timezone = UTC
revision_environment = true
version_path_separator = os # Use os.pathsep. Default configuration used for new projects.
sqlalchemy.url = driver://user:pass@localhost/dbname
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
alembic/env.py:
from alembic import context
from logging.config import fileConfig
from my_db_lib.helpers import get_schema
from my_db_lib.models import Base
from my_db_lib.timestamps import update_stamp_function
from sqlalchemy import engine_from_config
from sqlalchemy import pool
# this is the Alembic Config object, which provides
# access to the values within the .ini file in use.
config = context.config
# this is the script EnvironmentContext
script = context.script
# Interpret the config file for Python logging.
# This line sets up loggers basically.
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# the DeclarativeBase to target
Target = Base

##
# Runners
##

_configure_kwargs = {
    'target_metadata': Target.metadata,
    'version_table_schema': get_schema(Target),
    'compare_server_default': True,
    'compare_type': True,
}

def run_migrations_offline() -> None:
    '''
    Run migrations in 'offline' mode.

    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well. By skipping the Engine creation
    we don't even need a DBAPI to be available.

    Calls to context.execute() here emit the given string to the
    script output.
    '''
    url = config.get_main_option('sqlalchemy.url')
    context.configure(
        url=url,
        literal_binds=True,
        dialect_opts={'paramstyle': 'named'},
        **_configure_kwargs
    )
    with context.begin_transaction():
        context.run_migrations()

def run_migrations_online() -> None:
    '''
    Run migrations in 'online' mode.

    In this scenario we need to create an Engine
    and associate a connection with the context.
    '''
    engine = engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix='sqlalchemy.',
        poolclass=pool.NullPool,
    )
    with engine.connect() as connection:
        context.configure(
            connection=connection,
            **_configure_kwargs
        )
        with context.begin_transaction():
            context.run_migrations()

if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()
my_db_lib/helpers.py:
from sqlalchemy.orm import DeclarativeBase
from typing import Type

def has_table(BaseClass: Type[DeclarativeBase]):
    # https://github.com/sqlalchemy/sqlalchemy/blob/96f1172812f858fead45cdc7874abac76f45b339/lib/sqlalchemy/orm/decl_base.py#L1821-L1830
    for attr in ('__tablename__', '__table__'):
        try:
            attr_val = getattr(BaseClass, attr)
        except AttributeError:
            pass
        else:
            if attr_val:
                return True
    return False

def get_tablename(BaseClass: Type[DeclarativeBase]):
    return BaseClass.__tablename__ if BaseClass.__tablename__ else BaseClass.__table__.name

def get_schema(BaseClass: Type[DeclarativeBase], default_schema='public'):
    try:
        table_args = BaseClass.__table_args__
    except AttributeError:
        pass
    else:
        schema: str = table_args.get('schema')
        if schema:
            # schema is defined for this single table
            return schema
    try:
        metadata = BaseClass.metadata
    except AttributeError:
        pass
    else:
        try:
            schema = metadata.schema
        except AttributeError:
            pass
        else:
            if schema:
                # schema is defined as default for all tables
                return schema
    return default_schema
my_db_lib/events.py:
from functools import partial
from sqlalchemy import DDL, Table
from sqlalchemy.engine import Connectable
from sqlalchemy.event import listen
from sqlalchemy.orm import DeclarativeBase
from typing import Type

def on_after_create(*args, **kwargs):
    return _on_event('after_create', *args, **kwargs)

def on_before_drop(*args, **kwargs):
    return _on_event('before_drop', *args, **kwargs)

# https://stackoverflow.com/a/34029220
def _on_event(
    event: str,
    BaseClass: Type[DeclarativeBase],
    ddl: DDL,
):
    def listener(
        tablename: str,
        ddl: DDL,
        table: Table,
        bind: Connectable,
        **kwargs
    ):
        if table.name == tablename:
            ddl(table, bind, **kwargs)

    listen(
        Table,
        event,
        partial(listener, BaseClass.__tablename__, ddl)
    )
my_db_lib/timestamps.py:
from alembic_utils.pg_function import PGFunction
from alembic_utils.pg_trigger import PGTrigger
from datetime import datetime
from sqlalchemy import DDL, types
from sqlalchemy.ext.compiler import compiles
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column
from sqlalchemy.schema import FetchedValue
from sqlalchemy.sql import expression
from my_db_lib.events import on_after_create, on_before_drop
from my_db_lib.helpers import get_schema, get_tablename, has_table
from typing import Type

class UtcNow(expression.FunctionElement):
    # https://stackoverflow.com/q/13370317#comment90734051_31484933
    type = types.DateTime()

@compiles(UtcNow, 'postgresql')
def pg_utc_now(element, compiler, **kw):
    return "TIMEZONE('utc', CURRENT_TIMESTAMP)"

def make_stamp_function(BaseClass: Type[DeclarativeBase]):
    schema = get_schema(BaseClass)
    return PGFunction(
        schema=schema,
        signature='update_stamp()',
        definition=('''
            RETURNS TRIGGER AS $$
            BEGIN
                NEW.updated_at = now();
                RETURN NEW;
            END;
            $$ LANGUAGE 'plpgsql';
        ''')
    )

def make_stamp_trigger(BaseClass: Type[DeclarativeBase]):
    schema = get_schema(BaseClass)
    table = get_tablename(BaseClass)
    schema_table = f'"{schema}"."{table}"'
    return PGTrigger(
        schema=schema,
        signature=f'{table}_updated_at',
        on_entity=schema_table,
        definition=(f'''
            BEFORE UPDATE ON {schema_table}
            FOR EACH ROW
            EXECUTE PROCEDURE "{schema}".update_stamp();
        '''),
    )

PGObject = PGFunction | PGTrigger

def to_sql_create(pg_obj: PGObject):
    return '\n'.join(
        str(s) for s in
        pg_obj.to_sql_statement_create_or_replace()
    ).strip()

def to_sql_drop(pg_obj: PGObject):
    return str(pg_obj.to_sql_statement_drop()).strip()

class BaseWithStamps(DeclarativeBase):
    def __init_subclass__(cls, *args, **kwargs):
        if has_table(cls):
            cls.created_at = cls._init_created_at()
            cls.updated_at = cls._init_updated_at()
        return super().__init_subclass__(*args, **kwargs)

    @staticmethod
    def _init_created_at() -> Mapped[datetime]:
        return mapped_column(
            types.TIMESTAMP(timezone=True),
            server_default=UtcNow()
        )

    @classmethod
    def _init_updated_at(cls) -> Mapped[datetime]:
        stamp_func = make_stamp_function(cls)
        stamp_trig = make_stamp_trigger(cls)
        on_after_create(cls, DDL(to_sql_create(stamp_func)))
        on_after_create(cls, DDL(to_sql_create(stamp_trig)))
        on_before_drop(cls, DDL(to_sql_drop(stamp_trig)))
        return mapped_column(
            types.DateTime(timezone=True),
            server_default=UtcNow(),
            server_onupdate=FetchedValue()
        )
Upvotes: 0
Reputation: 1360
TL;DR: version-controlled database triggers automatically update the updated_at field, and the ORM is made aware so that it issues a RETURNING clause.
WHY? Because we want to avoid relying purely on the ORM's onupdate to make sure the updated_at field is up to date.
NOTE: this code was designed for PostgreSQL, but would be similar for other databases.
To extend on this answer: for a default value you should use server_default as suggested. However, for a value that is updated upon an UPDATE statement, you have two options. The first is described in the linked answer. The second is using database triggers. I found alembic-utils useful for making sure the trigger is migrated and updated when its version-controlled definition is modified. Combining it with a Base model and a listing of the tables allowed adding those created/updated fields to all ORM models.
from datetime import datetime
from sqlalchemy import func, FetchedValue, Identity, MetaData
from sqlalchemy.orm import DeclarativeBase, Mapped, mapped_column

# `convention` (the constraint naming-convention dict) is defined elsewhere
class Base(DeclarativeBase):
    __abstract__ = True
    metadata = MetaData(naming_convention=convention)

    id: Mapped[int] = mapped_column(Identity(), primary_key=True)
    created_at: Mapped[datetime] = mapped_column(server_default=func.now())
    updated_at: Mapped[datetime] = mapped_column(
        server_default=FetchedValue(), server_onupdate=FetchedValue()
    )
    # IMPORTANT! see details below
    __mapper_args__ = {"eager_defaults": True}

class Customer(Base):
    __tablename__ = "customers"
    name: Mapped[str] = mapped_column(nullable=False)
triggers.py:
from alembic_utils.pg_function import PGFunction
from alembic_utils.pg_trigger import PGTrigger
from alembic_utils.replaceable_entity import register_entities
# import your `Base` module - ONLY AFTER all other modules were imported as well
# that's anyway required for handling the metadata in `alembic`.
from ... import Base
update_update_time_func = PGFunction(
    schema="public",
    signature="update_updated_at()",
    definition="""
        RETURNS TRIGGER AS $$
        BEGIN
            NEW.updated_at = (CURRENT_TIMESTAMP AT TIME ZONE 'UTC');
            RETURN NEW;
        END;
        $$ LANGUAGE plpgsql
    """,
)
register_entities([update_update_time_func])

update_time_triggers = [
    PGTrigger(
        schema="public",
        signature=f"update_{tb_name}_updated_at",
        on_entity=tb_name,
        definition=f"""
            BEFORE INSERT OR UPDATE ON {tb_name}
            FOR EACH ROW EXECUTE FUNCTION update_updated_at()
        """,
    )
    for tb_name, tb in Base.metadata.tables.items()
    if any(c.name == "updated_at" for c in tb.columns)
]
register_entities(update_time_triggers)
In migrations/env.py, import your triggers module. It will register the trigger entities and the stored procedure. Running alembic revision --autogenerate -m "add triggers" should identify the function and triggers, and alembic upgrade head should add them.
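A minimal sketch of that env.py wiring, assuming hypothetical module paths (my_app.models and my_app.triggers are placeholders for your own packages):

```python
# migrations/env.py (sketch; module paths are placeholders)
# Import the models first so Base.metadata is fully populated,
# then import the triggers module so its register_entities()
# calls run and alembic can see the function and triggers.
import my_app.models    # noqa: F401  (hypothetical)
import my_app.triggers  # noqa: F401  (hypothetical)
```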
Eager defaults and performance: implementing this solution, it was important for me that the instance at hand is refreshed with the database-generated values (the created and updated timestamps, in this case) as part of the INSERT or UPDATE itself, without extra SELECT round trips. Setting eager_defaults in __mapper_args__ (marked IMPORTANT above) makes the ORM fetch these values via a RETURNING clause. To demonstrate the above, consider the following code:
with Session() as session:
    c = Customer(name='foo')
    session.add(c)
    print(f'Instance before INSERT: {c}')
    print('INSERT logs:')
    session.commit()
    print(f'Instance after INSERT: {c}')

    c.name = 'bar'
    print('UPDATE logs:')
    session.commit()
    print(f'Instance after UPDATE: {c}')
Which produces these logs (slightly edited for readability):
Instance before INSERT:
<Customer(name='foo')>
INSERT logs:
BEGIN (implicit)
INSERT INTO customers (name) VALUES (%(name)s) RETURNING customers.id, customers.created_at, customers.updated_at
[cached since 1.849e+04s ago] {'name': 'foo'}
COMMIT
Instance after INSERT:
<Customer(name='foo', id=8, created_at=datetime.datetime(2023, 12, 5, 14, 23, 3, 964627), updated_at=datetime.datetime(2023, 12, 5, 14, 23, 3, 964627))>
UPDATE logs:
BEGIN (implicit)
UPDATE customers SET name=%(name)s WHERE customers.id = %(customers_id)s RETURNING customers.updated_at
[cached since 220s ago] {'name': 'bar', 'customers_id': 8}
COMMIT
Instance after UPDATE:
<Customer(name='bar', id=8, created_at=datetime.datetime(2023, 12, 5, 14, 23, 3, 964627), updated_at=datetime.datetime(2023, 12, 5, 14, 23, 3, 970578))>
Upvotes: 2
Reputation: 383
It's better to use it like this:
import datetime
from sqlalchemy import Column, DateTime

# pass the callable itself (no parentheses) so the timestamp is
# evaluated per statement, not once at class-definition time
created_at = Column(DateTime, default=datetime.datetime.now)
updated_at = Column(DateTime, onupdate=datetime.datetime.now)
Sorry for the previous version. This now works for me:
created_at = Column(DateTime, default=datetime.datetime.now)
updated_at = Column(DateTime, default=datetime.datetime.now,
                    onupdate=datetime.datetime.now)
import uuid
from app.configs.database import Base
from sqlalchemy import Column, String, Integer, DateTime, Enum
from app.utils.utils import ERoles
import datetime
class User(Base):
    __tablename__ = "Users"

    id = Column(Integer, primary_key=True, index=True)
    # pass a callable so a fresh UUID is generated per row,
    # not a single value computed once at import time
    uuid = Column(String(36), unique=True,
                  default=lambda: str(uuid.uuid4()))
    name = Column(String(256), nullable=True)
    username = Column(String(256), unique=True)
    email = Column(String(256), unique=True)
    password = Column(String(256))
    role = Column(Enum(ERoles), default=ERoles.USER)
    created_at = Column(DateTime, default=datetime.datetime.now)
    updated_at = Column(DateTime, default=datetime.datetime.now,
                        onupdate=datetime.datetime.now)
Upvotes: -2
Reputation: 1
You can use TIMESTAMP with SQLAlchemy.
from sqlalchemy import TIMESTAMP, Table, MetaData, Column, ...
... ellipsis ...
def function_name(self) -> Table:
    return Table(
        "table_name",
        self._metadata,
        ...,
        Column("date_time", TIMESTAMP),
    )
... ellipsis ...
Upvotes: -1
Reputation: 230
Using the default parameter with datetime.now:
from sqlalchemy import Column, Integer, DateTime
from datetime import datetime
class Test(Base):
    __tablename__ = 'test'
    id = Column(Integer, primary_key=True)
    created_at = Column(DateTime, default=datetime.now)
    updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
Upvotes: 15
Reputation: 7506
Note that for server_default=func.now() and onupdate=func.now() to work:
Local_modified = Column(DateTime, server_default=func.now(), onupdate=func.now())
you need to set DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP in your table DDL.
For example
create table test
(
    id             int auto_increment primary key,
    source         varchar(50) null,
    Local_modified datetime DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
)
collate = utf8mb4_bin;
Otherwise, server_default=func.now(), onupdate=func.now() has no effect.
Upvotes: 0
Reputation: 21
For MariaDB, this worked for me:
from sqlalchemy import Column, Integer, String, DateTime, TIMESTAMP, text
from sqlalchemy.sql import func
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class Test(Base):
    __tablename__ = "test"

    id = Column(Integer, primary_key=True, autoincrement=True)
    name = Column(String(255), nullable=False)
    email = Column(String(255), nullable=False)
    created_at = Column(TIMESTAMP, nullable=False, server_default=func.now())
    updated_at = Column(DateTime, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"))
In the SQLAlchemy documentation for MariaDB, it is recommended to import text from SQLAlchemy itself and set server_default with text(), inserting the custom command:
updated_at=Column(DateTime, server_default=text("CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP"))
To understand func.now, you can read the SQLAlchemy documentation. Hope I helped in some way.
Upvotes: 2
Reputation: 260
Jeff Widman said in his answer that you need to create your own implementation of UTC timestamps for func.utcnow().
As I didn't want to implement it myself, I searched for and found a Python package that already does the job and is maintained by many people: spoqa/sqlalchemy-utc.
Long story short, UtcDateTime:
- takes only aware datetime.datetime,
- returns only aware datetime.datetime,
- never takes or returns naive datetime.datetime,
- ensures timestamps in the database are always encoded in UTC, and
- works as you'd expect.
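The package's core contract (always timezone-aware, always UTC) can be illustrated with the standard library alone; this is a sketch of the behavior, not the package's actual code:

```python
from datetime import datetime, timezone

def utc_now() -> datetime:
    # always return an aware datetime pinned to UTC, never a naive one
    return datetime.now(timezone.utc)

stamp = utc_now()
print(stamp.tzinfo)  # prints: UTC
```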
Upvotes: 0
Reputation: 1179
As per the PostgreSQL documentation, now(), CURRENT_TIMESTAMP, and LOCALTIMESTAMP return the time of the transaction:
This is considered a feature: the intent is to allow a single transaction to have a consistent notion of the "current" time, so that multiple modifications within the same transaction bear the same time stamp.
You might want to use statement_timestamp() or clock_timestamp() if you don't want the transaction timestamp.
statement_timestamp() returns the start time of the current statement (more specifically, the time of receipt of the latest command message from the client).
clock_timestamp() returns the actual current time, and therefore its value changes even within a single SQL command.
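These functions are reachable from SQLAlchemy through the generic func namespace, which renders any attribute name as a SQL function call (a sketch; the functions themselves exist only on PostgreSQL):

```python
from sqlalchemy import Column, DateTime, func

# transaction start time (the default behavior described above)
tx_time = Column(DateTime(timezone=True), server_default=func.now())
# statement receipt time
st_time = Column(DateTime(timezone=True), server_default=func.statement_timestamp())
# actual wall-clock time, changing even within a single statement
wc_time = Column(DateTime(timezone=True), server_default=func.clock_timestamp())

print(func.clock_timestamp())  # prints: clock_timestamp()
```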
Upvotes: 1
Reputation: 381
You likely want to use onupdate=datetime.now so that UPDATEs also change the last_updated field.
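A minimal sketch of the two behaviors side by side (model and column names are made up):

```python
from datetime import datetime
from sqlalchemy import Column, DateTime, Integer
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class Note(Base):
    __tablename__ = 'notes'
    id = Column(Integer, primary_key=True)
    # `default`: the callable runs once, when the row is INSERTed
    created = Column(DateTime, default=datetime.now)
    # `onupdate`: the callable also runs on every UPDATE
    last_updated = Column(DateTime, default=datetime.now, onupdate=datetime.now)

# only last_updated carries an onupdate hook:
print(Note.__table__.c.created.onupdate)               # prints: None
print(Note.__table__.c.last_updated.onupdate is None)  # prints: False
```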
SQLAlchemy has two defaults for Python-executed functions:
- default sets the value on INSERT, only once
- onupdate sets the value to the callable's result on UPDATE as well
Upvotes: 14
Reputation: 1449
You can also use SQLAlchemy's built-in function for a default DateTime:
from sqlalchemy.sql import func
DT = Column(DateTime(timezone=True), default=func.now())
Upvotes: 87
Reputation: 23472
For sanity, you probably want to have all datetimes calculated by your DB server rather than the application server. Calculating the timestamp in the application can lead to problems because network latency is variable, clients experience slightly different clock drift, and different programming languages occasionally calculate time slightly differently.
SQLAlchemy allows you to do this by passing func.now()
or func.current_timestamp()
(they are aliases of each other) which tells the DB to calculate the timestamp itself.
server_default
Additionally, for a default where you're already telling the DB to calculate the value, it's generally better to use server_default
instead of default
. This tells SQLAlchemy to pass the default value as part of the CREATE TABLE
statement.
For example, if you write an ad hoc script against this table, using server_default
means you won't need to worry about manually adding a timestamp call to your script--the database will set it automatically.
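You can see this without a database by compiling the table's DDL; a sketch with made-up names, using SQLAlchemy's generic dialect:

```python
from sqlalchemy import Column, DateTime, Integer, MetaData, Table, func
from sqlalchemy.schema import CreateTable

metadata = MetaData()
events = Table(
    'events', metadata,
    Column('id', Integer, primary_key=True),
    Column('time_created', DateTime(timezone=True), server_default=func.now()),
)

# server_default is part of the emitted CREATE TABLE, so every client
# of the table gets the timestamp whether or not it goes through the ORM
ddl = str(CreateTable(events))
print('DEFAULT now()' in ddl)  # prints: True
```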
onupdate / server_onupdate
SQLAlchemy also supports onupdate
so that anytime the row is updated it inserts a new timestamp. Again, best to tell the DB to calculate the timestamp itself:
from sqlalchemy.sql import func
time_created = Column(DateTime(timezone=True), server_default=func.now())
time_updated = Column(DateTime(timezone=True), onupdate=func.now())
There is a server_onupdate parameter, but unlike server_default, it doesn't actually set anything server-side. It just tells SQLAlchemy that your database will change the column when an update happens (perhaps you created a trigger on the column), so SQLAlchemy will ask for the return value and update the corresponding object.
You might be surprised to notice that if you make a bunch of changes within a single transaction, they all have the same timestamp. That's because the SQL standard specifies that CURRENT_TIMESTAMP
returns values based on the start of the transaction.
PostgreSQL provides the non-SQL-standard statement_timestamp()
and clock_timestamp()
which do change within a transaction. Docs here: https://www.postgresql.org/docs/current/static/functions-datetime.html#FUNCTIONS-DATETIME-CURRENT
If you want to use UTC timestamps, a stub of implementation for func.utcnow()
is provided in SQLAlchemy documentation. You need to provide appropriate driver-specific functions on your own though.
Upvotes: 732
Reputation: 43024
The default keyword parameter should be given to the Column object.
Example:
Column(u'timestamp', TIMESTAMP(timezone=True), primary_key=False, nullable=False, default=time_now),
The default value can be a callable, which I defined here as follows:
from pytz import timezone
from datetime import datetime
UTC = timezone('UTC')
def time_now():
    return datetime.now(UTC)
Upvotes: 9
Reputation: 39698
DateTime doesn't take default as an argument; the default keyword should be passed to the Column function. Try this:
import datetime
from sqlalchemy import Column, Integer, DateTime
from sqlalchemy.ext.declarative import declarative_base
Base = declarative_base()
class Test(Base):
    __tablename__ = 'test'
    id = Column(Integer, primary_key=True)
    created_date = Column(DateTime, default=datetime.datetime.utcnow)
Upvotes: 283