これは、3 年以上前の別の質問と非常によく似ています: SQLAlchemy トランザクションを確認する一般的な方法として、認証されたユーザーなどを完全に表示するにはどうすればよいですか?
特定のテーブルへのすべての変更をログに記録したいアプリケーションに取り組んでいます。現在、バージョン管理を行う非常に優れた「レシピ」がありますが、変更が発生した日時と変更を行ったユーザーの ID を代わりに記録するように変更する必要があります。SQLAlchemy にパッケージ化されている history_meta.py の例を取り上げて、バージョン番号の代わりに時間を記録するようにしましたが、ユーザー ID を渡す方法がわかりません。
上記で参照した質問は、セッション オブジェクトにユーザー ID を含めることを提案しています。それは非常に理にかなっていますが、それを行う方法がわかりません。私は簡単なことを試しましsession.userid = authenticated_userid(request)たが、 history_meta.py では、その属性はセッションオブジェクトにもうないようです。
私は Pyramid フレームワークでこれらすべてを行っており、私が使用しているセッション オブジェクトは として定義されていDBSession = scoped_session(sessionmaker(extension=ZopeTransactionExtension()))ます。ビューで私はしsession = DBSession()、次に使用に進みますsession。(それが必要かどうかはよくわかりませんが、それが起こっていることです)
誰かが役に立つかもしれない場合に備えて、変更した history_meta.py を次に示します。
from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import mapper, class_mapper, attributes, object_mapper
from sqlalchemy.orm.exc import UnmappedClassError, UnmappedColumnError
from sqlalchemy import Table, Column, ForeignKeyConstraint, DateTime
from sqlalchemy import event
from sqlalchemy.orm.properties import RelationshipProperty
from datetime import datetime
def col_references_table(col, table):
    for fk in col.foreign_keys:
        if fk.references(table):
            return True
    return False
def _history_mapper(local_mapper):
    cls = local_mapper.class_
    # set the "active_history" flag
    # on on column-mapped attributes so that the old version
    # of the info is always loaded (currently sets it on all attributes)
    for prop in local_mapper.iterate_properties:
        getattr(local_mapper.class_, prop.key).impl.active_history = True
    super_mapper = local_mapper.inherits
    super_history_mapper = getattr(cls, '__history_mapper__', None)
    polymorphic_on = None
    super_fks = []
    if not super_mapper or local_mapper.local_table is not super_mapper.local_table:
        cols = []
        for column in local_mapper.local_table.c:
            if column.name == 'version_datetime':
                continue
            col = column.copy()
            col.unique = False
            if super_mapper and col_references_table(column, super_mapper.local_table):
                super_fks.append((col.key, list(super_history_mapper.local_table.primary_key)[0]))
            cols.append(col)
            if column is local_mapper.polymorphic_on:
                polymorphic_on = col
        if super_mapper:
            super_fks.append(('version_datetime', super_history_mapper.base_mapper.local_table.c.version_datetime))
            cols.append(Column('version_datetime', DateTime, default=datetime.now, nullable=False, primary_key=True))
        else:
            cols.append(Column('version_datetime', DateTime, default=datetime.now, nullable=False, primary_key=True))
        if super_fks:
            cols.append(ForeignKeyConstraint(*zip(*super_fks)))
        table = Table(local_mapper.local_table.name + '_history', local_mapper.local_table.metadata,
           *cols
        )
    else:
        # single table inheritance.  take any additional columns that may have
        # been added and add them to the history table.
        for column in local_mapper.local_table.c:
            if column.key not in super_history_mapper.local_table.c:
                col = column.copy()
                col.unique = False
                super_history_mapper.local_table.append_column(col)
        table = None
    if super_history_mapper:
        bases = (super_history_mapper.class_,)
    else:
        bases = local_mapper.base_mapper.class_.__bases__
    versioned_cls = type.__new__(type, "%sHistory" % cls.__name__, bases, {})
    m = mapper(
            versioned_cls,
            table,
            inherits=super_history_mapper,
            polymorphic_on=polymorphic_on,
            polymorphic_identity=local_mapper.polymorphic_identity
            )
    cls.__history_mapper__ = m
    if not super_history_mapper:
        local_mapper.local_table.append_column(
            Column('version_datetime', DateTime, default=datetime.now, nullable=False, primary_key=False)
        )
        local_mapper.add_property("version_datetime", local_mapper.local_table.c.version_datetime)
class Versioned(object):
    @declared_attr
    def __mapper_cls__(cls):
        def map(cls, *arg, **kw):
            mp = mapper(cls, *arg, **kw)
            _history_mapper(mp)
            return mp
        return map
def versioned_objects(iter):
    for obj in iter:
        if hasattr(obj, '__history_mapper__'):
            yield obj
def create_version(obj, session, deleted = False):
    obj_mapper = object_mapper(obj)
    history_mapper = obj.__history_mapper__
    history_cls = history_mapper.class_
    obj_state = attributes.instance_state(obj)
    attr = {}
    obj_changed = False
    for om, hm in zip(obj_mapper.iterate_to_root(), history_mapper.iterate_to_root()):
        if hm.single:
            continue
        for hist_col in hm.local_table.c:
            if hist_col.key == 'version_datetime':
                continue
            obj_col = om.local_table.c[hist_col.key]
            # get the value of the
            # attribute based on the MapperProperty related to the
            # mapped column.  this will allow usage of MapperProperties
            # that have a different keyname than that of the mapped column.
            try:
                prop = obj_mapper.get_property_by_column(obj_col)
            except UnmappedColumnError:
                # in the case of single table inheritance, there may be
                # columns on the mapped table intended for the subclass only.
                # the "unmapped" status of the subclass column on the
                # base class is a feature of the declarative module as of sqla 0.5.2.
                continue
            # expired object attributes and also deferred cols might not be in the
            # dict.  force it to load no matter what by using getattr().
            if prop.key not in obj_state.dict:
                getattr(obj, prop.key)
            a, u, d = attributes.get_history(obj, prop.key)
            if d:
                attr[hist_col.key] = d[0]
                obj_changed = True
            elif u:
                attr[hist_col.key] = u[0]
            else:
                # if the attribute had no value.
                attr[hist_col.key] = a[0]
                obj_changed = True
    if not obj_changed:
        # not changed, but we have relationships.  OK
        # check those too
        for prop in obj_mapper.iterate_properties:
            if isinstance(prop, RelationshipProperty) and \
                attributes.get_history(obj, prop.key).has_changes():
                obj_changed = True
                break
    if not obj_changed and not deleted:
        return
    attr['version_datetime'] = obj.version_datetime
    hist = history_cls()
    for key, value in attr.items():
        setattr(hist, key, value)
    session.add(hist)
    print(dir(session))
    obj.version_datetime = datetime.now()
def versioned_session(session):
    @event.listens_for(session, 'before_flush')
    def before_flush(session, flush_context, instances):
        for obj in versioned_objects(session.dirty):
            create_version(obj, session)
        for obj in versioned_objects(session.deleted):
            create_version(obj, session, deleted = True)
更新:
さて、before_flush() メソッドで取得sqlalchemy.orm.session.Sessionしたセッションは、アタッチしたセッションuser_idのタイプのようsqlalchemy.orm.scoping.scoped_sessionです。そのため、ある時点でオブジェクト レイヤーが取り除かれます。scoped_session 内のセッションに user_id を割り当てても安全ですか? 他のリクエストのためにそこにないことを確認できますか?