397

マルチプロセッシング モジュールのPool クラスに似た、ワーカースレッド用の Pool クラスはありますか?

たとえば、マップ関数を並列化する簡単な方法が好きです

def long_running_func(p):
    c_func_no_gil(p)

p = multiprocessing.Pool(4)
xs = p.map(long_running_func, range(100))

ただし、新しいプロセスを作成するオーバーヘッドなしで実行したいと考えています。

私はGILについて知っています。ただし、私のユースケースでは、関数は、実際の関数呼び出しの前に Python ラッパーが GIL を解放する IO バインド C 関数になります。

独自のスレッド プールを作成する必要がありますか?

4

11 に答える 11

511

モジュールには実際にはスレッドベースのプールインターフェイスがあることがわかりましたが multiprocessing、それは多少隠されており、適切に文書化されていません。

経由でインポートできます

from multiprocessing.pool import ThreadPool

これは、PythonスレッドをラップするダミーのProcessクラスを使用して実装されます。このスレッドベースのProcessクラスは、ドキュメントmultiprocessing.dummyで簡単に説明されています。このダミーモジュールは、スレッドに基づくマルチプロセッシングインターフェイス全体を提供すると思われます。

于 2010-08-02T09:52:28.747 に答える
261

Python 3 では、次のように使用できますconcurrent.futures.ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=10)
a = executor.submit(my_function)

詳細と例については、ドキュメントを参照してください。

于 2012-07-17T19:42:40.387 に答える
71

はい、(多かれ少なかれ) 同じ API を持っているようです。

import multiprocessing

def worker(lnk):
    ....    
def start_process():
    .....
....

if(PROCESS):
    pool = multiprocessing.Pool(processes=POOL_SIZE, initializer=start_process)
else:
    pool = multiprocessing.pool.ThreadPool(processes=POOL_SIZE, 
                                           initializer=start_process)

pool.map(worker, inputs)
....
于 2012-11-30T19:42:33.810 に答える
47

非常にシンプルで軽量なものの場合 (ここから少し変更):

from Queue import Queue
from threading import Thread


class Worker(Thread):
    """Thread executing tasks from a given tasks queue"""
    def __init__(self, tasks):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon = True
        self.start()

    def run(self):
        while True:
            func, args, kargs = self.tasks.get()
            try:
                func(*args, **kargs)
            except Exception, e:
                print e
            finally:
                self.tasks.task_done()


class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads):
        self.tasks = Queue(num_threads)
        for _ in range(num_threads):
            Worker(self.tasks)

    def add_task(self, func, *args, **kargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kargs))

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

if __name__ == '__main__':
    from random import randrange
    from time import sleep

    delays = [randrange(1, 10) for i in range(100)]

    def wait_delay(d):
        print 'sleeping for (%d)sec' % d
        sleep(d)

    pool = ThreadPool(20)

    for i, d in enumerate(delays):
        pool.add_task(wait_delay, d)

    pool.wait_completion()

タスク完了時のコールバックをサポートするには、コールバックをタスク タプルに追加するだけです。

于 2011-08-31T13:23:19.453 に答える
12

最終的に使ってみた結果がこちら。上記の dgorissen によるクラスの修正版です。

ファイル:threadpool.py

from queue import Queue, Empty
import threading
from threading import Thread


class Worker(Thread):
    _TIMEOUT = 2
    """ Thread executing tasks from a given tasks queue. Thread is signalable, 
        to exit
    """
    def __init__(self, tasks, th_num):
        Thread.__init__(self)
        self.tasks = tasks
        self.daemon, self.th_num = True, th_num
        self.done = threading.Event()
        self.start()

    def run(self):       
        while not self.done.is_set():
            try:
                func, args, kwargs = self.tasks.get(block=True,
                                                   timeout=self._TIMEOUT)
                try:
                    func(*args, **kwargs)
                except Exception as e:
                    print(e)
                finally:
                    self.tasks.task_done()
            except Empty as e:
                pass
        return

    def signal_exit(self):
        """ Signal to thread to exit """
        self.done.set()


class ThreadPool:
    """Pool of threads consuming tasks from a queue"""
    def __init__(self, num_threads, tasks=[]):
        self.tasks = Queue(num_threads)
        self.workers = []
        self.done = False
        self._init_workers(num_threads)
        for task in tasks:
            self.tasks.put(task)

    def _init_workers(self, num_threads):
        for i in range(num_threads):
            self.workers.append(Worker(self.tasks, i))

    def add_task(self, func, *args, **kwargs):
        """Add a task to the queue"""
        self.tasks.put((func, args, kwargs))

    def _close_all_threads(self):
        """ Signal all threads to exit and lose the references to them """
        for workr in self.workers:
            workr.signal_exit()
        self.workers = []

    def wait_completion(self):
        """Wait for completion of all the tasks in the queue"""
        self.tasks.join()

    def __del__(self):
        self._close_all_threads()


def create_task(func, *args, **kwargs):
    return (func, args, kwargs)

プールを利用するには

from random import randrange
from time import sleep

delays = [randrange(1, 10) for i in range(30)]

def wait_delay(d):
    print('sleeping for (%d)sec' % d)
    sleep(d)

pool = ThreadPool(20)
for i, d in enumerate(delays):
    pool.add_task(wait_delay, d)
pool.wait_completion()
于 2018-05-10T05:04:03.497 に答える
5

別の方法として、プロセスをスレッド キュー プールに追加することもできます

import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=cpus) as executor:
    for i in range(10):
        a = executor.submit(arg1, arg2,....)
于 2020-06-15T20:28:24.843 に答える
3

組み込みのスレッド ベースのプールはありません。Queueただし、クラスを使用してプロデューサー/コンシューマー キューを実装するのは非常に簡単です。

から: https://docs.python.org/2/library/queue.html

from threading import Thread
from Queue import Queue
def worker():
    while True:
        item = q.get()
        do_work(item)
        q.task_done()

q = Queue()
for i in range(num_worker_threads):
     t = Thread(target=worker)
     t.daemon = True
     t.start()

for item in source():
    q.put(item)

q.join()       # block until all tasks are done
于 2010-06-13T21:30:35.630 に答える
3

新しいプロセスを作成するオーバーヘッドは、特にプロセスが 4 つしかない場合は最小限です。これがアプリケーションのパフォーマンスのホット スポットであるとは思えません。シンプルに保ち、必要な場所とプロファイリング結果が示す場所を最適化します。

于 2010-06-13T22:24:45.890 に答える