かなり単純な問題だと思いますが、期待どおりに機能させることができないようです。~200 ~200 MB のファイルがあり、一度にすべてをメモリにロードすることはできません。各ファイルは、他のファイルごとに 1 回処理する必要があります (つまり、処理関数の約 20000 操作)。次のようなファイルを複数回ループする必要があります。
for i in xrange(len(file_list)-1): #go through each file
if i==0:
f = read(file_list[i]) #load in the first file
else:
f = g0 #we've already loaded in the next file once, don't re-load it
for j in xrange(i+1,len(file_list)): #go through each other file to get all unique pairs
g = read(file_list[i+j+1]) # takes 5 s
answer.append(processing_function(f,g)) # takes 10s
if j==0:
g = g0 # this will be "f" for the next iteration of the outer loop
外側のループはファイル 1 のみをロードする必要があり、その後、内側のループの最初の繰り返しから既にロードされているファイルを次の値として取得できます。内側のループは (len(file_list)-i) ファイルを読み込んで処理する必要があります。~200 個のファイルをメモリに保持することはできず、ファイル リストを分割する簡単な方法がないため (各ファイルを正確に 1 つずつペアにする必要があるため)
processing_function(f_i,g_j) の実行中に少なくとも次のファイル、つまり g_j+1 を読み込むように read() 関数を並列化する明白な方法はありますか? 私はスレッド化とマルチプロセッシングの両方を試しましたが、読み取りを別のスレッド/プロセスにオフロードすることはできましたが、並行して実行されることはありません (つまり、ag を取得するとすぐに processing_function() の実行が開始されます)。 、将来の g_j がバックグラウンドでロードされている間)。プログラムをハングアップさせる (マルチプロセッシングを使用) か、最初のファイルで processing_function() を実行する前に一連のファイルをキューにロードする (スレッドを使用) ことができました。
これが簡単に実行できることを願っています。細部を台無しにしているだけです。
これは私がこれまでに試したことです(誰かが他の場所で提案したコードから)-これは私が望むことを行うべきだと思われますが、私が言ったように、 QueuedFileReader() の file_worker メソッドが通過して QUEUE_SIZE に達するようですQueuedFileReader がジェネレーターであるループに制御を戻す前に、ファイルをキューに入れます。file_worker が単独で実行され、単一のファイルが next() 呼び出しのキューで準備ができたら、QueuedFileReader() を使用するループが続行できれば完璧です...
class QueuedFileReader():
def __init__(self,file_list,tmp_dir,QUEUE_SIZE):
self.queue =Queue.Queue(QUEUE_SIZE)
self.worker = threading.Thread(target=QueuedFileReader.file_worker,args=(self.queue,file_list,tmp_dir))
#self.queue = multiprocessing.Queue(QUEUE_SIZE)
#self.worker = multiprocessing.Process(target=QueuedFileReader.file_worker,args=(self.queue,file_list,tmp_dir))
self.worker.daemon=True
self.worker.start()
@staticmethod
def file_worker(queue,file_list,tmp_dir):
for i,f in enumerate(file_list):
done = False
while True and not done:
try:
print "attempting to read %s"%f_in
queue.put((i,f_in,files.read(f)))
print "successfully read in %s"%f
done = True
except Queue.Full:
pass
queue.put('done')
def __iter__(self):
return self
def next(self):
N = 0
while True:
try:
x = self.queue.get()
break
except Queue.Empty:
pass
if x == 'done': raise StopIteration
return x
def __del__(self):
self.worker.join()
それは次から呼び出されます:
for j,fnm,g in QueuedFileReader(flz[i+1:],tmp_dir,QUEUE_SIZE):
#the code that runs on object "f" from the outer loop and "g" that should be supplied off the top of the queue on each next() call to QueuedFileReader()