Pythonでマルチプロセッシング優先キューを実装する方法に精通している人はいますか?
6 に答える
悲しいかな、古き良きキューイング規律を変更するほど簡単ではありませんQueue.Queue
: 後者は実際にはテンプレートメソッドパターンに従ってサブクラス化されるように設計されており、フックメソッドだけをオーバーライドしたり、キューイング規律を簡単に変更し_put
たり_get
できます ( 2.6 の明示的な LIFO と優先度の実装が提供されていますが、以前のバージョンの Python でも簡単に作成できました)。
マルチプロセッシングの場合、一般的なケース (複数のリーダー、複数のライター) では、キューの分散型の性質をあきらめる以外に、優先度キューを実装する方法がわかりません。キューを処理するだけの特別な補助プロセスを 1 つ指定し、(本質的に) RPC をそれに送信して、指定された規則でキューを作成し、キューに put と gets を実行し、それに関する情報を取得します。そのため、すべてのプロセスが aux proc の場所 (ホストとポートなど) などを認識していることを確認するという通常の問題が発生します (プロセスが常にメイン proc によって起動時に生成される場合は簡単です)。非常に大きな問題は、特に良いパフォーマンスでそれを実行したい場合、aux proc のクラッシュから保護します (スレーブ プロセスへのデータのレプリケーションが必要で、マスターがクラッシュした場合にスレーブ間で「マスター選択」を分散する必要がある、など)。など。ゼロからそれを行うことは、博士号を取得する価値のある仕事のように思えます。から始めるかもしれませんジョンソンの仕事、またはActiveMQのような非常に一般的なアプローチに便乗します。
いくつかの特殊なケース (例: リーダーが 1 つ、ライターが 1 つ) の方が簡単で、アプリケーションの限られた領域では高速であることが判明する場合があります。ただし、その限られた領域に対して非常に具体的に制限された仕様を作成する必要があります。その結果は、(汎用の)「マルチプロセッシング キュー」を構成するのではなく、特定の制約された一連の要件にのみ適用されるものになります。
真の FIFO を妨げるバグがあります。ここ
を読んでください。
プライオリティ キューのマルチプロセッシング セットアップを構築する別の方法は、確かに素晴らしいでしょう!
これは答えではありませんが、マルチプロセッシング キューの開発に役立つ可能性があります。
Python のArrayを使用して作成した単純なプライオリティ キュークラスを次に示します。
class PriorityQueue():
"""A basic priority queue that dequeues items with the smallest priority number."""
def __init__(self):
"""Initializes the queue with no items in it."""
self.array = []
self.count = 0
def enqueue(self, item, priority):
"""Adds an item to the queue."""
self.array.append([item, priority])
self.count += 1
def dequeue(self):
"""Removes the highest priority item (smallest priority number) from the queue."""
max = -1
dq = 0
if(self.count > 0):
self.count -= 1
for i in range(len(self.array)):
if self.array[i][1] != None and self.array[i][1] > max:
max = self.array[i][1]
if max == -1:
return self.array.pop(0)
else:
for i in range(len(self.array)):
if self.array[i][1] != None and self.array[i][1] <= max:
max = self.array[i][1]
dq = i
return self.array.pop(dq)
def requeue(self, item, newPrio):
"""Changes specified item's priority."""
for i in range(len(self.array)):
if self.array[i][0] == item:
self.array[i][1] = newPrio
break
def returnArray(self):
"""Returns array representation of the queue."""
return self.array
def __len__(self):
"""Returnes the length of the queue."""
return self.count
私は同じユースケースを持っていました。ただし、優先順位の数には限りがあります。
私が最終的にやっていることは、優先度ごとに 1 つのキューを作成することです。私の Process ワーカーは、最も重要なキューから重要度の低いキューへと、それらのキューからアイテムを取得しようとします (1 つのキューから別のキューへの移動が完了すると、キューは空です)
要件に応じて、オペレーティング システムとファイル システムをさまざまな方法で使用できます。キューはどのくらい大きくなり、どのくらいの速さである必要がありますか? キューが大きくても、キューへのアクセスごとにいくつかのファイルを開く場合は、BTree 実装を使用してキューを格納し、ファイルをロックして排他的アクセスを強制することができます。ゆっくりだが頑丈。
キューが比較的小さいままで、高速にする必要がある場合は、一部のオペレーティング システムで共有メモリを使用できる可能性があります...
キューが小さく (数千のエントリ)、非常に高速である必要がない場合は、ファイルをロックするデータを含むファイルを含むディレクトリのような単純なものを使用できます。小さくて遅いのであれば、これが私の好みです。
キューが大きくなる可能性があり、平均して高速にしたい場合は、Alex が提案するような専用サーバー プロセスを使用する必要があります。しかし、これは首の痛みです。
パフォーマンスとサイズの要件は何ですか?