6

処理が必要なため、マルチスレッド方式で生成したい数万 (場合によっては数百) の永続オブジェクトがあります。

オブジェクトの作成は別のスレッドで行われますが (範囲指定されたセッションで Flask-SQLAlchemy 拡張機能を使用)、生成されたオブジェクトを DB に書き込むための呼び出しは、生成が完了した後に 1 か所で行われます。

問題は、作成されるオブジェクトがいくつかの既存の関係の一部であることです。そのため、どのスレッドにも明示的なセッションがない個別の並行スレッドで作成されたにもかかわらず、ID マップへの自動追加がトリガーされます。

生成されたオブジェクトを 1 つのリストに含め、リスト全体を (1 つのセッション オブジェクトを使用して) データベースに書き込むことを望んでいました。これにより、次のようなエラーが発生します。

AssertionError: A conflicting state is already present in the identity map for key (<class 'app.ModelObject'>, (1L,))

したがって、同時実行コードの外部でグローバルセッションを使用して追加およびコミットしようとすると、アサーションエラーがトリガーされるため、アイデンティティマップが既に入力されていると私が考える理由です。

最後の詳細は、セッションオブジェクトが何であれ(マルチスレッドの場合にアイデンティティマップへの自動追加がどのように機能するかを完全に理解していないため、スコープまたはそれ以外)、方法を見つけることができない/方法がわからないということですそれらへの参照を取得して、プロセスごとに個別のセッションを処理したい場合でもできるようにします。

どんなアドバイスでも大歓迎です。私が (まだ) コードを投稿していない唯一の理由は、実際の例をアプリからすぐに抽出するのが難しいからです。誰かが本当にそれを見る必要がある場合は投稿します。

4

2 に答える 2

5

各セッションはスレッドローカルです。つまり、スレッドごとに個別のセッションがあります。一部のインスタンスを別のスレッドに渡すことにした場合、それらはセッションから「分離」されます。受信スレッドで使用db.session.add_all(objects)して、それらをすべて元に戻します。

何らかの理由で、異なるスレッドで同じ ID (主キー列) を持つオブジェクトを作成し、両方をデータベースに送信しようとしているようです。1 つのオプションは、ID が一意であることが保証されるように、これが発生する理由を修正することです。マージを試すこともできます。merged_object = db.session.merge(other_object, load=False).

編集:zzzeekのコメントは、起こっている可能性のある他の何かについて私に手がかりを与えました:

Flask-SQLAlchemy を使用すると、セッションはアプリ コンテキストに関連付けられます。これはスレッド ローカルであるため、新しいスレッドを生成するとコンテキストが無効になります。スレッドにデータベース セッションはありません。すべてのインスタンスが切り離されており、関係を適切に追跡できません。1 つの解決策は、各スレッドに渡して、ブロックapp内ですべてを実行することです。with app.app_context():ブロック内で、最初に を使用db.session.addして、渡されたインスタンスをローカル セッションに取り込みます。一貫性を確保するために、後でマスター タスクをマージする必要があります。

于 2013-06-26T13:15:17.300 に答える
3

誰かがこの問題を抱えている/将来これをやりたい場合に備えて、疑似コードを使用して問題と解決策を明確にしたいだけです。

class ObjA(object):
    obj_c = relationship('ObjC', backref='obj_c')

class ObjB(object):
    obj_c = relationship('ObjC', backref='obj_c')

class ObjC(object):
    obj_a_id = Column(Integer, ForeignKey('obj_a.id'))
    obj_b_id = Column(Integer, ForeignKey('obj_b.id'))

    def __init__(self, obj_a, obj_b):
        self.obj_a = obj_a
        self.obj_b = obj_b


def make_a_bunch_of_c(obj_a, list_of_b=None):
    return [ObjC(obj_a, obj_b) for obj_b in list_of_b]

def parallel_generate():
   list_of_a = session.query(ObjA).all() # assume there are 1000 of these
   list_of_b = session.query(ObjB).all() # and 30 of these

   fxn = functools.partial(make_a_bunch_of_c, list_of_b=list_of_b)
   pool = multiprocessing.Pool(10)
   all_the_things = pool.map(fxn, list_of_a)
   return all_the_things

ここでちょっと止めましょう。元の問題は、ObjC のリストを追加しようとすると、元の質問にエラー メッセージが表示されることでした。

session.add_all(all_the_things)

AssertionError: A conflicting state is already present in the identity map for key [...]

注: エラーは追加フェーズで発生します。アサーションはコミット前に発生するため、コミットの試行は発生しません。私が知る限り。

解決:

all_the_things = parallel_generate()
for thing in all_the_things:
    session.merge(thing)
session.commit()

自動的に追加されたオブジェクト (リレーションシップ カスケード経由) を処理する際のセッション使用率の詳細は、まだわかりません。競合が最初に発生した理由を説明することはできません。私が知っているのは、マージ関数を使用すると、SQLAlchemy が 10 の異なるプロセスで作成されたすべての子オブジェクトをマスター プロセスの 1 つのセッションに並べ替えるということだけです。

誰かがこれに遭遇した場合、私はその理由に興味があります。

于 2013-06-28T04:33:22.763 に答える