73

Python でスレッド化を初めて見たときにほとんどの人が気付いているように、並列処理を実際に行いたい、または少なくともチャンスを与えたいと考えている人々にとって、人生を悲惨なものにする GIL があります。

現在、Reactor パターンのような実装を検討しています。事実上、あるスレッドのような着信ソケット接続をリッスンし、誰かが接続しようとすると、その接続を受け入れて、処理のために別のスレッドのようなものに渡します。

どのような種類の負荷がかかる可能性があるかは (まだ) わかりません。現在、受信メッセージに 2MB の上限が設定されていることは知っています。理論的には、1 秒あたり数千を取得できます (ただし、実際にそのようなものが見られたかどうかはわかりません)。メッセージの処理にかかる時間はさほど重要ではありません、明らかに速いほどよいでしょう。

multiprocessing私は Reactor パターンを調べていて、 (少なくともテストでは) 問題なく動作するように見えるライブラリを使用して小さな例を開発しました。ただし、イベント ループを処理するasyncioライブラリが利用可能になる予定です。

と を組み合わせて私を噛むことができるものはありasyncioますmultiprocessingか?

4

3 に答える 3

89

直接使用するべきではありませんが、問題なくasyncio安全に組み合わせることができるはずです。(およびその他のイベントループ ベースの非同期フレームワーク) の大罪は、イベント ループをブロックしています。直接使用しようとすると、子プロセスを待機するためにブロックするたびに、イベント ループがブロックされます。明らかに、これは悪いことです。multiprocessingmultiprocessingasynciomultiprocessing

これを回避する最も簡単な方法は、 を使用BaseEventLoop.run_in_executorして関数を実行することconcurrent.futures.ProcessPoolExecutorです。ProcessPoolExecutorは、 を使用して実装されたプロセス プールですがmultiprocessing.Processasyncioイベント ループをブロックすることなく関数を実行するためのサポートが組み込まれています。簡単な例を次に示します。

import time
import asyncio
from concurrent.futures import ProcessPoolExecutor

def blocking_func(x):
   time.sleep(x) # Pretend this is expensive calculations
   return x * 5

@asyncio.coroutine
def main():
    #pool = multiprocessing.Pool()
    #out = pool.apply(blocking_func, args=(10,)) # This blocks the event loop.
    executor = ProcessPoolExecutor()
    out = yield from loop.run_in_executor(executor, blocking_func, 10)  # This does not
    print(out)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

ほとんどの場合、これは機能だけで十分です。、、 、 などの他の構造が必要な場合multiprocessingは、すべてのデータ構造の互換バージョンを提供する (完全開示: 私が書いた)というサードパーティ ライブラリがあります。これをデモする例を次に示します。QueueEventManageraioprocessingasynciomultiprocessing

import time
import asyncio
import aioprocessing
import multiprocessing

def func(queue, event, lock, items):
    with lock:
        event.set()
        for item in items:
            time.sleep(3)
            queue.put(item+5)
    queue.close()

@asyncio.coroutine
def example(queue, event, lock):
    l = [1,2,3,4,5]
    p = aioprocessing.AioProcess(target=func, args=(queue, event, lock, l)) 
    p.start()
    while True:
        result = yield from queue.coro_get()
        if result is None:
            break
        print("Got result {}".format(result))
    yield from p.coro_join()

@asyncio.coroutine
def example2(queue, event, lock):
    yield from event.coro_wait()
    with (yield from lock):
        yield from queue.coro_put(78)
        yield from queue.coro_put(None) # Shut down the worker

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    queue = aioprocessing.AioQueue()
    lock = aioprocessing.AioLock()
    event = aioprocessing.AioEvent()
    tasks = [ 
        asyncio.async(example(queue, event, lock)),
        asyncio.async(example2(queue, event, lock)),
    ]   
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
于 2015-03-19T14:50:04.227 に答える
8

はい、あなたを噛むかもしれない(または噛まないかもしれない)ビットがかなりあります。

  • 何かasyncioを実行すると、1 つのスレッドまたはプロセスで実行されることが期待されます。これは (それ自体では) 並列処理では機能しません。IO 操作 (特にソケットの操作) を単一のスレッド/プロセスに残しながら、どうにかして作業を分散する必要があります。
  • 個々の接続を別のハンドラー プロセスに渡すというアイデアは素晴らしいですが、実装するのは困難です。asyncio最初の障害は、接続を閉じずに引き抜く方法が必要なことです。次の障害は、C 拡張からプラットフォーム固有の (おそらく Linux) コードを使用しない限り、ファイル記述子を別のプロセスに単純に送信できないことです。
  • このmultiprocessingモジュールは、通信用に多数のスレッドを作成することが知られていることに注意してください。ほとんどの場合、通信構造 ( Queues など) を使用すると、スレッドが生成されます。残念ながら、これらのスレッドは完全に見えなくなるわけではありません。たとえば、(プログラムを終了するつもりである場合) それらはきれいに分解できない可能性がありますが、それらの数によっては、リソースの使用量がそれ自体で目立つ場合があります。

個々のプロセスで個々の接続を処理する場合は、さまざまなアプローチを検討することをお勧めします。たとえば、ソケットをリッスン モードにしてから、同時に複数のワーカー プロセスからの接続を並行して受け入れることができます。ワーカーがリクエストの処理を完了すると、次の接続を受け入れることができるため、接続ごとにプロセスをフォークするよりもリソースの使用量が少なくなります。たとえば、Spamassassin と Apache (mpm prefork) は、このワーカー モデルを使用できます。ユースケースによっては、より簡単で堅牢になる可能性があります。具体的には、構成された数のリクエストを処理した後にワーカーを終了させ、マスター プロセスによって再生成されるようにすることで、メモリ リークの悪影響の多くを排除できます。

于 2014-01-17T06:03:42.050 に答える
1

PEP 3156、特にスレッドの相互作用に関するセクションを参照してください。

http://www.python.org/dev/peps/pep-3156/#thread-interaction

これは、run_in_executor() を含む、使用する可能性のある新しい asyncio メソッドを明確に文書化しています。Executor は concurrent.futures で定義されていることに注意してください。そちらも参照することをお勧めします。

于 2014-02-14T21:38:01.197 に答える