マルチスレッド/マルチプロセッサの処理を始めたばかりで、いくつかの問題が発生しています。私がやりたいことは、リモート データベースからダウンロードする必要があるデータに対する多数の要求を生成することです。これらは Queue.Queue に格納されます (in_q と呼びましょう)。すべてのリクエストを生成したら、in_q と別のキュー (out_q) を入力として受け取る、限られた数のスレッド クラスを起動します。次に、q_in からジョブを get() し、結果を q_out に出力します。したがって、この部分は IO バウンドであるため、スレッドが適切な選択であると考えました。q_out からの結果は、プロセスで何らかの作業を行うプロセスのプールによって消費されます。この部分は CPU バウンドであるため、プロセスが適切な選択であると考えました。
これで問題なく動作するように見えますが、以下で説明する奇妙な動作に遭遇したことを除きます。
import threading
import Queue
import multiprocessing as mp
class TestThread(threading.Thread):
def __init__ ( self, threadnr,resultPool,jobPool ):
self.threadnr = threadnr
self.resultPool = resultPool
self.jobPool = jobPool
threading.Thread.__init__ ( self )
def run(self):
while True:
job = self.jobPool.get()
if job != None:
for a in range(10):
for i in xrange(1000000):
pass
print "Thread nr %d finished job %d" % (self.threadnr,job)
self.resultPool.put([self.threadnr,job+1])
self.jobPool.task_done()
def test(i):
print mp.current_process().name,"test",i
return mp.current_process().name,"test",i
if __name__ == '__main__':
q_in = Queue.Queue()
q_out = Queue.Queue()
nr_jobs = 20
res = []
nr_threads = 4
threads = []
for i in range(nr_jobs):
q_in.put(i)
for i in range(nr_threads):
t = TestThread(i,q_out,q_in)
t.start()
threads.append(t)
p_pool = mp.Pool(4)
for i in range(nr_jobs):
job = q_out.get(block=True)
print "Got job",job
res.append(p_pool.apply_async(test,(job,)))
p_pool.close()
p_pool.join()
for r in res:
print r.get()
for t in threads:
t.join()
これの出力は次のとおりです。
Thread nr 2 finished job 2
Got job [2, 3]
Thread nr 0 finished job 0
Got job [0, 1]
Thread nr 1 finished job 1
Got job [1, 2]
Thread nr 3 finished job 3
Got job [3, 4]
Thread nr 2 finished job 4
Got job Thread nr 0 finished job 5[
2, 5]
Got job [0, 6]
Thread nr 1 finished job 6
Got job [1, 7]
Thread nr 3 finished job 7
Got job [3, 8]
Thread nr 2 finished job 8
Got job [2, 9]
Thread nr 0 finished job 9
Got job [0, 10]
PoolWorker-4 test [1, 2]
PoolWorker-4 test [1, 7]
PoolWorker-3 test [3, 4]
PoolWorker-3 test [3, 8]
PoolWorker-2 test [0, 1]
PoolWorker-2 test [0, 6]
PoolWorker-2 test [0, 10]
PoolWorker-1 test [2, 3]
PoolWorker-1 test [2, 5]
PoolWorker-1 test [2, 9]
('PoolWorker-1', 'test', [2, 3])
('PoolWorker-2', 'test', [0, 1])
('PoolWorker-4', 'test', [1, 2])
('PoolWorker-3', 'test', [3, 4])
('PoolWorker-1', 'test', [2, 5])
('PoolWorker-2', 'test', [0, 6])
('PoolWorker-4', 'test', [1, 7])
('PoolWorker-3', 'test', [3, 8])
('PoolWorker-1', 'test', [2, 9])
('PoolWorker-2', 'test', [0, 10])
これは、私の実際のプログラムとほぼ同じように機能するテストプログラムです。私が奇妙だと思うのは、スレッドがプロセスを完了するのに比較的長い時間がかかるにもかかわらず、スレッドがすべての仕事を完了するまでプロセスが出力されないことです。ジョブは継続的に消費されているように見えますが、プロセスからの出力は、すべてのスレッドが完了するまで表示されません。
この例では、かなり無害ですが (煩わしい場合)、私の実際のプログラムでは、プロセスからのすべての出力が最後のスレッドが完了するまで遅延されるため、出力のキューイングによってメモリ エラーが発生するようです。
また、追加の質問として、スレッドとプロセスを混在させることは良い考えですか、それともどちらか一方に固執する必要がありますか?
この件についてご意見をいただければ幸いです。