sqlalchemy を使用してネットワーク経由でデータベースを更新するタスクが与えられました。Python のスレッド化モジュールを使用することにしました。現在、私は 1 つのスレッド (別名プロデューサー スレッド) を使用して、キューを介して他のスレッドに作業単位を消費するように指示しています。
プロデューサー スレッドは次のような処理を行います。
def produce(self, last_id):
unit = session.query(Request).order_by(Request.id) \
.filter(Request.item_id == None).yield_per(50)
self.queue.put(unit, True, Master.THREAD_TIMEOUT)
一方、コンシューマー スレッドは次のような処理を行います。
def consume(self):
unit = self.queue.get()
request = unit
item = Item.get_item_by_url(request)
request.item = item
session.add(request)
session.flush()
そして、私はsqlalchemyのスコープセッションを使用しています:
session = scoped_session(sessionmaker(autocommit=True, autoflush=True, bind=engine))
ただし、例外が発生しています。
"sqlalchemy.exc.InvalidRequestError: Object FOO is already attached to session '1234' (this is '5678')"
この例外は、リクエスト オブジェクトが 1 つのセッション (プロデューサー セッション) で作成され、コンシューマーが別のスレッドに属しているため、別のスコープ セッションを使用しているという事実に起因することを理解しています。
私の回避策は、コンシューマーがリクエストオブジェクトを取得するために以下のコードを呼び出す必要がある間、プロデューサースレッドが request.id をキューに渡すようにすることです。
request = session.query(Request).filter(Request.id == request_id).first()
これには別のネットワーク呼び出しが含まれており、明らかに最適ではないため、このソリューションは好きではありません。
- プロデューサーの db 呼び出しの結果を無駄にしない方法はありますか?
- 複数のIDがワークユニットとしてキューに渡されるように「プロデュース」を書く方法はありますか?
フィードバックを歓迎します!