86

各行を処理して(いくつかの操作を実行して)データベースに保存する1つの大きなテキストファイルがあります。単一の単純なプログラムには時間がかかりすぎるので、複数のプロセスまたはスレッドを介して実行する必要があります。各スレッド/プロセスは、その単一のファイルから異なるデータ(異なる行)を読み取り、それらのデータ(行)に対していくつかの操作を実行してデータベースに配置する必要があります。これにより、最終的にすべてのデータが処理され、データベースは必要なデータとともにダンプされます。

しかし、私はこれにどのようにアプローチするかを理解することができません。

4

3 に答える 3

114

あなたが探しているのは生産者/消費者パターンです

基本的なねじ切りの例

(マルチプロセッシングの代わりに) threading モジュールを使用した基本的な例を次に示します。

import threading
import Queue
import sys

def do_work(in_queue, out_queue):
    while True:
        item = in_queue.get()
        # process
        result = item
        out_queue.put(result)
        in_queue.task_done()

if __name__ == "__main__":
    work = Queue.Queue()
    results = Queue.Queue()
    total = 20

    # start for workers
    for i in xrange(4):
        t = threading.Thread(target=do_work, args=(work, results))
        t.daemon = True
        t.start()

    # produce data
    for i in xrange(total):
        work.put(i)

    work.join()

    # get the results
    for i in xrange(total):
        print results.get()

    sys.exit()

ファイル オブジェクトをスレッドと共有しません。キューにデータ行を提供することで、それらの作業を生成します。次に、各スレッドが行を取得して処理し、キューに戻します。

リストや特別な種類の Queueなど、データを共有するためにmultiprocessing モジュールに組み込まれたより高度な機能がいくつかあります。マルチプロセッシングとスレッドの使用にはトレードオフがあり、作業が CPU バウンドか IO バウンドかによって異なります。

基本的な multiprocessing.Pool の例

マルチプロセッシング プールの非常に基本的な例を次に示します。

from multiprocessing import Pool

def process_line(line):
    return "FOO: %s" % line

if __name__ == "__main__":
    pool = Pool(4)
    with open('file.txt') as source_file:
        # chunk the work into batches of 4 lines at a time
        results = pool.map(process_line, source_file, 4)

    print results

プールは、独自のプロセスを管理する便利なオブジェクトです。開いているファイルはその行を反復処理できるため、それを に渡すことができます。これにより、ファイルがpool.map()ループされ、行がワーカー関数に配信されます。Mapはブロックし、完了すると結果全体を返します。これは非常に単純化された例であり、pool.map()作業を行う前に一度にファイル全体をメモリに読み込むことに注意してください。大きなファイルが予想される場合は、この点に注意してください。プロデューサー/コンシューマーのセットアップを設計するためのより高度な方法があります。

リミットとラインの再ソートによる手動の「プール」

これは Pool.map の手動の例ですが、イテラブル全体を一度に消費する代わりに、キュー サイズを設定して、処理できる速さで少しずつフィードするようにすることができます。行番号も追加して、後で追跡して必要に応じて参照できるようにしました。

from multiprocessing import Process, Manager
import time
import itertools 

def do_work(in_queue, out_list):
    while True:
        item = in_queue.get()
        line_no, line = item

        # exit signal 
        if line == None:
            return

        # fake work
        time.sleep(.5)
        result = (line_no, line)

        out_list.append(result)


if __name__ == "__main__":
    num_workers = 4

    manager = Manager()
    results = manager.list()
    work = manager.Queue(num_workers)

    # start for workers    
    pool = []
    for i in xrange(num_workers):
        p = Process(target=do_work, args=(work, results))
        p.start()
        pool.append(p)

    # produce data
    with open("source.txt") as f:
        iters = itertools.chain(f, (None,)*num_workers)
        for num_and_line in enumerate(iters):
            work.put(num_and_line)

    for p in pool:
        p.join()

    # get the results
    # example:  [(1, "foo"), (10, "bar"), (0, "start")]
    print sorted(results)
于 2012-06-25T20:11:29.187 に答える
9

これは私が作った本当にばかげた例です:

import os.path
import multiprocessing

def newlinebefore(f,n):
    f.seek(n)
    c=f.read(1)
    while c!='\n' and n > 0:
        n-=1
        f.seek(n)
        c=f.read(1)

    f.seek(n)
    return n

filename='gpdata.dat'  #your filename goes here.
fsize=os.path.getsize(filename) #size of file (in bytes)

#break the file into 20 chunks for processing.
nchunks=20
initial_chunks=range(1,fsize,fsize/nchunks)

#You could also do something like:
#initial_chunks=range(1,fsize,max_chunk_size_in_bytes) #this should work too.


with open(filename,'r') as f:
    start_byte=sorted(set([newlinebefore(f,i) for i in initial_chunks]))

end_byte=[i-1 for i in start_byte] [1:] + [None]

def process_piece(filename,start,end):
    with open(filename,'r') as f:
        f.seek(start+1)
        if(end is None):
            text=f.read()
        else: 
            nbytes=end-start+1
            text=f.read(nbytes)

    # process text here. createing some object to be returned
    # You could wrap text into a StringIO object if you want to be able to
    # read from it the way you would a file.

    returnobj=text
    return returnobj

def wrapper(args):
    return process_piece(*args)

filename_repeated=[filename]*len(start_byte)
args=zip(filename_repeated,start_byte,end_byte)

pool=multiprocessing.Pool(4)
result=pool.map(wrapper,args)

#Now take your results and write them to the database.
print "".join(result)  #I just print it to make sure I get my file back ...

ここで注意が必要なのは、改行文字でファイルを分割して、行を見逃さないようにする (または部分的な行のみを読み取る) ことです。次に、各プロセスはファイルの一部を読み取り、メインスレッドによってデータベースに配置できるオブジェクトを返します。もちろん、一度にすべての情報をメモリに保持する必要がないように、この部分をチャンクで実行する必要がある場合もあります。(これは非常に簡単に実行できます。「args」リストを X チャンクに分割して呼び出すだけです。こちらpool.map(wrapper,chunk) を参照してください)

于 2012-06-25T20:46:24.610 に答える
-2

単一の大きなファイルを複数の小さなファイルに分割し、それぞれを別々のスレッドで処理します。

于 2012-06-25T20:09:10.430 に答える