Python でのスレッド化を理解しようとしています。ドキュメントと例を見てきましたが、率直に言って、多くの例は過度に洗練されており、理解に苦しんでいます。
マルチスレッドのためにタスクが分割されていることをどのように明確に示しますか?
Python でのスレッド化を理解しようとしています。ドキュメントと例を見てきましたが、率直に言って、多くの例は過度に洗練されており、理解に苦しんでいます。
マルチスレッドのためにタスクが分割されていることをどのように明確に示しますか?
この質問が 2010 年に出されて以来、mapとpoolを使用して Python で単純なマルチスレッドを実行する方法が大幅に簡素化されました。
以下のコードは、必ずチェックする必要がある記事/ブログ投稿からのものです (所属なし) - Parallelism in one line: A Better Model for Day to Day Threading Tasks。以下に要約します。最終的には数行のコードになります。
from multiprocessing.dummy import Pool as ThreadPool
pool = ThreadPool(4)
results = pool.map(my_function, my_array)
次のマルチスレッド バージョンはどれですか。
results = []
for item in my_array:
results.append(my_function(item))
説明
Map はクールな小さな関数であり、Python コードに並列処理を簡単に挿入するための鍵です。なじみのない人のために説明すると、map は Lisp のような関数型言語から持ち出されたものです。これは、別の関数をシーケンスにマップする関数です。
Map は、シーケンスの反復を処理し、関数を適用し、すべての結果を最後に便利なリストに格納します。
実装
map 関数の並列バージョンは、2 つのライブラリによって提供されます: multiprocessing と、あまり知られていませんが、同様に素晴らしいステップの子である multiprocessing.dummy です。
multiprocessing.dummy
multiprocessing モジュールとまったく同じですが、代わりにスレッドを使用します(重要な違い- CPU を集中的に使用するタスクには複数のプロセスを使用し、I/O (および実行中) にはスレッドを使用します):
multiprocessing.dummy は multiprocessing の API を複製しますが、threading モジュールのラッパーにすぎません。
import urllib2
from multiprocessing.dummy import Pool as ThreadPool
urls = [
'http://www.python.org',
'http://www.python.org/about/',
'http://www.onlamp.com/pub/a/python/2003/04/17/metaclasses.html',
'http://www.python.org/doc/',
'http://www.python.org/download/',
'http://www.python.org/getit/',
'http://www.python.org/community/',
'https://wiki.python.org/moin/',
]
# Make the Pool of workers
pool = ThreadPool(4)
# Open the URLs in their own threads
# and return the results
results = pool.map(urllib2.urlopen, urls)
# Close the pool and wait for the work to finish
pool.close()
pool.join()
タイミングの結果は次のとおりです。
Single thread: 14.4 seconds
4 Pool: 3.1 seconds
8 Pool: 1.4 seconds
13 Pool: 1.3 seconds
複数の引数を渡す( Python 3.3 以降でのみこのように機能します):
複数の配列を渡すには:
results = pool.starmap(function, zip(list_a, list_b))
または、定数と配列を渡すには:
results = pool.starmap(function, zip(itertools.repeat(constant), list_a))
以前のバージョンの Python を使用している場合は、この回避策で複数の引数を渡すことができます)。
(役立つコメントをくれたuser136036に感謝します。)
簡単な例を次に示します。いくつかの代替 URL を試して、最初に応答した URL のコンテンツを返す必要があります。
import Queue
import threading
import urllib2
# Called by each thread
def get_url(q, url):
q.put(urllib2.urlopen(url).read())
theurls = ["http://google.com", "http://yahoo.com"]
q = Queue.Queue()
for u in theurls:
t = threading.Thread(target=get_url, args = (q,u))
t.daemon = True
t.start()
s = q.get()
print s
これは、スレッド化が単純な最適化として使用されるケースです。各サブスレッドは、URL が解決されて応答するのを待って、その内容をキューに入れます。各スレッドはデーモンです (メイン スレッドが終了すると、プロセスを維持しません。これは、そうでない場合よりも一般的です)。メインスレッドはすべてのサブスレッドを開始get
し、キューで a を実行して、そのうちの 1 つが a を実行するまで待機しput
、結果を発行して終了します (デーモンスレッドであるため、実行中の可能性のあるサブスレッドを停止します)。
Python でのスレッドの適切な使用は、常に I/O 操作に関連しています (CPython は複数のコアを使用して CPU バウンドのタスクを実行しないため、スレッド化の唯一の理由は、I/O の待機中にプロセスをブロックしないことです)。 )。ちなみに、キューはほぼ常に、作業をスレッドにファームアウトしたり、作業の結果を収集したりするための最良の方法であり、本質的にスレッドセーフであるため、ロック、条件、イベント、セマフォ、およびその他のインターを心配する必要がありません。 -スレッド調整/コミュニケーションの概念。
注: Python での実際の並列化では、マルチプロセッシングモジュールを使用して、並列で実行される複数のプロセスをフォークする必要があります (グローバル インタープリター ロックにより、Python スレッドはインターリーブを提供しますが、実際には、並列ではなくシリアルで実行されます。 I/O 操作をインターリーブする場合に役立ちます)。
ただし、インターリーブを探しているだけの場合 (または、グローバル インタープリター ロックにもかかわらず並列化できる I/O 操作を実行している場合) は、threadingモジュールから始めてください。非常に単純な例として、サブ範囲を並列に合計して大きな範囲を合計する問題を考えてみましょう。
import threading
class SummingThread(threading.Thread):
def __init__(self,low,high):
super(SummingThread, self).__init__()
self.low=low
self.high=high
self.total=0
def run(self):
for i in range(self.low,self.high):
self.total+=i
thread1 = SummingThread(0,500000)
thread2 = SummingThread(500000,1000000)
thread1.start() # This actually causes the thread to run
thread2.start()
thread1.join() # This waits until the thread has completed
thread2.join()
# At this point, both threads have completed
result = thread1.total + thread2.total
print result
上記は非常にばかげた例であることに注意してください。これは、I/O をまったく行わず、グローバル インタープリター ロックのためにCPythonでインターリーブされますが (コンテキスト切り替えのオーバーヘッドが追加されます)、シリアルに実行されるためです。
他の人が述べたように、CPython はGILによる I/O 待機にのみスレッドを使用できます。
CPU バウンドのタスクで複数のコアを活用したい場合は、multiprocessingを使用します。
from multiprocessing import Process
def f(name):
print 'hello', name
if __name__ == '__main__':
p = Process(target=f, args=('bob',))
p.start()
p.join()
注意: スレッド化にはキューは必要ありません。
これは、10 個のプロセスが同時に実行されていることを示す、私が想像できる最も単純な例です。
import threading
from random import randint
from time import sleep
def print_number(number):
# Sleeps a random 1 to 10 seconds
rand_int_var = randint(1, 10)
sleep(rand_int_var)
print "Thread " + str(number) + " slept for " + str(rand_int_var) + " seconds"
thread_list = []
for i in range(1, 10):
# Instantiates the thread
# (i) does not make a sequence, so (i,)
t = threading.Thread(target=print_number, args=(i,))
# Sticks the thread in a list so that it remains accessible
thread_list.append(t)
# Starts threads
for thread in thread_list:
thread.start()
# This blocks the calling thread until the thread whose join() method is called is terminated.
# From http://docs.python.org/2/library/threading.html#thread-objects
for thread in thread_list:
thread.join()
# Demonstrates that the main process waited for threads to complete
print "Done"
これは非常に便利だと思いました: コアと同じ数のスレッドを作成し、それらに (多数の) タスクを実行させます (この場合、シェル プログラムを呼び出します):
import Queue
import threading
import multiprocessing
import subprocess
q = Queue.Queue()
for i in range(30): # Put 30 tasks in the queue
q.put(i)
def worker():
while True:
item = q.get()
# Execute a task: call a shell program and wait until it completes
subprocess.call("echo " + str(item), shell=True)
q.task_done()
cpus = multiprocessing.cpu_count() # Detect number of cores
print("Creating %d threads" % cpus)
for i in range(cpus):
t = threading.Thread(target=worker)
t.daemon = True
t.start()
q.join() # Block until all tasks are done
Python 3 には、並列タスクを起動する機能があります。これにより、作業が容易になります。
スレッド プーリングとプロセス プーリングがあります。
以下から洞察が得られます。
ThreadPoolExecutor の例( source )
import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']
# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
with urllib.request.urlopen(url, timeout=timeout) as conn:
return conn.read()
# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
# Start the load operations and mark each future with its URL
future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
data = future.result()
except Exception as exc:
print('%r generated an exception: %s' % (url, exc))
else:
print('%r page is %d bytes' % (url, len(data)))
ProcessPoolExecutor (ソース)
import concurrent.futures
import math
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n):
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
main()
これは、役立つ簡単な例を含むマルチスレッドです。これを実行すると、Python でマルチスレッドがどのように機能するかを簡単に理解できます。前のスレッドが作業を完了するまで、他のスレッドへのアクセスを防ぐためにロックを使用しました。このコード行を使用すると、
tLock = threading.BoundedSemaphore(値=4)
一度に多数のプロセスを許可し、後で実行されるか、前のプロセスが終了した後に実行される残りのスレッドを保持し続けることができます。
import threading
import time
#tLock = threading.Lock()
tLock = threading.BoundedSemaphore(value=4)
def timer(name, delay, repeat):
print "\r\nTimer: ", name, " Started"
tLock.acquire()
print "\r\n", name, " has the acquired the lock"
while repeat > 0:
time.sleep(delay)
print "\r\n", name, ": ", str(time.ctime(time.time()))
repeat -= 1
print "\r\n", name, " is releaseing the lock"
tLock.release()
print "\r\nTimer: ", name, " Completed"
def Main():
t1 = threading.Thread(target=timer, args=("Timer1", 2, 5))
t2 = threading.Thread(target=timer, args=("Timer2", 3, 5))
t3 = threading.Thread(target=timer, args=("Timer3", 4, 5))
t4 = threading.Thread(target=timer, args=("Timer4", 5, 5))
t5 = threading.Thread(target=timer, args=("Timer5", 0.1, 5))
t1.start()
t2.start()
t3.start()
t4.start()
t5.start()
print "\r\nMain Complete"
if __name__ == "__main__":
Main()
とても分かりやすいです。スレッド化を行う 2 つの簡単な方法を次に示します。
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
import threading
def a(a=1, b=2):
print(a)
time.sleep(5)
print(b)
return a+b
def b(**kwargs):
if "a" in kwargs:
print("am b")
else:
print("nothing")
to_do=[]
executor = ThreadPoolExecutor(max_workers=4)
ex1=executor.submit(a)
to_do.append(ex1)
ex2=executor.submit(b, **{"a":1})
to_do.append(ex2)
for future in as_completed(to_do):
print("Future {} and Future Return is {}\n".format(future, future.result()))
print("threading")
to_do=[]
to_do.append(threading.Thread(target=a))
to_do.append(threading.Thread(target=b, kwargs={"a":1}))
for threads in to_do:
threads.start()
for threads in to_do:
threads.join()