5

~400 の HTTP GET リクエストを送信して結果を収集しようとしています。私はジャンゴから走っています。私の解決策は、geventでセロリを使用することでした。

セロリ タスクを開始するには、get_reportsを呼び出します。

def get_reports(self, clients, *args, **kw):
    sub_tasks = []
    for client in clients:  
            s = self.get_report_task.s(self, client, *args, **kw).set(queue='io_bound')
        sub_tasks.append(s)
    res = celery.group(*sub_tasks)()
    reports = res.get(timeout=30, interval=0.001)
    return reports

@celery.task
def get_report_task(self, client, *args, **kw):
    report = send_http_request(...)
    return report

私は4人の労働者を使います:

manage celery worker -P gevent --concurrency=100 -n a0 -Q io_bound
manage celery worker -P gevent --concurrency=100 -n a1 -Q io_bound
manage celery worker -P gevent --concurrency=100 -n a2 -Q io_bound
manage celery worker -P gevent --concurrency=100 -n a3 -Q io_bound

そして、RabbitMq をブローカーとして使用します。

そして、リクエストを順次実行するよりもはるかに高速に動作しますが (400 リクエストに約 23 秒かかりました)、その時間のほとんどがセロリ自体のオーバーヘッドであることに気付きました。

@celery.task
def get_report_task(self, client, *args, **kw):
    return []

この操作全体に約 19 秒かかりました。つまり、すべてのタスクをセロリに送信して結果を返すだけで 19 秒を費やしています。

rabbit mq へのメッセージのキューイング レートは 28 メッセージ/秒に制限されているようで、これが私のボトルネックだと思います。

それが問題になる場合、私はwin 8マシンで実行しています。

私が試したことのいくつか:

  • redis をブローカーとして使用する
  • 結果のバックエンドとして redis を使用する
  • それらの設定を微調整する

    BROKER_POOL_LIMIT = 500

    CELERYD_PREFETCH_MULTIPLIER = 0

    CELERYD_MAX_TASKS_PER_CHILD = 100

    CELERY_ACKS_LATE = 偽

    CELERY_DISABLE_RATE_LIMITS = True

スピードアップに役立つ提案を探しています。

4

3 に答える 3

1

代わりにtwistedを使用するのはどうですか?はるかに単純なアプリケーション構造に到達できます。django プロセスから 400 件のリクエストすべてを一度に送信し、すべてが完了するのを待つことができます。これは、twisted がソケットを非ブロック モードに設定し、利用可能な場合にのみデータを読み取るため、同時に機能します。

少し前に同様の問題があり、ツイストとジャンゴの間の素晴らしいブリッジを開発しました。私はほぼ1年間、本番環境で実行しています。ここで見つけることができます: https://github.com/kowalski/featdjango/。簡単に言えば、メインのツイスト リアクター ループを実行するメイン アプリケーション スレッドがあり、django ビューの結果はスレッドに委譲されます。これは、reactor と対話し、その非同期機能を使用するメソッドを公開する特別なスレッドプールを使用します。

これを使用すると、コードは次のようになります。

from twisted.internet import defer
from twisted.web.client import getPage

import threading


def get_reports(self, urls, *args, **kw):
    ct = threading.current_thread()

    defers = list()
    for url in urls:
        # here the Deferred is created which will fire when
        # the call is complete
        d = ct.call_async(getPage, args=[url] + args, kwargs=kw)
        # here we keep it for reference
        defers.append(d)

    # here we create a Deferred which will fire when all the
    # consiting Deferreds are completed
    deferred_list = defer.DeferredList(defers, consumeErrors=True)
    # here we tell the current thread to wait until we are done
    results = ct.wait_for_defer(deferred_list)

    # the results is a list of the form (C{bool} success flag, result)
    # below unpack it
    reports = list()
    for success, result in results:
        if success:
            reports.append(result)
        else:
            # here handle the failure, or just ignore
            pass

    return reports

これはまだ多くを最適化できるものです。ここでは、getPage() を呼び出すたびに個別の TCP 接続が作成され、完了したら閉じられます。400 個のリクエストがそれぞれ別のホストに送信される場合、これは可能な限り最適です。そうでない場合は、永続的な接続と http パイプラインを使用する http 接続プールを使用できます。次のようにインスタンス化します。

from feat.web import httpclient

pool = httpclient.ConnectionPool(host, port, maximum_connections=3)

単一のリクエストは次のように実行されます (代わりに getPage() 呼び出しが実行されます)。

d = ct.call_async(pool.request, args=(method, path, headers, body))
于 2013-09-17T08:59:50.637 に答える