83

非常に大きなリストがあり、次のような操作を実行しているとします。

for item in items:
    try:
        api.my_operation(item)
    except:
        print 'error with item'

私の問題は2つあります:

  • たくさんのアイテムがあります
  • api.my_operation が返されるまでに時間がかかります

マルチスレッドを使用して api.my_operations の束を一度にスピンアップして、一度に 5 つまたは 10 個、あるいは 100 個のアイテムを処理できるようにしたいと考えています。

my_operation() が例外を返した場合 (その項目を既に処理した可能性があるため) - それは問題ありません。それは何も壊しません。ループは次の項目に進むことができます。

:これはPython 2.7.3用です

4

4 に答える 4

146

まず、Python では、コードが CPU バウンドの場合、マルチスレッドは役に立ちません。なぜなら、一度に 1 つのスレッドしかグローバル インタープリター ロックを保持できないため、Python コードを実行できないからです。したがって、スレッドではなくプロセスを使用する必要があります。

これは、操作が IO バウンド (つまり、ネットワークやディスク コピーなどで待機) であるため、"戻るのに永遠にかかる" 場合には当てはまりません。それについては後で説明します。


次に、一度に 5 個、10 個、または 100 個のアイテムを処理する方法は、5 個、10 個、または 100 個のワーカーのプールを作成し、ワーカーがサービスを提供するキューにアイテムを入れることです。幸いなことに、stdlibmultiprocessingconcurrent.futuresライブラリの両方で、ほとんどの詳細がまとめられています。

前者は、従来のプログラミングに対してより強力で柔軟です。future-waiting を構成する必要がある場合は、後者の方が簡単です。些細なケースでは、どちらを選択しても問題ありません。(この場合、 each での最も明白な実装は、 で 3 行、futuresで 4 行かかりますmultiprocessing。)

2.6-2.7 または 3.0-3.1 を使用している場合は、組み込まれていませんが、 PyPI ( )futuresからインストールできます。pip install futures


最後に、通常、ループの繰り返し全体を関数呼び出し (たとえば、 に渡すことができるもの) に変えることができれば、物事を並列化するのはずっと簡単ですmap

def try_my_operation(item):
    try:
        api.my_operation(item)
    except:
        print('error with item')

すべてを一緒に入れて:

executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(try_my_operation, item) for item in items]
concurrent.futures.wait(futures)

比較的小さなジョブが多数ある場合、マルチプロセッシングのオーバーヘッドが利益を圧倒する可能性があります。それを解決する方法は、作業をより大きなジョブにまとめることです。例(コピーしてコードに貼り付けるか、PyPIのプロジェクトから取得できるレシピgrouperから使用):itertoolsmore-itertools

def try_multiple_operations(items):
    for item in items:
        try:
            api.my_operation(item)
        except:
            print('error with item')

executor = concurrent.futures.ProcessPoolExecutor(10)
futures = [executor.submit(try_multiple_operations, group) 
           for group in grouper(5, items)]
concurrent.futures.wait(futures)

最後に、コードが IO バウンドの場合はどうなるでしょうか? 次に、スレッドはプロセスと同じくらい優れており、オーバーヘッドが少なくなります (そして制限が少なくなりますが、通常、これらの制限はこのような場合には影響しません)。場合によっては、「オーバーヘッドが少ない」ということは、スレッドでバッチ処理を行う必要がないことを意味するのに十分ですが、プロセスでは行うことができます。これは素晴らしい勝利です。

では、プロセスの代わりにどのようにスレッドを使用するのでしょうか? に変更ProcessPoolExecutorするだけThreadPoolExecutorです。

コードが CPU バウンドなのか IO バウンドなのかわからない場合は、両方の方法で試してみてください。


Python スクリプトの複数の関数に対してこれを行うことはできますか? たとえば、並列化したいコードの別の場所に for ループがあるとします。同じスクリプトで 2 つのマルチスレッド機能を実行することは可能ですか?

はい。実際、それを行うには2つの異なる方法があります。

まず、同じ (スレッドまたはプロセス) executor を共有し、複数の場所から問題なく使用できます。タスクとフューチャの要点は、それらが自己完結型であるということです。それらがどこで実行されるかは気にしません。それらをキューに入れ、最終的に回答を取得するだけです。

あるいは、問題なく同じプログラムに 2 つのエグゼキュータを含めることができます。これにはパフォーマンス コストがかかります。両方のエグゼキュータを同時に使用している場合、(たとえば) 8 コアで 16 のビジー スレッドを実行しようとすることになり、コンテキストの切り替えが発生することになります。しかし、たとえば、2 つのエグゼキュータが同時にビジーになることはめったになく、コードがはるかに単純になるため、実行する価値がある場合もあります。または、1 つのエグゼキューターが完了するまでに時間がかかる可能性がある非常に大きなタスクを実行し、もう 1 つのエグゼキューターができるだけ早く完了する必要がある非常に小さなタスクを実行している可能性があります。これは、プログラムの一部のスループットよりも応答性の方が重要であるためです。

どれがあなたのプログラムに適しているかわからない場合、通常はそれが最初です。

于 2013-02-28T19:27:05.133 に答える
45

multiprocesing.poolがあり、次のサンプルはそれらの1つを使用する方法を示しています。

from multiprocessing.pool import ThreadPool as Pool
# from multiprocessing import Pool

pool_size = 5  # your "parallelness"

# define worker function before a Pool is instantiated
def worker(item):
    try:
        api.my_operation(item)
    except:
        print('error with item')

pool = Pool(pool_size)

for item in items:
    pool.apply_async(worker, (item,))

pool.close()
pool.join()

@abarnertが述べたように、プロセスがCPUにバインドされていることを実際に確認した場合は、ThreadPoolをプロセスプールの実装に変更します(ThreadPoolインポートの下にコメントされています)。詳細については、http: //docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workersをご覧ください。

于 2013-02-28T20:11:38.487 に答える
19

次のようなアプローチを使用して、処理を指定された数のスレッドに分割できます。

import threading                                                                

def process(items, start, end):                                                 
    for item in items[start:end]:                                               
        try:                                                                    
            api.my_operation(item)                                              
        except Exception:                                                       
            print('error with item')                                            


def split_processing(items, num_splits=4):                                      
    split_size = len(items) // num_splits                                       
    threads = []                                                                
    for i in range(num_splits):                                                 
        # determine the indices of the list this thread will handle             
        start = i * split_size                                                  
        # special case on the last chunk to account for uneven splits           
        end = None if i+1 == num_splits else (i+1) * split_size                 
        # create the thread                                                     
        threads.append(                                                         
            threading.Thread(target=process, args=(items, start, end)))         
        threads[-1].start() # start the thread we just created                  

    # wait for all threads to finish                                            
    for t in threads:                                                           
        t.join()                                                                



split_processing(items)
于 2013-02-28T19:33:22.653 に答える