2

Python Queue cclass を使用して、複数のワーカー スレッド間で共有されるタスクのリストを管理しています。実際のコードは巨大で、私はまだバグを完全になくす作業を行っています。時々、ワーカー スレッドがクラッシュし、ルーチン全体を再起動する必要があります。その過程で、キューに入れられたすべてのタスクを失います。プロセスを再起動するたびにタスクリストがそのファイルからプリロードされるように、キューをファイルに保存する方法はありますか?

最初の考えでは、タスクを取得またはキューに入れるとき、ファイルの読み取りと書き込みを同時に行う必要があるようです。ただし、これでは queue.task_done() の機能が得られず、最適化されたソリューションではない可能性があります。どんなアイデアでも大歓迎です。

4

5 に答える 5

5

単純にキューをピクルすることを検討しましたか?

于 2011-08-04T16:20:44.753 に答える
1

これには、モジュールを含む複数のアプローチがありpickleます...

しかし、私の意見では、ファイルに行ごとに、保存したい他のプロパティを含む列のキューの各要素を書き込むだけで簡単ですtask_done.

例:

element1, True
element2, False
...

Python では、次のような形式のファイルを読むのは非常に簡単です。

for line in file('path/file.ext'):
    name, state = line.split(sep_char)
    #and them insert into the queue...
于 2011-08-04T16:44:29.573 に答える
0

ワーカーとマスターの間のハンドシェーク メカニズムを実装します。

マスターにはタスクのリストがあり、それらをキューに入れる前に、リストをファイルにピクルします。次に、タスクをキューに挿入します。作業が完了すると、ワーカーは ACK メッセージを送り返します。その時点でのみ、タスク リストを unpickle し、対応する ID を削除します。

于 2011-09-06T21:49:02.550 に答える
0

私が提供できる簡単なオプションは、データベース テーブルをクラスでラップし、それをキューとして使用することです。これには、自動インクリメント列が驚くほどうまく機能します (次に削除する項目は、ID が最も小さい項目です)。

class dbQueue:
  init():
    # Pick some random id for this run (or set it to some thing you know).
  put():
    # Insert entry into table
  get():
    # The update .. select combo removes the need for a database that has transactions.
    # If no entries bear your ID:
      # Update the next entry that is not already marked with your ID.
    # Select the entry that matches your ID and return it.
  task_done():
    # Delete the entry with your ID.

これは、キューが更新される頻度によっては最高のパフォーマンスを発揮しません。メモリ内の sqlite データベースでさえ、リンクされたリスト構造ほど高速ではありません。反対に、データベースにアクセスできる任意のツールを使用してデータベースを表示できるため、進行中のツールを確認できます。

于 2011-09-06T21:41:46.767 に答える
0

これを行う簡単な方法は、メッセージ キューに AMQP を使用し、メッセージ ブローカーにメッセージを処理させることです。永続的な永続キューを持つメッセージ ブローカーとして RabbitMQ を使用して、同様のシステムを実装しました。RAM が 512M しかなく、再生中のメッセージが 100 万程度の仮想 Linux サーバーで古いバージョンの 1.72 サーバーを使用していたときに、RabbitMQ サーバー ソフトウェアがクラッシュしても、メッセージは生き残っています。

私が行う方法は、各タイプのワーカーが異なるキューからメッセージを消費することです。そのタイプのワーカーが複数必要な場合、メッセージ キューは自動的にラウンド ロビンになり、ワーカーがメッセージの処理を完了できない場合、ワーカーは確認応答せずにキューに戻ります。

約 80 行のコードを含む小さな shim モジュールを の前kombuに書き、後で を使用するように書き直しましたpy-amqplibhaighaAMQP 仕様ドキュメントと非常によく一致するため、以前に知っていればそれを使用していたでしょう。

kombu はデバッグが非常に複雑であり、AMQP 標準から奇妙な方法で逸脱しているため、お勧めしません。ドキュメンテーションはPyPihaighaの 1 つのサンプル コード フラグメントにすぎませんが、AMQP 仕様をハイガ ドキュメントとして使用できるため、kombu や amqplib よりも適切にドキュメンテーションされています。

于 2011-08-10T04:57:50.493 に答える