0

かなり単純な問題だと思いますが、期待どおりに機能させることができないようです。~200 ~200 MB のファイルがあり、一度にすべてをメモリにロードすることはできません。各ファイルは、他のファイルごとに 1 回処理する必要があります (つまり、処理関数の約 20000 操作)。次のようなファイルを複数回ループする必要があります。

for i in xrange(len(file_list)-1): #go through each file
    if i==0: 
        f = read(file_list[i])   #load in the first file
    else:
        f = g0  #we've already loaded in the next file once, don't re-load it
    for j in xrange(i+1,len(file_list)): #go through each other file to get all unique pairs
        g = read(file_list[i+j+1])  # takes 5 s
        answer.append(processing_function(f,g)) # takes 10s
        if j==0:
           g = g0 # this will be "f" for the next iteration of the outer loop

外側のループはファイル 1 のみをロードする必要があり、その後、内側のループの最初の繰り返しから既にロードされているファイルを次の値として取得できます。内側のループは (len(file_list)-i) ファイルを読み込んで処理する必要があります。~200 個のファイルをメモリに保持することはできず、ファイル リストを分割する簡単な方法がないため (各ファイルを正確に 1 つずつペアにする必要があるため)

processing_function(f_i,g_j) の実行中に少なくとも次のファイル、つまり g_j+1 を読み込むように read() 関数を並列化する明白な方法はありますか? 私はスレッド化とマルチプロセッシングの両方を試しましたが、読み取りを別のスレッド/プロセスにオフロードすることはできましたが、並行して実行されることはありません (つまり、ag を取得するとすぐに processing_function() の実行が開始されます)。 、将来の g_j がバックグラウンドでロードされている間)。プログラムをハングアップさせる (マルチプロセッシングを使用) か、最初のファイルで processing_function() を実行する前に一連のファイルをキューにロードする (スレッドを使用) ことができました。

これが簡単に実行できることを願っています。細部を台無しにしているだけです。

これは私がこれまでに試したことです(誰かが他の場所で提案したコードから)-これは私が望むことを行うべきだと思われますが、私が言ったように、 QueuedFileReader() の file_worker メソッドが通過して QUEUE_SIZE に達するようですQueuedFileReader がジェネレーターであるループに制御を戻す前に、ファイルをキューに入れます。file_worker が単独で実行され、単一のファイルが next() 呼び出しのキューで準備ができたら、QueuedFileReader() を使用するループが続行できれば完璧です...

class QueuedFileReader():
    def __init__(self,file_list,tmp_dir,QUEUE_SIZE):
        self.queue =Queue.Queue(QUEUE_SIZE)                                                                           
        self.worker = threading.Thread(target=QueuedFileReader.file_worker,args=(self.queue,file_list,tmp_dir))                             
        #self.queue = multiprocessing.Queue(QUEUE_SIZE)
        #self.worker = multiprocessing.Process(target=QueuedFileReader.file_worker,args=(self.queue,file_list,tmp_dir))
        self.worker.daemon=True
        self.worker.start()

    @staticmethod
    def file_worker(queue,file_list,tmp_dir):
        for i,f in enumerate(file_list):
            done = False
            while True and not done:
                try:
                    print "attempting to read %s"%f_in
                    queue.put((i,f_in,files.read(f)))
                    print "successfully read in %s"%f
                    done = True
                except Queue.Full:
                    pass
        queue.put('done')

    def __iter__(self):
        return self

    def next(self):
        N = 0
        while True:
            try:  
                x = self.queue.get()
                break
            except Queue.Empty:
                pass
        if x == 'done': raise StopIteration
        return x

    def __del__(self):
        self.worker.join()

それは次から呼び出されます:

for j,fnm,g in QueuedFileReader(flz[i+1:],tmp_dir,QUEUE_SIZE):
    #the code that runs on object "f" from the outer loop and "g" that should be supplied off the top of the queue on each next() call to QueuedFileReader()
4

0 に答える 0