0

Taskerクラスは、インスタンス化時に初期ジョブをセットアップします。基本的に私が望むのは、ジョブを「main_queue」に入れ、ジョブが実行されているかどうか、または「process_queue」にキューに入れられている同じジョブが既に存在するかどうかを判断し、現在の「main_queue」ジョブから戻ることです。それ以外の場合は、「process_queue」にジョブをキューに入れます。そのプロセス キューが終了したら、ジョブを「main_queue」に入れます。

ただし、「process_queue」には、出力の確認が終了しているはずであるにもかかわらず、その期間の id を持つ同じジョブがあります。したがって、新しいジョブが処理されることはありません。見ることができないデッドロックが発生していますか?

main_queue ワーカー

$ rq worker main_queue --with-scheduler
22:44:19 Worker rq:worker:7fe23a24ae404135a10e301f7509eb7e: started, version 1.9.0
22:44:19 Subscribing to channel rq:pubsub:7fe23a24ae404135a10e301f7509eb7e
22:44:19 *** Listening on main_queue...
22:44:19 Trying to acquire locks for main_queue
22:44:19 Scheduler for main_queue started with PID 3747
22:44:19 Cleaning registries for queue: main_queue
22:44:33 main_queue: tasks.redis_test_job() (e90e0dff-bbcc-48ab-afed-6d1ba8b020a8)
None
Job is enqueued to process_queue!
22:44:33 main_queue: Job OK (e90e0dff-bbcc-48ab-afed-6d1ba8b020a8)
22:44:33 Result is kept for 500 seconds
22:44:47 main_queue: tasks.redis_test_job() (1a7f91d0-73f4-466e-92f4-9f918a9dd1e9)
<Job test_job: tasks.print_job()>
!!Scheduler added job to main but same job is already queued in process_queue!!
22:44:47 main_queue: Job OK (1a7f91d0-73f4-466e-92f4-9f918a9dd1e9)
22:44:47 Result is kept for 500 seconds

process_queue ワーカー

$ rq worker process_queue
22:44:24 Worker rq:worker:d70daf20ff324c18bc17f0ea9576df52: started, version 1.9.0
22:44:24 Subscribing to channel rq:pubsub:d70daf20ff324c18bc17f0ea9576df52
22:44:24 *** Listening on process_queue...
22:44:24 Cleaning registries for queue: process_queue
22:44:33 process_queue: tasks.print_job() (test_job)
The process job executed.
22:44:42 process_queue: Job OK (test_job)
22:44:42 Result is kept for 500 seconds

tasker.py

class Tasker():
    def __init__(self):
        self.tasker_conn = RedisClient().conn
        self.process_queue = Queue(name='process_queue', connection=Redis(), 
                                   default_timeout=-1)
        self.main_queue = Queue(name='main_queue', connection=Redis(),
                                    default_timeout=-1)
        self.__setup_tasks()
    def __setup_tasks(self):
        self.main_queue.enqueue_in(timedelta(seconds=3), tasks.redis_test_job)

タスク.py

import tasks
def redis_test_job():
    q = Queue('process_queue', connection=Redis(), default_timeout=-1)
    queued = q.fetch_job('test_job')
    print(queued)
    if queued:
        print("!!Scheduler added job to main but same job is already queued in process_queue!!")
        return False
    else:
        q.enqueue(tasks.print_job, job_id='test_job')
        print("Job is enqueued to process_queue!")
    return True

def print_job():
    sleep(8)
    print("The process job executed.")
    q = Queue('main_queue', connection=Redis(), default_timeout=-1)
    q.enqueue_in(timedelta(seconds=5), tasks.redis_test_job)
4

1 に答える 1

1

docsから、キューに入れられたジョブには、定義しない場合result_ttl、デフォルトで500 秒の があります。

たとえば、ジョブと結果を 1 秒間だけライブにするように変更する場合は、次のようにジョブをキューに入れます。

 q.enqueue(tasks.print_job, job_id='test_job', result_ttl=1)
于 2021-07-05T20:08:19.673 に答える