3

だから私はアイテムのリストを取り、それを与えられた数のリスト(たとえば10)に分割し、それらの10個のリストを取り、10個のスレッド「EvaluationThreads」(threading.threadを拡張する)を生成するツールを作成しました。それらのスレッドのうち、評価するために提供されたものは何でも評価します。各スレッドを開始するとき、それらをすべてリストに入れ、それらを生成した後、次のコードがあります。

for th in threadList:
    th.join()
    someTotal = th.resultsAttribute

そして、それが私がすべてのスレッドが終了するのを待ってそれらの情報を収集する方法です。これはすべてが終了するのを待ってから結果を収集するための作業方法ですが、これらのスレッドはさまざまな時間に非常にうまく終了する可能性があり、最初に開始したスレッドがすべて終了する場合は、よりエレガントな方法が必要だと思います。以前に終了したものは、結合する前にそのスレッドが終了するのを待つ必要があります。これらのスレッドの情報を取得し、開始された順序ではなく、終了時に結合する方法はありますか?私は当初、スレッドなどで何らかのコールバックを使用すると考えていましたが、もっと受け入れられる解決策があるかどうかはわかりません。

ご協力いただきありがとうございます。

編集:明確にするために、私の評価関数はCPUにバインドされておらず、ドキュメントをスレッド間で分散してできるだけ早く実行しようとはしていません。各スレッドには、ほぼ偶数のジョブが固定されています。

4

2 に答える 2

2

あなたの主な質問について:

これよりも複雑なことを行っている場合、または特にこれを繰り返し行っている場合は、おそらく「スレッド グループ」クラスが必要です。事前に作成されたものは数十ありますが、どれも気に入らない場合は、自分で作成するのは簡単です。

次に、これの代わりに:

threadList = []
for argchunk in splitIntoChunks(values, 10):
  threadList.append(threading.Thread(target=myThreadFunc, args=argchunk))
...
someTotal = 0
for th in threadList:
  th.join()
  someTotal += th.resultsAttribute

あなたはこれを行うことができます:

threadGroup = ThreadGroup.ThreadGroup()
for argchunk in splitIntoChunks(values, 10):
  threadGroup.newThread(myThreadFunc, argchunk)
threadGroup.join()
someTotal = sum(th.resultsAttribute for th in threadGroup)

または、もっと良いのは、完全なスレッド プール ライブラリです。これを行うことができます。

pool = ThreadPool(10)
for argchunk in splitIntoChunks(values, 100):
  pool.putRequest(myThreadFunc, argchunk)
pool.wait()

ここでの利点は、スレッドごとに 10 個のジョブを 1 つずつスケジュールする代わりに、10 個のスレッドで 100 個のジョブを適切にスケジュールするのと同じくらい簡単にできることです。キューを維持するなどのすべての作業を行う必要はありません。欠点は、スレッドを反復処理できないことです。戻り値を取得するには、ジョブを反復する必要があります。理想的には、ジョブを反復できるようにするためだけに、ジョブを最後まで存続させたくありません。

これは、スレッド (またはジョブ) から値を取得する方法という 2 番目の質問につながります。これを行う方法はたくさんあります。

あなたがしたことはうまくいきます。ロックする必要さえありません。

あなたが提案したように、コールバックを使用することも機能します。ただし、コールバックはメイン スレッドではなくワーカー スレッドで実行されることに注意してください。そのため、何らかのグローバル オブジェクトにアクセスする場合は、何らかの同期が必要になります。

とにかく同期する場合は、コールバックにメリットがない可能性があります。たとえば、一連の値を合計するだけの場合は、 を設定するだけtotal=[0]で、各スレッドtotal[0] += myValueがロック内で実行できます。(もちろん、この場合、メイン スレッドで合計を実行してロックを回避する方が理にかなっている可能性がありますが、結果を結合する作業がはるかに重い場合、その選択はそれほど単純ではない可能性があります。)

明示的にロックする代わりに、ある種のアトミック オブジェクトを使用することもできます。たとえば、標準の Queue.Queue と collections.deque はどちらもアトミックであるため、各スレッドは を設定するだけq = Queue.Queue()で、各スレッドは を実行して結果をプッシュし、q.push(myValue)結合後は反復してキューの値を合計します。

group.join()実際、各スレッドがキューに正確に 1 回プッシュする場合、キュー自体で 10 回のブロッキング取得を実行できます。その後、またはpool.wait()または何かがすぐに返されることがわかります。

または、コールバックをジョブとしてキューにプッシュすることもできます。ここでも、キューで 10 回のブロッキング取得を実行し、そのたびに結果を実行できます。

各スレッドが複数のオブジェクトを返すことができる場合、処理が完了したときにセンチネル値またはコールバックをキューにプッシュでき、メイン スレッドは 10 個のセンチネルを読み取るまでポップし続けます。

于 2012-07-20T01:32:08.747 に答える
1

Queue を使用して、情報が利用可能になり次第、スレッドから情報をプッシュします。

これがあなたのスレッドだとしましょう:

class myThread(threading.Thread):
   def __init__(self, results_queue):
       self.results_queue = results_queue
       #other init code here


   def run(self):
       #thread code here

       self.results_queue.put(result) #result is the information you want from the thread

そして、これはあなたのメインコードです:

import Queue #or "import queue" in Python 3.x
results_queue = Queue()

#thread init code here

for i in xrange(num_threads_running):
    data = results_queue.get() # queue.get() blocks until some item is available
    #process data as it is made available

#at this point, there is no need to .join(), since all the threads terminate as soon as they put data to the queue.
于 2012-07-19T23:47:20.907 に答える