私は RQ が初めてで、フラスコ アプリ内に実装しようとしています。私のルートの 1 つの主な目標は、データベースから値を更新することです。ワーカーをセットアップするには、次を使用しています。
from rq import Worker, Queue, Connection
import redis
import os
@app.before_first_request
def start_worker():
def runworker():
redis_url = os.environ.get("REDIS_URL") or 'redis://'
conn = redis.from_url(redis_url)
with Connection(conn):
worker = Worker(list(map(Queue, listen)))
worker.work()
tp = ThreadPoolExecutor()
tp.submit(runworker)
def get_redis_connection():
redis_connection = getattr(g, '_redis_connection', None)
if redis_connection is None:
redis_url = os.environ.get('REDIS_URL') or 'redis://'
redis_connection = redis.from_url(redis_url)
return redis_connection
@app.before_request
def push_rq_connection():
push_connection(get_redis_connection())
@app.teardown_request
def pop_rq_connection(exception=None):
pop_connection()
次に、更新ルートは更新ジョブをキューに入れます
@app.route('/update')
def update_db():
q = Queue(connection=conn)
job = q.q.enqueue('app.tasks.update_task', parameters)
job_id = job.get_id()
return {"job_id": job_id}, 201, {"Content-Type": 'application/json'}
最後に、ワーカーは更新関数を実行します
def update_task(parameters):
# script to update DB
update_task 関数を次のような単純なものに切り替えると、ワーカーのセットアップが (ほぼ) 機能することがわかります。
def update_task(seconds):
for i in range(seconds):
print(i)
time.sleep(1)
return "Hello world"
できます。ただし、実際の関数では、環境変数が定義されていないという問題に遭遇し続けます。これは、実際の更新を実行すると、変数が None または同様のものであるという例外が発生するためです。
RQ内で環境変数を処理する方法を知っている人はいますか? 構成ファイルのような場所で再度宣言する必要がありますか?