0

私は RQ が初めてで、フラスコ アプリ内に実装しようとしています。私のルートの 1 つの主な目標は、データベースから値を更新することです。ワーカーをセットアップするには、次を使用しています。

from rq import Worker, Queue, Connection
import redis
import os

@app.before_first_request
def start_worker():
    def runworker():
        redis_url = os.environ.get("REDIS_URL") or 'redis://'
        conn = redis.from_url(redis_url)
        with Connection(conn):
            worker = Worker(list(map(Queue, listen)))
            worker.work()
    tp = ThreadPoolExecutor()
    tp.submit(runworker)


def get_redis_connection():
    redis_connection = getattr(g, '_redis_connection', None)
    if redis_connection is None:
        redis_url = os.environ.get('REDIS_URL') or 'redis://'
        redis_connection = redis.from_url(redis_url)
    return redis_connection


@app.before_request
def push_rq_connection():
    push_connection(get_redis_connection())


@app.teardown_request
def pop_rq_connection(exception=None):
    pop_connection()

次に、更新ルートは更新ジョブをキューに入れます

@app.route('/update')
def update_db():
    q = Queue(connection=conn)
    job = q.q.enqueue('app.tasks.update_task', parameters)
    job_id = job.get_id()
    return {"job_id": job_id}, 201, {"Content-Type": 'application/json'}

最後に、ワーカーは更新関数を実行します

def update_task(parameters):
    # script to update DB

update_task 関数を次のような単純なものに切り替えると、ワーカーのセットアップが (ほぼ) 機能することがわかります。

def update_task(seconds):
    for i in range(seconds):
        print(i)
        time.sleep(1)
    return "Hello world"

できます。ただし、実際の関数では、環境変数が定義されていないという問題に遭遇し続けます。これは、実際の更新を実行すると、変数が None または同様のものであるという例外が発生するためです。

RQ内で環境変数を処理する方法を知っている人はいますか? 構成ファイルのような場所で再度宣言する必要がありますか?

4

1 に答える 1