RQ / Redis を使用して、django サイトで実行時間の長いジョブの非同期実行を構築し始めました。私は次のようなことをしたいと思っています:
モデルのインスタンスごとに 1 つのキューが必要です。このモデルは、API ユーザー アカウントのようなものと考えることができます (多くはありません。多くても 15 から 20 です)。
タスクのバッチ (10 ~ 500 の範囲) をキュー全体に均等に分散します。最初のバッチが完了する前に、複数のバッチを追加できます。
バッチごとに、アクティブに処理されていないキューごとにワーカーを起動したいと思います。これらのワーカーをバッチ モードで実行して、タスクがなくなるとシャットダウンするようにします。
それらをバッチモードで実行することはできず、常にすべてのキューで作業を行ったり、作業をリッスンしたりしていました。これに関する問題は、キューを動的に追加および削除できるようにしたいため、バッチごとに使用可能なキューを起動する方がよいことです。
タスクを複数のキューに分散しているのは奇妙に思えるかもしれませんが、その理由は、同じキュー内の各タスクを、使用しているサービスに応じてレート制限/調整する必要があるためです (API と考えてください)。レート制限がありますが、各キューは異なるアカウントを表します)。しかし、私の目的では、タスクが実行されているアカウントに違いはないので、すべてのアカウントで並列化することもできます。
私が直面している問題は、ワーカーを開始して既に処理中のキューを指定すると、そのキューで 2 つのワーカーが独立して動作するようになり、予想されるスロットリング率が半分になることです。そのキューで動作しているワーカーがまだない場合にのみワーカーを開始するにはどうすればよいですか? おそらくこれに対するハックな解決策を見つけることができますが、「正しい」方法で処理したいと思います。キューの経験があまりないので、質問する必要があると思いました。
キューを動的に制御できるように、独自のワーカー クラスを既に実装しているため、そのキューが既に処理されている場合に新しいワーカーが与えられないロジックを追加する方法が必要です。私のワーカーの単純なバージョンは次のとおりです。
# custom_worker.py
import sys
from Api.models import *
from rq import Queue, Connection, Worker
# importing the necessary namespace for the tasks to run
from tasks import *
# dynamically getting the queue names in which I am expecting tasks
queues = [user.name for user in ApiUser.objects.all()]
with Connection():
qs = list(map(Queue, queues)) or [Queue()]
w = Worker(qs)
w.work(burst=True)