a1.context は、各要素が一意のキーを持つコレクションである必要があるため、関係に直接アソシエーション プロキシを使用してこれを行うことはできません。コレクションはディクショナリにすることができますが、a1 は同じキーを持つ多くの Context オブジェクトを持つことができるため、ここにはそのようなコレクションはありません。オブジェクトのコレクションを各メンバー オブジェクトの属性のコレクションに減らす Assoc prox の単純な方法は、これには適用されません。
したがって、これが本当に必要であり、構造を変更できない場合は、関連付けプロキシが行うことを、ハードコードされた方法で行うだけです。つまり、プロキシ コレクションを構築します。実際には2つだと思います。大したことではありません。クランクを回すだけです....かなりの量です。すべての操作のテストをここに追加してください。
from sqlalchemy import *
from sqlalchemy.orm import *
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.ext.associationproxy import association_proxy
import itertools
Base= declarative_base()
class A(Base):
__tablename__ = "a"
id = Column(Integer, primary_key=True)
# other attributes
value = Column(Integer)
def __init__(self, value):
self.value = value
@property
def context(self):
return HolderBySetDict(self)
@context.setter
def context(self, dict_):
toremove = set([ctx for ctx in self.attached_by if ctx.key not in dict_])
toadd = set([Context(key=k, holder=item) for k, v in dict_.items()
for item in itertools.chain(v)])
self.attached_by.update(toadd)
self.attached_by.difference_update(toremove)
class HolderBySetDict(object):
def __init__(self, parent):
self.parent = parent
def __iter__(self):
return iter(self.keys())
def keys(self):
return list(set(ctx.key for ctx in self.parent.attached_by))
def __delitem__(self, key):
toremove = set([ctx for ctx in self.parent.attached_by if ctx.key == key])
self.parent.attached_by.difference_update(toremove)
def __getitem__(self, key):
return HolderBySet(self.parent, key)
def __setitem__(self, key, value):
current = set([ctx for ctx in self.parent.attached_by if ctx.key == key])
toremove = set([ctx for ctx in current if ctx.holder not in value])
toadd = set([Context(key=key,holder=v) for v in value if v not in current])
self.parent.attached_by.update(toadd)
self.parent.attached_by.difference_update(toremove)
# exercises ! for the reader !
#def __contains__(self, key):
#def values(self):
#def items(self):
# ....
class HolderBySet(object):
def __init__(self, parent, key):
self.key = key
self.parent = parent
def __iter__(self):
return iter([ctx.holder for ctx
in self.parent.attached_by if ctx.key == self.key])
def update(self, items):
curr = set([ctx.holder for ctx
in self.parent.attached_by if ctx.key==self.key])
toadd = set(items).difference(curr)
self.parent.attached_by.update(
[Context(key=self.key, holder=item) for item in toadd])
def remove(self, item):
for ctx in self.parent.attached_by:
if ctx.key == self.key and ctx.holder is item:
self.parent.attached_by.remove(ctx)
break
else:
raise ValueError("Value not present")
def add(self, item):
for ctx in self.parent.attached_by:
if ctx.key == self.key and ctx.holder is item:
break
else:
self.parent.attached_by.add(Context(key=self.key, holder=item))
# more exercises ! for the reader !
#def __contains__(self, key):
#def union(self):
#def intersection(self):
#def difference(self):
#def difference_update(self):
# ....
class Context(Base):
__tablename__ = "context"
holder_id = Column(Integer, ForeignKey("a.id"), nullable=False, primary_key=True)
attachment_id = Column(Integer, ForeignKey("a.id"), nullable=False, primary_key=True)
key = Column(String, primary_key=True)
holder = relationship(A,
primaryjoin=lambda: Context.holder_id==A.id)
attachment = relationship(A,
primaryjoin=lambda: Context.attachment_id==A.id,
backref=backref("attached_by", collection_class=set))
a1 = A(1)
a2 = A(2)
a3, a4, a5 = A(3), A(4), A(5)
a1.context["key_1"] = set([a2])
a1.context["key_2"] = set([a3, a4, a5])
assert set([ctx.holder for ctx in a1.attached_by if ctx.key == "key_1"]) == set([a2])
assert set([ctx.holder for ctx in a1.attached_by if ctx.key == "key_2"]) == set([a3, a4, a5])
a10 = A(10)
a1.context["key_1"].add(a10)
print set([ctx.holder for ctx in a1.attached_by if ctx.key == "key_1"])
assert set([ctx.holder for ctx in a1.attached_by if ctx.key == "key_1"]) == set([a2, a10])
a100 = A(100)
a101 = A(101)
a100.context = {
"key_1": set([a101])
}
assert set([ctx.holder for ctx in a100.attached_by]) == set([a101])