3

私の問題は次のとおりです。

私は2つのモデルを持っておりEntryTagSQLAlchemyの多対多の関係によってリンクされています。を削除した後、Tag対応するものがないものをすべて削除したいと思います。EntryEntry

私が欲しいものを説明する例:

  • Entry 1タグ付きpythonjava

  • Entry 2タグ付きpythonc++

これらの2つのエントリを使用すると、データベースにはタグ、、、pythonおよびjavaが含まれますc++。ここで削除する場合は、SQLAlchemyでデータベースからタグEntry 2を自動的に削除します。c++モデル自体でこの動作を定義することは可能Entryですか、それとももっとエレガントな方法がありますか?

ありがとう。

4

2 に答える 2

3

この質問はしばらく前にここで尋ねられました:SQLAlchemy関係にdelete-orphanを設定すると、AssertionErrorが発生します:このAttributeImplは親を追跡するように構成されていません

これは「多対多の孤立」の問題です。jadkik94は、これをキャッチするためにイベントを使用する必要があるという点で近いですが、この場合は機能しますが、マッパーイベント内でSessionを使用しないことをお勧めします。

以下では、他のSOの質問から逐語的に答えを取り出し、「役割」という単語を「エントリ」に置き換えます。

from sqlalchemy.ext.associationproxy import association_proxy
from sqlalchemy import *
from sqlalchemy.orm import *
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import event
from sqlalchemy.orm import attributes

Base= declarative_base()

tagging = Table('tagging',Base.metadata,
    Column('tag_id', Integer, ForeignKey('tag.id', ondelete='cascade'), primary_key=True),
    Column('entry_id', Integer, ForeignKey('entry.id', ondelete='cascade'), primary_key=True)
)

class Tag(Base):

    __tablename__ = 'tag'
    id = Column(Integer, primary_key=True)
    name = Column(String(100), unique=True, nullable=False)

    def __init__(self, name=None):
        self.name = name

class Entry(Base):
    __tablename__ = 'entry'

    id = Column(Integer, primary_key=True)
    tag_names = association_proxy('tags', 'name')

    tags = relationship('Tag',
                        secondary=tagging,
                        backref='entries')

@event.listens_for(Session, 'after_flush')
def delete_tag_orphans(session, ctx):
    # optional: look through Session state to see if we want
    # to emit a DELETE for orphan Tags
    flag = False

    for instance in session.dirty:
        if isinstance(instance, Entry) and \
            attributes.get_history(instance, 'tags').deleted:
            flag = True
            break
    for instance in session.deleted:
        if isinstance(instance, Entry):
            flag = True
            break

    # emit a DELETE for all orphan Tags.   This is safe to emit
    # regardless of "flag", if a less verbose approach is
    # desired.
    if flag:
        session.query(Tag).\
            filter(~Tag.entries.any()).\
            delete(synchronize_session=False)


e = create_engine("sqlite://", echo=True)

Base.metadata.create_all(e)

s = Session(e)

r1 = Entry()
r2 = Entry()
r3 = Entry()
t1, t2, t3, t4 = Tag("t1"), Tag("t2"), Tag("t3"), Tag("t4")

r1.tags.extend([t1, t2])
r2.tags.extend([t2, t3])
r3.tags.extend([t4])
s.add_all([r1, r2, r3])

assert s.query(Tag).count() == 4

r2.tags.remove(t2)

assert s.query(Tag).count() == 4

r1.tags.remove(t2)

assert s.query(Tag).count() == 3

r1.tags.remove(t1)

assert s.query(Tag).count() == 2

2つのほぼ同一のSO質問は、これを手元にあるものとして認定するため、http: //www.sqlalchemy.org/trac/wiki/UsageRecipes/ManyToManyOrphanのwikiに追加しました。

于 2012-10-04T16:01:31.543 に答える
1

私はコードに私のために話させます:

from sqlalchemy import create_engine, exc, event
from sqlalchemy.orm import sessionmaker
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import func, Table, Column, Integer, String, Float, Boolean, MetaData, ForeignKey
from sqlalchemy.orm import relationship, backref

# Connection
engine = create_engine('sqlite:///', echo=True)
Base = declarative_base(bind=engine)
Session = sessionmaker(bind=engine)

# Models
entry_tag_link = Table('entry_tag', Base.metadata,
    Column('entry_id', Integer, ForeignKey('entries.id')),
    Column('tag_id', Integer, ForeignKey('tags.id'))
)

class Entry(Base):
    __tablename__ = 'entries'
    id = Column(Integer, primary_key=True)
    name = Column(String(255), nullable=False, default='')

    tags = relationship("Tag", secondary=entry_tag_link, backref="entries")

    def __repr__(self):
        return '<Entry %s>' % (self.name,)

class Tag(Base):
    __tablename__ = 'tags'
    id = Column(Integer, primary_key=True)
    name = Column(String(255), nullable=False)

    def __repr__(self):
        return '<Tag %s>' % (self.name,)

# Delete listener
def delete_listener(mapper, connection, target):
    print "---- DELETING %s ----" % (target,)
    print '-' * 20
    for t in target.tags:
        if len(t.entries) == 0:
            print ' ' * 5, t, 'is to be deleted'
            session.delete(t)
    print '-' * 20

event.listen(Entry, 'before_delete', delete_listener)

# Utility functions
def dump(session):
    entries = session.query(Entry).all()
    tags = session.query(Tag).all()

    print '*' * 20
    print 'Entries', entries
    print 'Tags', tags
    print '*' * 20


Base.metadata.create_all()

session = Session()

t1, t2, t3 = Tag(name='python'), Tag(name='java'), Tag(name='c++')

e1, e2 = Entry(name='Entry 1', tags=[t1, t2]), Entry(name='Entry 2', tags=[t1, t3])

session.add_all([e1,e2])
session.commit()

dump(session)

raw_input("---- Press return to delete the second entry and see the result ----")

session.delete(e2)
session.commit()

dump(session)

上記のコードは、SQLAlchemyORMイベントのafter_deleteイベントを使用しています。この行は魔法を行います:

event.listen(Entry, 'before_delete', delete_listener)

これは、アイテムに対するすべての削除をリッスンしEntry、リスナーを呼び出して、必要な処理を実行することを意味します。ただし、ドキュメントでは、イベント内のセッションを変更することは推奨されていません(追加したリンクの警告を参照してください)。しかし、私が見る限り、それは機能するので、これがあなたのために機能するかどうかを確認するのはあなた次第です。

于 2012-09-29T18:01:57.083 に答える