6

数日前、複数のHTTPリクエストを構造化するためのパラダイムの設計を支援することについてSOについて質問しました。

これがシナリオです。マルチプロデューサー、マルチコンシューマーのシステムが欲しいのですが。私のプロデューサーはいくつかのサイトをクロールしてスクレイプし、見つけたリンクをキューに追加します。複数のサイトをクロールするので、複数のプロデューサー/クローラーが必要です。

コンシューマー/ワーカーはこのキューをフィードし、これらのリンクに対してTCP / UDP要求を行い、結果をDjangoDBに保存します。また、各キューアイテムは完全に独立しているため、複数のワーカーが必要です。

人々は、これにコルーチンライブラリ、つまりGeventまたはEventletを使用することを提案しました。コルーチンを使用したことがないので、プログラミングパラダイムはスレッドパラダイムに似ていますが、アクティブに実行されているスレッドは1つだけですが、I / O呼び出しなどのブロック呼び出しが発生すると、スタックはメモリ内で切り替えられ、もう1つはグリーンになります。スレッドは、ある種のブロッキングI/O呼び出しに遭遇するまで引き継ぎます。うまくいけば、私はこれを正しくしましたか?これが私のSO投稿の1つからのコードです:

import gevent
from gevent.queue import *
import time
import random

q = JoinableQueue()
workers = []
producers = []


def do_work(wid, value):
    gevent.sleep(random.randint(0,2))
    print 'Task', value, 'done', wid


def worker(wid):
    while True:
        item = q.get()
        try:
            print "Got item %s" % item
            do_work(wid, item)
        finally:
            print "No more items"
            q.task_done()


def producer():
    while True:
        item = random.randint(1, 11)
        if item == 10:
            print "Signal Received"
            return
        else:
            print "Added item %s" % item
            q.put(item)


for i in range(4):
    workers.append(gevent.spawn(worker, random.randint(1, 100000)))

# This doesn't work.
for j in range(2):
    producers.append(gevent.spawn(producer))

# Uncommenting this makes this script work.
# producer()

q.join()

sleep呼び出しが呼び出しをブロックしており、sleepイベントが発生すると、別のグリーンスレッドが引き継ぐため、これはうまく機能します。これは、順次実行よりもはるかに高速です。ご覧のとおり、プログラムには、あるスレッドを別のスレッドに意図的に実行させるコードはありません。すべてのスレッドを同時に実行したいので、これが上記のシナリオにどのように適合するかがわかりません。

すべて正常に動作しますが、Gevent / Eventletsを使用して達成したスループットは、元の順次実行プログラムよりも高くなりますが、実際のスレッドを使用して達成できるスループットよりも大幅に低くなります。

スレッドメカニズムを使用してプログラムを再実装する場合、コルーチンのようにスタックをスワップインおよびスワップアウトする必要なしに、プロデューサーとコンシューマーのそれぞれが同時に作業できます。

これは、スレッドを使用して再実装する必要がありますか?私のデザインは間違っていますか?コルーチンを使用することの本当の利点を理解できませんでした。

たぶん私の概念は少し泥だらけですが、これは私が同化したものです。私のパラダイムと概念の助けや明確化は素晴らしいでしょう。

ありがとう

4

3 に答える 3

5

ご覧のとおり、プログラムには、あるスレッドを別のスレッドに意図的に実行させるコードはありません。すべてのスレッドを同時に実行したいので、これが上記のシナリオにどのように適合するかがわかりません。

OSスレッドは1つですが、いくつかのグリーンレットがあります。あなたの場合gevent.sleep()、ワーカーが同時に実行できるようにします。パッチを使用して作業する場合(を呼び出すことにより) urllib2.urlopen(url).read()、同じようにIO呼び出しをブロックします。urllib2geventgevent.monkey.patch_*()

シングルスレッド環境でコードが同時に機能する方法を理解するには、コルーチンと並行性に関する興味深いコースも参照してください。

gevent、スレッド化、マルチプロセッシング間のスループットの違いを比較するために、すべてのアプローチと互換性のあるコードを書くことができます。

#!/usr/bin/env python
concurrency_impl = 'gevent' # single process, single thread
##concurrency_impl = 'threading' # single process, multiple threads
##concurrency_impl = 'multiprocessing' # multiple processes

if concurrency_impl == 'gevent':
    import gevent.monkey; gevent.monkey.patch_all()

import logging
import time
import random
from itertools import count, islice

info = logging.info

if concurrency_impl in ['gevent', 'threading']:
    from Queue import Queue as JoinableQueue
    from threading import Thread
if concurrency_impl == 'multiprocessing':
    from multiprocessing import Process as Thread, JoinableQueue

スクリプトの残りの部分は、すべての同時実行実装で同じです。

def do_work(wid, value):
    time.sleep(random.randint(0,2))
    info("%d Task %s done" % (wid, value))

def worker(wid, q):
    while True:
        item = q.get()
        try:
            info("%d Got item %s" % (wid, item))
            do_work(wid, item)
        finally:
            q.task_done()
            info("%d Done item %s" % (wid, item))

def producer(pid, q):
    for item in iter(lambda: random.randint(1, 11), 10):
        time.sleep(.1) # simulate a green blocking call that yields control
        info("%d Added item %s" % (pid, item))
        q.put(item)
    info("%d Signal Received" % (pid,))

モジュールレベルでコードを実行しないでくださいmain()

def main():
    logging.basicConfig(level=logging.INFO,
                        format="%(asctime)s %(process)d %(message)s")

    q = JoinableQueue()
    it = count(1)
    producers = [Thread(target=producer, args=(i, q)) for i in islice(it, 2)]
    workers = [Thread(target=worker, args=(i, q)) for i in islice(it, 4)]
    for t in producers+workers:
        t.daemon = True
        t.start()

    for t in producers: t.join() # put items in the queue
    q.join() # wait while it is empty
    # exit main thread (daemon workers die at this point)

if __name__=="__main__":    
   main()
于 2012-02-12T19:37:12.203 に答える
1

geventは、非常に多くの(緑色の)スレッドがある場合に最適です。私はそれを数千でテストしました、そしてそれは非常にうまくいきました。スクレイピングとデータベースへの保存の両方に使用するすべてのライブラリが緑色になるようにします。afaikがPythonのソケットを使用している場合、geventインジェクションは機能するはずです。ただし、Cで記述された拡張機能(mysqldbなど)はブロックされるため、代わりに同等の緑色を使用する必要があります。

geventを使用する場合、ほとんどの場合、キューを廃止し、すべてのタスクに新しい(緑色の)スレッドを生成できます。スレッドのコードは、と同じくらい単純db.save(web.get(address))です。geventは、dbまたはwebブロック内の一部のライブラリがプリエンプションを処理します。タスクがメモリに収まる限り機能します。

于 2012-02-12T16:17:09.130 に答える
0

この場合、問題はプログラムの速度(つまり、geventまたはスレッドの選択)ではなく、ネットワークIOスループットにあります。これが、プログラムの実行速度を決定するボトルネックです(そうあるべきです)。

Geventは、それがボトルネックであり、プログラムのアーキテクチャではないことを確認するための1つの優れた方法です。

これはあなたが望む種類のプロセスです:

import gevent
from gevent.queue import Queue, JoinableQueue
from gevent.monkey import patch_all


patch_all()  # Patch urllib2, etc


def worker(work_queue, output_queue):
    for work_unit in work_queue:
        finished = do_work(work_unit)
        output_queue.put(finished)
        work_queue.task_done()


def producer(input_queue, work_queue):
    for url in input_queue:
        url_list = crawl(url)
        for work in url_list:
            work_queue.put(work)
        input_queue.task_done()


def do_work(work):
    gevent.sleep(0)  # Actually proces link here
    return work


def crawl(url):
    gevent.sleep(0)
    return list(url)  # Actually process url here

input = JoinableQueue()
work = JoinableQueue()
output = Queue()

workers = [gevent.spawn(worker, work, output) for i in range(0, 10)]
producers = [gevent.spawn(producer, input, work) for i in range(0, 10)]


list_of_urls = ['foo', 'bar']

for url in list_of_urls:
    input.put(url)

# Wait for input to finish processing
input.join()
print 'finished producing'
# Wait for workers to finish processing work
work.join()
print 'finished working'

# We now have output!
print 'output:'
for message in output:
    print message
# Or if you'd like, you could use the output as it comes!

入力キューと作業キューが終了するのを待つ必要はありません。ここでそれを示しました。

于 2012-02-12T19:22:26.153 に答える