あなたinspect.reserved()/scheduled()
が言及したことは機能するかもしれませんが、ワーカーがプリフェッチしたタスクのみを考慮に入れるため、常に正確であるとは限りません。
Celery では、キューからのメッセージの削除や並べ替えなど、キューでの帯域外操作は許可されません。これは、分散システムではスケーリングされないためです。メッセージはまだキューに到達していない可能性があり、競合状態が発生する可能性があり、実際にはトランザクション操作を伴うシーケンシャル キューではなく、複数の場所から発信されたメッセージのストリームです。つまり、Celery API は厳密なメッセージ パッシング セマンティクスに基づいています。
Celery がサポートする一部のブローカー (Redis やデータベースなど) でキューに直接アクセスすることは可能ですが、それは公開 API の一部ではありません。大規模な操作を行う場合は、自分にとって最も便利なことを何でもして、私のアドバイスを破棄する必要があります。
これが、ジョブがいつ完了するかをユーザーに知らせるためのものである場合、キューの長さと時間があれば、タスクがいつ実行されるかを予測するアルゴリズムを考え出すことができると確信しています。各タスクが挿入された場所。
前者はただの で、後者はシグナルredis.len("celery")
を聞いて自分自身を追加できます:task_sent
from celery.signals import task_sent
@task_sent.connect
def record_insertion_time(id, **kwargs):
redis.zadd("celery.insertion_times", id)
ここでソートされたセットを使用する: http://redis.io/commands/zadd
純粋なメッセージ パッシング ソリューションの場合、Celery イベント ストリームを消費し、タスクがいつ終了するかを予測する専用のモニターを使用できます。
http://docs.celeryproject.org/en/latest/userguide/monitoring.html#event-reference
(タスク送信にドキュメントのタイムスタンプ フィールドが欠落していることに気付きましたが、タイムスタンプはそのイベントと共に送信されるため、修正します)。
イベントには、論理クロックである「クロック」フィールドも含まれます ( http://en.wikipedia.org/wiki/Lamport_timestampsを参照)。これは、システムに依存せずに分散システムでイベントの順序を検出するために使用できます。各マシンの時間を同期する必要があります (これを達成するのは不可能です)。