Tim Tisdall
Tim Tisdall

Reputation: 10382

SQLAlchemy logging of changes with date and user

This is very similar to another question that's over 3 years old: What's a good general way to log SQLAlchemy transactions, complete with authenticated user, etc?

I'm working on an application where I'd like to log all changes to particular tables. There's currently a really good "recipe" that does versioning, but I need to modify it to instead record a datetime when the change occurred and a user id of who made the change. I took the history_meta.py example that's packaged with SQLAlchemy and made it record times instead of version numbers, but I'm having trouble figuring out how to pass in a user id.

The question I referenced above suggests including the user id in the session object. That makes a lot of sense, but I'm not sure how to do that. I've tried something simple like session.userid = authenticated_userid(request) but in history_meta.py that attribute doesn't seem to be on the session object any more.

I'm doing all of this in the Pyramid framework and the session object that I'm using is defined as DBSession = scoped_session(sessionmaker(extension=ZopeTransactionExtension())). In a view I do session = DBSession() and then proceed to use session. (I'm not really sure if that's necessary, but that's what's going on)

Here's my modified history_meta.py in case someone might find it useful:

from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import mapper, class_mapper, attributes, object_mapper
from sqlalchemy.orm.exc import UnmappedClassError, UnmappedColumnError
from sqlalchemy import Table, Column, ForeignKeyConstraint, DateTime
from sqlalchemy import event
from sqlalchemy.orm.properties import RelationshipProperty
from datetime import datetime

def col_references_table(col, table):
    """Return True if any foreign key on *col* points at *table*."""
    return any(fk.references(table) for fk in col.foreign_keys)

def _history_mapper(local_mapper):
    """Build and attach a history mapper/table for a versioned class.

    Creates a parallel ``<table>_history`` table whose rows are keyed by
    the original primary key plus a ``version_datetime`` column, maps a
    generated ``<Class>History`` class onto it, and stores the mapper on
    the versioned class as ``__history_mapper__``.  Also appends a
    ``version_datetime`` column to the live table itself.
    """
    cls = local_mapper.class_

    # set the "active_history" flag
    # on column-mapped attributes so that the old version
    # of the info is always loaded (currently sets it on all attributes)
    for prop in local_mapper.iterate_properties:
        getattr(local_mapper.class_, prop.key).impl.active_history = True

    super_mapper = local_mapper.inherits
    super_history_mapper = getattr(cls, '__history_mapper__', None)

    polymorphic_on = None
    super_fks = []
    if not super_mapper or local_mapper.local_table is not super_mapper.local_table:
        # Joined-table inheritance (or no inheritance): build a dedicated
        # history table mirroring the live table's columns.
        cols = []
        for column in local_mapper.local_table.c:
            # Skip the live table's own version column; the history table
            # gets its own (as part of the primary key) below.
            if column.name == 'version_datetime':
                continue

            col = column.copy()
            col.unique = False

            if super_mapper and col_references_table(column, super_mapper.local_table):
                # FK to the parent's live table becomes an FK to the
                # parent's history table.
                super_fks.append((col.key, list(super_history_mapper.local_table.primary_key)[0]))

            cols.append(col)

            if column is local_mapper.polymorphic_on:
                polymorphic_on = col

        if super_mapper:
            # Child history rows must reference the matching parent
            # history row, including its version timestamp.
            super_fks.append(('version_datetime', super_history_mapper.base_mapper.local_table.c.version_datetime))
            cols.append(Column('version_datetime', DateTime, default=datetime.now, nullable=False, primary_key=True))
        else:
            cols.append(Column('version_datetime', DateTime, default=datetime.now, nullable=False, primary_key=True))

        if super_fks:
            cols.append(ForeignKeyConstraint(*zip(*super_fks)))

        table = Table(local_mapper.local_table.name + '_history', local_mapper.local_table.metadata,
           *cols
        )
    else:
        # single table inheritance.  take any additional columns that may have
        # been added and add them to the history table.
        for column in local_mapper.local_table.c:
            if column.key not in super_history_mapper.local_table.c:
                col = column.copy()
                col.unique = False
                super_history_mapper.local_table.append_column(col)
        table = None

    # Mirror the inheritance hierarchy: the generated history class
    # subclasses the parent's history class when one exists.
    if super_history_mapper:
        bases = (super_history_mapper.class_,)
    else:
        bases = local_mapper.base_mapper.class_.__bases__
    versioned_cls = type.__new__(type, "%sHistory" % cls.__name__, bases, {})

    m = mapper(
            versioned_cls,
            table,
            inherits=super_history_mapper,
            polymorphic_on=polymorphic_on,
            polymorphic_identity=local_mapper.polymorphic_identity
            )
    cls.__history_mapper__ = m

    if not super_history_mapper:
        # The live table also tracks its current version timestamp
        # (copied into the history row on each change by create_version).
        local_mapper.local_table.append_column(
            Column('version_datetime', DateTime, default=datetime.now, nullable=False, primary_key=False)
        )
        local_mapper.add_property("version_datetime", local_mapper.local_table.c.version_datetime)


class Versioned(object):
    """Mixin that wires a history mapper onto any mapped subclass."""

    @declared_attr
    def __mapper_cls__(cls):
        # Wrap mapper() so the history table/mapper is configured as a
        # side effect of mapping the versioned class itself.
        def _mapper(cls, *args, **kwargs):
            configured = mapper(cls, *args, **kwargs)
            _history_mapper(configured)
            return configured
        return _mapper


def versioned_objects(iter):
    """Yield only the objects from *iter* that have a history mapper attached."""
    for candidate in iter:
        if hasattr(candidate, '__history_mapper__'):
            yield candidate

def create_version(obj, session, deleted = False):
    """Add a history row for *obj* to *session* if it changed.

    Copies the pre-change column values into a new ``<Class>History``
    instance, stamps it with the object's current ``version_datetime``,
    and then advances ``obj.version_datetime`` to now.  Does nothing when
    no column or relationship changed (unless *deleted* is True).
    """
    obj_mapper = object_mapper(obj)
    history_mapper = obj.__history_mapper__
    history_cls = history_mapper.class_

    obj_state = attributes.instance_state(obj)

    attr = {}

    obj_changed = False

    for om, hm in zip(obj_mapper.iterate_to_root(), history_mapper.iterate_to_root()):
        if hm.single:
            continue

        for hist_col in hm.local_table.c:
            # The history row's version stamp is filled in below, not
            # copied column-by-column.
            if hist_col.key == 'version_datetime':
                continue

            obj_col = om.local_table.c[hist_col.key]

            # get the value of the
            # attribute based on the MapperProperty related to the
            # mapped column.  this will allow usage of MapperProperties
            # that have a different keyname than that of the mapped column.
            try:
                prop = obj_mapper.get_property_by_column(obj_col)
            except UnmappedColumnError:
                # in the case of single table inheritance, there may be
                # columns on the mapped table intended for the subclass only.
                # the "unmapped" status of the subclass column on the
                # base class is a feature of the declarative module as of sqla 0.5.2.
                continue

            # expired object attributes and also deferred cols might not be in the
            # dict.  force it to load no matter what by using getattr().
            if prop.key not in obj_state.dict:
                getattr(obj, prop.key)

            added, unchanged, deleted_vals = attributes.get_history(obj, prop.key)

            if deleted_vals:
                # Attribute was modified: record the OLD value.
                attr[hist_col.key] = deleted_vals[0]
                obj_changed = True
            elif unchanged:
                attr[hist_col.key] = unchanged[0]
            else:
                # if the attribute had no value.
                attr[hist_col.key] = added[0]
                obj_changed = True

    if not obj_changed:
        # not changed, but we have relationships.  OK
        # check those too
        for prop in obj_mapper.iterate_properties:
            if isinstance(prop, RelationshipProperty) and \
                attributes.get_history(obj, prop.key).has_changes():
                obj_changed = True
                break

    if not obj_changed and not deleted:
        return

    # The history row carries the timestamp the object has had until now.
    attr['version_datetime'] = obj.version_datetime
    hist = history_cls()
    for key, value in attr.items():
        setattr(hist, key, value)
    session.add(hist)
    # (removed a leftover debug `print(dir(session))` here)
    obj.version_datetime = datetime.now()

def versioned_session(session):
    """Attach a before_flush hook that versions dirty/deleted objects."""
    def _record_versions(session, flush_context, instances):
        for dirty_obj in versioned_objects(session.dirty):
            create_version(dirty_obj, session)
        for deleted_obj in versioned_objects(session.deleted):
            create_version(deleted_obj, session, deleted = True)
    event.listen(session, 'before_flush', _record_versions)

UPDATE: Okay, it seems that in the before_flush() method the session I get is of type sqlalchemy.orm.session.Session where the session I attached the user_id to was sqlalchemy.orm.scoping.scoped_session. So, at some point an object layer is stripped off. Is it safe to assign the user_id to the Session within the scoped_session? Can I be sure that it won't be there for other requests?

Upvotes: 15

Views: 7014

Answers (3)

Comp Guy
Comp Guy

Reputation: 71

I ran into this old question recently. My requirement is to log all changes to a set of tables.

I'll post the code I ended up with here in case anyone finds it useful. It has some limitations, especially around deletes, but works for my purposes. The code supports logging audit records for selected tables to either a log file, or an audit table in the db.

from app import db
import datetime
from flask import current_app, g

# your own session user goes here
#  you'll need an id and an email in that model
from flask_user import current_user  as user     

import importlib
import logging
from sqlalchemy import event, inspect
from sqlalchemy.orm.attributes import get_history
from sqlalchemy.orm import ColumnProperty, class_mapper
from uuid import uuid4


class AuditManager (object):
    """Audits inserts/changes/deletes on configured tables.

    Enabled when the Flask app config contains ``AUDIT_CONFIG``; records
    are written either to a log file (``storage: 'log'``) or to an audit
    table in the db (``storage: 'db'``).
    """

    config = {'storage': 'log',
              #define class for Audit model for your project, if saving audit records in db
              'auditModel': 'app.models.user_models.Audit'}

    def __init__(self, app):
        if 'AUDIT_CONFIG' in app.config:
            app.before_request(self.before_request_handler)
            self.config.update(app.config['AUDIT_CONFIG'])
            event.listen(db.session, 'after_flush', self.db_after_flush)
            event.listen(db.session, 'before_flush', self.db_before_flush)
            event.listen(db.session, 'after_bulk_delete', self.db_after_bulk_delete)
            if self.config['storage'] == 'log':
                self.logger = logging.getLogger(__name__)
            elif self.config['storage'] == 'db':
                # Load Audit model class at runtime, so that log file users don't need to define it
                module_name, class_name = self.config['auditModel'].rsplit(".", 1)
                self.AuditModel = getattr(importlib.import_module(module_name), class_name)

    #Create a global request id
    # Use this to group transactions together
    def before_request_handler(self):
        g.request_id = uuid4()

    def _audit_fields(self, obj):
        """Return the attribute names to audit for a model instance OR class.

        Prefers an explicit ``auditFields`` list on the model's ``Meta``
        class; otherwise derives the list from the mapped columns.
        """
        model = obj if isinstance(obj, type) else obj.__class__
        meta = getattr(model, 'Meta', None)
        fields = getattr(meta, 'auditFields', None)
        if fields is None:
            fields = self.get_fields(obj)
        return fields

    def db_after_flush(self, session, flush_context):
        """Record inserts (after flush so generated keys are populated)."""
        for instance in session.new:
            if instance.__tablename__ in self.config['tables']:
                # Make every value a string in audit
                data = {attr: str(getattr(instance, attr, 'not set'))
                        for attr in self._audit_fields(instance)}
                self.log_it(session, 'insert', instance, data)

    def db_before_flush(self, session, flush_context, instances):
        """Record updates (changed fields only) and per-object deletes."""
        for instance in session.dirty:
            if instance.__tablename__ in self.config['tables']:
                inspection = inspect(instance)
                data = {}
                for attr in self._audit_fields(instance):
                    # We only log the new data
                    if getattr(inspection.attrs, attr).history.has_changes():
                        data[attr] = str(getattr(instance, attr, 'not set'))
                self.log_it(session, 'change', instance, data)

        for instance in session.deleted:
            # Record the deletes for this table.
            #  For this to be triggered, you must use the session-based delete
            #   object construct, e.g. session.delete({query}.first())
            if instance.__tablename__ in self.config['tables']:
                data = {attr: str(getattr(instance, attr, 'not set'))
                        for attr in self._audit_fields(instance)}
                self.log_it(session, 'delete', instance, data)

    def db_after_bulk_delete(self, delete_context):
        """Record bulk deletes (query.delete()); single-table deletes only."""
        # NOTE: column_descriptions yields the mapped CLASS, not an instance.
        # (The original code called instance.__class__ here, which inspected
        # the metaclass and broke the Meta/get_fields lookups.)
        model = delete_context.query.column_descriptions[0]['type']
        if delete_context.result.returns_rows:
            # Not sure exactly how after_bulk_delete is expected to work, since
            #  delete statements normally return no rows; audit rows when the
            #  backend does provide them.
            for row in delete_context.result:
                data = {attr: str(getattr(row, attr, 'not set'))  # Make every value a string in audit
                        for attr in self._audit_fields(model)}
                self.log_it(delete_context.session, 'delete', model, data)
        else:
            # Audit what we can when we don't have individual rows to look at
            self.log_it(delete_context.session, 'delete', model,
                        {"rowcount": delete_context.result.rowcount})

    def log_it(self, session, action, instance, data):
        """Write one audit record to the log file or the audit table."""
        if self.config['storage'] == 'log':
            self.logger.info("request_id: %s, table: %s, action: %s, user id: %s, user email: %s, date: %s, data: %s" \
               %  (getattr(g, 'request_id', None), instance.__tablename__, action, getattr(user, 'id', None), getattr(user, 'email', None),\
                   datetime.datetime.now(), data))
        elif self.config['storage'] == 'db':
            audit = self.AuditModel(request_id=str(getattr(g, 'request_id', None)),
                          table=str(instance.__tablename__),
                          action=action,
                          user_id=getattr(user, 'id', None),
                          user_email=getattr(user, 'email', None),
                          date=datetime.datetime.now(),
                          data=data
                          )
            session.add(audit)

    def get_fields(self, instance):
        """Return the mapped column attribute names for a model.

        Accepts either a model instance or a model class (the class form
        is needed by the bulk-delete handler).
        """
        model = instance if isinstance(instance, type) else instance.__class__
        return [attr.key for attr in class_mapper(model).column_attrs]

Suggested Model, if you want to store audit records in the database.

class Audit(db.Model):
    # One row per insert/change/delete recorded by AuditManager (storage='db').
    __tablename__ = 'audit'

    id          = db.Column(db.Integer, primary_key=True)
    # Groups all records written during one HTTP request (uuid4 stored as text).
    request_id = db.Column(db.Unicode(50), nullable=True, index=True, server_default=u'')
    # __tablename__ of the audited model.
    table      = db.Column(db.Unicode(50), nullable=False, index=True, server_default=u'')
    # 'insert', 'change', or 'delete'.
    action     = db.Column(db.Unicode(20), nullable=False, server_default=u'')
    # Who made the change; kept nullable so audit rows survive user deletion.
    user_id = db.Column(db.Integer, db.ForeignKey('user.id', ondelete='SET NULL'), nullable=True, )
    user_email = db.Column(db.Unicode(255), nullable=False, server_default=u'')
    date       = db.Column(db.DateTime,  default=db.func.now())
    # Field name -> stringified value mapping.
    # NOTE(review): JSON is not imported in this listing — confirm the import
    # (e.g. from sqlalchemy or the dialect module) in the real project.
    data       = db.Column(JSON)
    

In settings:

# Enable auditing for these tables (matched against each model's __tablename__).
AUDIT_CONFIG = {
  "tables": ['user', 'order', 'batch']
}

Upvotes: 1

Matt Graham
Matt Graham

Reputation: 232

Old question, but still very relevant.

You should avoid trying to place web session information on the database session. It's combining unrelated concerns, and each has its own lifecycle (which don't match). Here's an approach I use in Flask with SQLAlchemy (not Flask-SQLAlchemy, but that should work too). I've tried to comment where Pyramid would be different.

from flask import has_request_context  # How to check if in a Flask session
from sqlalchemy import inspect
from sqlalchemy.orm import class_mapper
from sqlalchemy.orm.attributes import get_history
from sqlalchemy.event import listen

from YOUR_SESSION_MANAGER import get_user  # This would be something in Pyramid
from my_project import models  # Where your models are defined

def get_object_changes(obj):
    """ Given a model instance, returns dict of pending
    changes waiting for database flush/commit.

    e.g. {
        'some_field': {
            'before': *SOME-VALUE*,
            'after': *SOME-VALUE*
        },
        ...
    }
    """
    inspection = inspect(obj)
    changes = {}
    for attr in class_mapper(obj.__class__).column_attrs:
        if not getattr(inspection.attrs, attr.key).history.has_changes():
            continue
        # get_history returns (added, unchanged, deleted); call it once
        # per attribute (the original called it twice).
        history = get_history(obj, attr.key)
        if not history[2]:
            continue
        before = history[2].pop()  # most recent prior value
        after = getattr(obj, attr.key)
        # Skip no-op changes and falsy->falsy transitions (e.g. '' -> None).
        if before != after and (before or after):
            changes[attr.key] = {'before': before, 'after': after}
    return changes

def my_model_change_listener(mapper, connection, target):
    """after_update hook: collect pending changes plus the acting user's id."""
    pending = get_object_changes(target)
    pending.pop("modify_ts", None)  # remove fields you don't want to track

    user_id = None
    if has_request_context():
        # Call your function to get the active user and extract its id
        user_id = getattr(get_user(), 'id', None)

    if user_id is None:
        # What do you want to do if the user can't be determined
        pass

    # You now have the model instance (target), the user_id who is logged in,
    # and a dictionary of changes.

    # Either do something "quick" with it here or call an async task (e.g.
    # Celery) to do something with the information that may take longer
    # than you want the request to take.

# Register the listener for updates on MyModel
listen(models.MyModel, 'after_update', my_model_change_listener)

Upvotes: 3

Tim Tisdall
Tim Tisdall

Reputation: 10382

After a bunch of fiddling I seem to able to set values on the session object within the scoped_session by doing the following:

# Build the thread-local session factory (Pyramid + zope.transaction setup).
DBSession = scoped_session(sessionmaker(extension=ZopeTransactionExtension()))
session = DBSession()
# registry() returns the underlying thread-local Session object, so attributes
# set on it are visible to event handlers that receive the plain Session
# (e.g. before_flush in history_meta.py).  NOTE(review): the thread is reused
# across requests — confirm user_id is reset per request.
inner_session = session.registry()
inner_session.user_id = "test"
versioned_session(session)

Now the session object being passed around in history_meta.py has a user_id attribute on it which I set. I'm a little concerned about whether this is the right way of doing this as the object in the registry is a thread-local one and the threads are being re-used for different http requests.

Upvotes: 1

Related Questions