3

sqlalchemy を使用してネットワーク経由でデータベースを更新するタスクが与えられました。Python のスレッド化モジュールを使用することにしました。現在、私は 1 つのスレッド (別名プロデューサー スレッド) を使用して、キューを介して他のスレッドに作業単位を消費するように指示しています。

プロデューサー スレッドは次のような処理を行います。

  def produce(self, last_id):
    unit = session.query(Request).order_by(Request.id) \
        .filter(Request.item_id == None).yield_per(50)
    self.queue.put(unit, True, Master.THREAD_TIMEOUT)     

一方、コンシューマー スレッドは次のような処理を行います。

  def consume(self):
    unit = self.queue.get()
    request = unit
    item = Item.get_item_by_url(request)
    request.item = item
    session.add(request)
    session.flush()

そして、私はsqlalchemyのスコープセッションを使用しています:

session = scoped_session(sessionmaker(autocommit=True, autoflush=True, bind=engine))

ただし、例外が発生しています。

"sqlalchemy.exc.InvalidRequestError: Object FOO is already attached to session '1234' (this is '5678')"

この例外は、リクエスト オブジェクトが 1 つのセッション (プロデューサー セッション) で作成され、コンシューマーが別のスレッドに属しているため、別のスコープ セッションを使用しているという事実に起因することを理解しています。

私の回避策は、コンシューマーがリクエストオブジェクトを取得するために以下のコードを呼び出す必要がある間、プロデューサースレッドが request.id をキューに渡すようにすることです。

request = session.query(Request).filter(Request.id == request_id).first()

これには別のネットワーク呼び出しが含まれており、明らかに最適ではないため、このソリューションは好きではありません。

  1. プロデューサーの db 呼び出しの結果を無駄にしない方法はありますか?
  2. 複数のIDがワークユニットとしてキューに渡されるように「プロデュース」を書く方法はありますか?

フィードバックを歓迎します!

4

1 に答える 1

4

Requestインスタンスをキューに入れる前にメイン スレッド セッションからデタッチする必要があります。次に、キューから再び取り出されたときに、キュー処理スレッド セッションにインスタンスをアタッチします。

デタッチする.expunge()には、セッションを呼び出して、リクエストを渡します。

session.expunge(unit)

その後、キュー スレッドで処理するときに、マージして再接続します。フラグを False に設定しloadて、データベースへのラウンドトリップが再び発生しないようにします。

session.merge(request, load=False)
于 2012-09-08T07:49:29.200 に答える