12

これは、3 年以上前の別の質問と非常によく似ています: SQLAlchemy トランザクションを確認する一般的な方法として、認証されたユーザーなどを完全に表示するにはどうすればよいですか?

特定のテーブルへのすべての変更をログに記録したいアプリケーションに取り組んでいます。現在、バージョン管理を行う非常に優れた「レシピ」がありますが、変更が発生した日時と変更を行ったユーザーの ID を代わりに記録するように変更する必要があります。SQLAlchemy にパッケージ化されている history_meta.py の例を取り上げて、バージョン番号の代わりに時間を記録するようにしましたが、ユーザー ID を渡す方法がわかりません。

上記で参照した質問は、セッション オブジェクトにユーザー ID を含めることを提案しています。それは非常に理にかなっていますが、それを行う方法がわかりません。私は簡単なことを試しましsession.userid = authenticated_userid(request)たが、 history_meta.py では、その属性はセッションオブジェクトにもうないようです。

私は Pyramid フレームワークでこれらすべてを行っており、私が使用しているセッション オブジェクトは として定義されていDBSession = scoped_session(sessionmaker(extension=ZopeTransactionExtension()))ます。ビューで私はしsession = DBSession()、次に使用に進みますsession。(それが必要かどうかはよくわかりませんが、それが起こっていることです)

誰かが役に立つかもしれない場合に備えて、変更した history_meta.py を次に示します。

from sqlalchemy.ext.declarative import declared_attr
from sqlalchemy.orm import mapper, class_mapper, attributes, object_mapper
from sqlalchemy.orm.exc import UnmappedClassError, UnmappedColumnError
from sqlalchemy import Table, Column, ForeignKeyConstraint, DateTime
from sqlalchemy import event
from sqlalchemy.orm.properties import RelationshipProperty
from datetime import datetime

def col_references_table(col, table):
    for fk in col.foreign_keys:
        if fk.references(table):
            return True
    return False

def _history_mapper(local_mapper):
    cls = local_mapper.class_

    # set the "active_history" flag
    # on on column-mapped attributes so that the old version
    # of the info is always loaded (currently sets it on all attributes)
    for prop in local_mapper.iterate_properties:
        getattr(local_mapper.class_, prop.key).impl.active_history = True

    super_mapper = local_mapper.inherits
    super_history_mapper = getattr(cls, '__history_mapper__', None)

    polymorphic_on = None
    super_fks = []
    if not super_mapper or local_mapper.local_table is not super_mapper.local_table:
        cols = []
        for column in local_mapper.local_table.c:
            if column.name == 'version_datetime':
                continue

            col = column.copy()
            col.unique = False

            if super_mapper and col_references_table(column, super_mapper.local_table):
                super_fks.append((col.key, list(super_history_mapper.local_table.primary_key)[0]))

            cols.append(col)

            if column is local_mapper.polymorphic_on:
                polymorphic_on = col

        if super_mapper:
            super_fks.append(('version_datetime', super_history_mapper.base_mapper.local_table.c.version_datetime))
            cols.append(Column('version_datetime', DateTime, default=datetime.now, nullable=False, primary_key=True))
        else:
            cols.append(Column('version_datetime', DateTime, default=datetime.now, nullable=False, primary_key=True))

        if super_fks:
            cols.append(ForeignKeyConstraint(*zip(*super_fks)))

        table = Table(local_mapper.local_table.name + '_history', local_mapper.local_table.metadata,
           *cols
        )
    else:
        # single table inheritance.  take any additional columns that may have
        # been added and add them to the history table.
        for column in local_mapper.local_table.c:
            if column.key not in super_history_mapper.local_table.c:
                col = column.copy()
                col.unique = False
                super_history_mapper.local_table.append_column(col)
        table = None

    if super_history_mapper:
        bases = (super_history_mapper.class_,)
    else:
        bases = local_mapper.base_mapper.class_.__bases__
    versioned_cls = type.__new__(type, "%sHistory" % cls.__name__, bases, {})

    m = mapper(
            versioned_cls,
            table,
            inherits=super_history_mapper,
            polymorphic_on=polymorphic_on,
            polymorphic_identity=local_mapper.polymorphic_identity
            )
    cls.__history_mapper__ = m

    if not super_history_mapper:
        local_mapper.local_table.append_column(
            Column('version_datetime', DateTime, default=datetime.now, nullable=False, primary_key=False)
        )
        local_mapper.add_property("version_datetime", local_mapper.local_table.c.version_datetime)


class Versioned(object):
    @declared_attr
    def __mapper_cls__(cls):
        def map(cls, *arg, **kw):
            mp = mapper(cls, *arg, **kw)
            _history_mapper(mp)
            return mp
        return map


def versioned_objects(iter):
    for obj in iter:
        if hasattr(obj, '__history_mapper__'):
            yield obj

def create_version(obj, session, deleted = False):
    obj_mapper = object_mapper(obj)
    history_mapper = obj.__history_mapper__
    history_cls = history_mapper.class_

    obj_state = attributes.instance_state(obj)

    attr = {}

    obj_changed = False

    for om, hm in zip(obj_mapper.iterate_to_root(), history_mapper.iterate_to_root()):
        if hm.single:
            continue

        for hist_col in hm.local_table.c:
            if hist_col.key == 'version_datetime':
                continue

            obj_col = om.local_table.c[hist_col.key]

            # get the value of the
            # attribute based on the MapperProperty related to the
            # mapped column.  this will allow usage of MapperProperties
            # that have a different keyname than that of the mapped column.
            try:
                prop = obj_mapper.get_property_by_column(obj_col)
            except UnmappedColumnError:
                # in the case of single table inheritance, there may be
                # columns on the mapped table intended for the subclass only.
                # the "unmapped" status of the subclass column on the
                # base class is a feature of the declarative module as of sqla 0.5.2.
                continue

            # expired object attributes and also deferred cols might not be in the
            # dict.  force it to load no matter what by using getattr().
            if prop.key not in obj_state.dict:
                getattr(obj, prop.key)

            a, u, d = attributes.get_history(obj, prop.key)

            if d:
                attr[hist_col.key] = d[0]
                obj_changed = True
            elif u:
                attr[hist_col.key] = u[0]
            else:
                # if the attribute had no value.
                attr[hist_col.key] = a[0]
                obj_changed = True

    if not obj_changed:
        # not changed, but we have relationships.  OK
        # check those too
        for prop in obj_mapper.iterate_properties:
            if isinstance(prop, RelationshipProperty) and \
                attributes.get_history(obj, prop.key).has_changes():
                obj_changed = True
                break

    if not obj_changed and not deleted:
        return

    attr['version_datetime'] = obj.version_datetime
    hist = history_cls()
    for key, value in attr.items():
        setattr(hist, key, value)
    session.add(hist)
    print(dir(session))
    obj.version_datetime = datetime.now()

def versioned_session(session):
    @event.listens_for(session, 'before_flush')
    def before_flush(session, flush_context, instances):
        for obj in versioned_objects(session.dirty):
            create_version(obj, session)
        for obj in versioned_objects(session.deleted):
            create_version(obj, session, deleted = True)

更新: さて、before_flush() メソッドで取得sqlalchemy.orm.session.Sessionしたセッションは、アタッチしたセッションuser_idのタイプのようsqlalchemy.orm.scoping.scoped_sessionです。そのため、ある時点でオブジェクト レイヤーが取り除かれます。scoped_session 内のセッションに user_id を割り当てても安全ですか? 他のリクエストのためにそこにないことを確認できますか?

4

2 に答える 2

0

いろいろいじった後、次のようにして、scoped_session 内のセッション オブジェクトに値を設定できるようです。

DBSession = scoped_session(sessionmaker(extension=ZopeTransactionExtension()))
session = DBSession()
inner_session = session.registry()
inner_session.user_id = "test"
versioned_session(session)

sessionhistory_meta.py で渡されるオブジェクトには、私user_idが設定した属性があります。レジストリ内のオブジェクトはスレッドローカルであり、スレッドはさまざまな http 要求に再利用されているため、これが正しい方法であるかどうかについて少し心配しています。

于 2013-04-11T16:54:04.833 に答える