7

私は小さいながらも計算量の多いPythonアプリに取り組んでいます。計算量の多い作業は、同時に実行できるいくつかの部分に分割できます。私はこれを達成するために適切なスタックを特定しようとしています。

現在、タスクキューにCeleryを使用してApache2+WSGIでFlaskアプリを使用することを計画しています。

以下ではa_long_process()、3人以上のワーカーが利用可能な場合、、、を同時に実行しますかanother_long_process()yet_another_long_process()プロセスの実行中にFlaskアプリはブロックされますか?

Flaskアプリから:

@myapp.route('/foo')
def bar():
    task_1 = a_long_process.delay(x, y)
    task_1_result = task_1.get(timeout=1)
    task_2 = another_long_process.delay(x, y)
    task_2_result = task_2.get(timeout=1)
    task_3 = yet_another_long_process.delay(x, y)
    task_3_result = task_3.get(timeout=1)
    return task_1 + task_2 + task_3

tasks.py:

from celery import Celery
celery = Celery('tasks', broker="amqp://guest@localhost//", backend="amqp://")
@celery.task
def a_long_process(x, y):
    return something
@celery.task
def another_long_process(x, y):
    return something_else
@celery.task
def yet_another_long_process(x, y):
    return a_third_thing
4

3 に答える 3

7

ワーカーが並行して作業できるように、コードを変更する必要があります。

@myapp.route('/foo')
def bar():
    # start tasks
    task_1 = a_long_process.delay(x, y)
    task_2 = another_long_process.delay(x, y)
    task_3 = yet_another_long_process.delay(x, y)
    # fetch results
    try:
        task_1_result = task_1.get(timeout=1)
        task_2_result = task_2.get(timeout=1)
        task_3_result = task_3.get(timeout=1)
    except TimeoutError:
        # Handle this or don't specify a timeout.
        raise
    # combine results
    return task_1 + task_2 + task_3

このコードは、すべての結果が利用可能になるまで(またはタイムアウトに達するまで)ブロックされます。

プロセスの実行中にFlaskアプリはブロックされますか?

このコードは、WSGIコンテナの1つのワーカーのみをブロックします。サイト全体が応答しないかどうかは、使用しているWSGIコンテナによって異なります。(例:Apache + mod_wsgi、uWSGI、gunicornなど)ほとんどのWSGIコンテナーは複数のワーカーを生成するため、コードがタスクの結果を待機している間、1つのワーカーのみがブロックされます。

この種のアプリケーションでは、リクエストごとに個別のグリーンレットを生成し、非常に軽量なgeventを使用することをお勧めします。

于 2013-01-29T17:28:04.403 に答える
1

のドキュメントによるとresult.get()、結果が返されるまで待機するため、通常は実際にはブロックされています。ただし、があるので、タスクの完了に1秒以上かかる場合、timeout=1を呼び出すとget()TimeoutErrorが発生します。

デフォルトでは、Celeryワーカーは、使用可能なCPUの数に等しい同時実行レベルを設定して開始します。同時実行レベルは、タスクの処理に使用できるスレッドの数を決定するようです。したがって、並行性レベルが3以上の場合、Celeryワーカーは、その多くのタスクを同時に処理できるはずです(おそらく、Celeryの専門知識が豊富な人がこれを確認できますか?)。

于 2013-01-29T17:19:32.117 に答える
0

セロリキャンバスのグループ機能を使用します。

グループプリミティブは、並行して適用する必要があるタスクのリストを取得するシグニチャです。

ドキュメントで提供されている例は次のとおりです。

from celery import group
from proj.tasks import add

g = group(add.s(2, 2), add.s(4, 4))
res = g()
res.get()

どの出力[4, 8]

于 2014-07-17T12:40:04.213 に答える