15

パフォーマンス上の理由から、非正規化されたデータベースがあり、一部のテーブルには、他のテーブルの多くの行から集約されたデータが含まれています。SQLAlchemyイベントを使用して、この非正規化されたデータキャッシュを維持したいと思います。例として、私がフォーラムソフトウェアを作成Threadしていて、その情報を効率的に表示するために、スレッド内のすべてのコメントの合計単語数を追跡する列をそれぞれに持たせたいとします。

class Thread(Base):
    id = Column(UUID, primary_key=True, default=uuid.uuid4)
    title = Column(UnicodeText(), nullable=False)
    word_count = Column(Integer, nullable=False, default=0)

class Comment(Base):
    id = Column(UUID, primary_key=True, default=uuid.uuid4)
    thread_id = Column(UUID, ForeignKey('thread.id', ondelete='CASCADE'), nullable=False)
    thread = relationship('Thread', backref='comments')
    message = Column(UnicodeText(), nullable=False)
    
    @property
    def word_count(self):
        return len(self.message.split())

したがって、コメントが挿入されるたびに(簡単にするために、コメントが編集または削除されないようにしましょう)、word_count関連付けられたオブジェクトの属性を更新する必要がありThreadます。だから私は次のようなことをしたいです

def after_insert(mapper, connection, target):
    thread = target.thread
    thread.word_count = sum(c.word_count for c in thread.comments)
    print("updated cached word count to", thread.word_count)

event.listen(Comment, "after_insert", after_insert)

したがって、を挿入するCommentと、イベントが発生し、単語数が正しく計算されていることがわかりますが、その変更はThreadデータベースの行に保存されません。after_insertのドキュメントには、更新された他のテーブルに関する警告はありませんが、after_deleteなど、他のいくつかのテーブルにはいくつかの警告があります。

では、SQLAlchemyイベントでこれを行うためのサポートされている方法はありますか?私はすでにSQLAlchemyイベントを他の多くの目的で使用しているので、データベーストリガーを作成する代わりに、すべてをそのように実行したいと思います。

4

2 に答える 2

49

after_insert() イベントはこれを行う 1 つの方法であり、他のフラッシュ関連イベントの場合のように、SQLAlchemyConnectionオブジェクトではなく、SQLAlchemy オブジェクトが渡されることに気付くかもしれません。Sessionマッパー レベルのフラッシュ イベントは、通常、指定された で直接 SQL を呼び出すために使用することを目的としていますConnection

@event.listens_for(Comment, "after_insert")
def after_insert(mapper, connection, target):
    thread_table = Thread.__table__
    thread = target.thread
    connection.execute(
            thread_table.update().
             where(thread_table.c.id==thread.id).
             values(word_count=sum(c.word_count for c in thread.comments))
    )
    print "updated cached word count to", thread.word_count

ここで注目すべき点は、UPDATE ステートメントを直接呼び出す方が、作業単位プロセス全体を通してその属性変更を再度実行するよりもはるかにパフォーマンスが高いということです。

ただし、フラッシュが発生する前に「word_count」の値がわかっているため、ここでは after_insert() のようなイベントは実際には必要ありません。Comment オブジェクトと Thread オブジェクトは互いに関連付けられているため、実際にはそれを知っています。また、属性イベントを使用して、Thread.word_count を常にメモリ内で完全に最新の状態に保つこともできます。

def _word_count(msg):
    return len(msg.split())

@event.listens_for(Comment.message, "set")
def set(target, value, oldvalue, initiator):
    if target.thread is not None:
        target.thread.word_count += (_word_count(value) - _word_count(oldvalue))

@event.listens_for(Comment.thread, "set")
def set(target, value, oldvalue, initiator):
    # the new Thread, if any
    if value is not None:
        value.word_count += _word_count(target.message)

    # the old Thread, if any
    if oldvalue is not None:
        oldvalue.word_count -= _word_count(target.message)

このメソッドの大きな利点は、thread.comments を反復処理する必要がないことです。これは、アンロードされたコレクションの場合、別の SELECT が発行されることを意味します。

さらに別の方法は、before_flush() で行うことです。以下は簡単で汚いバージョンです。word_count を更新する必要があるかどうかを判断するために、何が変更されたかをより注意深く分析するために改良できます。

@event.listens_for(Session, "before_flush")
def before_flush(session, flush_context, instances):
    for obj in session.new | session.dirty:
        if isinstance(obj, Thread):
            obj.word_count = sum(c.word_count for c in obj.comments)
        elif isinstance(obj, Comment):
            obj.thread.word_count = sum(c.word_count for c in obj.comments)

最もパフォーマンスが高く、最新の属性イベント メソッドを使用します。

于 2012-12-07T15:15:38.673 に答える