10

Python RQを使用して、ワーカープロセスを動的に管理しようとしています。カスタムビルドのワーカースクリプトを使用します。これは(簡略化された形式で)次のとおりです。

from rq import Connection, Worker

queues_to_listen_on = get_queues_to_listen_on()

with Connection(connection = get_worker_connection()):
    w = Worker(queues_to_listen_on)
    w.work()

特に労働者の閉鎖に関心があります。私たちが抱えている主な懸念は、シャットダウンする前に現在の作業を終了できるように、ワーカーを正常にシャットダウンする方法です。適切なオブジェクトのrequest_stop(...)シグナルハンドラーは必要なことを実行しているように見えますが、ターミナルで実行されているワーカープロセスWorkerを押す以外に、(少なくとも私の知る限りでは)それを発行する方法はないようです。CTRL+C

私が見ているように、2つの考えられる解決策があります(間違いなくもっとあるかもしれません)-好みの順に:

  1. プログラムで、rqライブラリを使用して、シグナルをに送信し、request_stop正常なシャットダウンをトリガーします。
  2. どういうわけか正しいプロセスのpidを取得し(主力プロセスかワーカーリスナープロセスかわからない)、他の方法を使用して、適切なシグナルをそのプロセスに送信します。これを行うにはいくつかの方法がありますが、おそらくより多くの作業が必要であり、私が除外したい問題に他の変数を導入します(たとえば、Fabricリモートコマンドなどをそれらの行に沿って実行するために使用します)。

この問題を解決するためのより良い方法、または同じ目標を達成する別の代替方法がある場合は、あなたの提案をいただければ幸いです。

4

1 に答える 1

5

オプション1は、設計の点で間違いなく優れています。

ただしCTRL + C、プロセスを終了するために使用する必要があるという特定の問題を解決するために(私もそれが嫌いです)、ワーカーに対して次の戦略を使用できます。

# WORKER_NAME.py
import os

PID = os.getpid()

@atexit.register
def clean_shut():
    print "Clean shut performed"

    try:
        os.unlink("WORKER_NAME.%d" % PID)
    except:
        pass

# Worker main
def main():
    f = open("WORKER_NAME.%d" % PID, "w")
    f.write("Delete this to end WORKER_NAME gracefully")
    f.close()

    while os.path.exists("WORKER_NAME.%d" % PID):
        # Worker working

そして、マスタースクリプトで、@ Borysが提案したようにワーカーPIDを取得し、ウォームストップ要求を送信しos.unlink("path/to/WORKER_NAME.%d" % worker_PID)て、正常なシャットダウンを確保します:)

ただし、これは不定ループを実行しているワーカーにのみ適用されます。ワーカープロセスがプレーンシーケンシャルワンタイムジョブでさえブロックするものを呼び出す場合、ある種のタイムアウト戦略を適用するなど、そこから解決するために、ブロックしている可能性のあるルーチンまでさらにトレースする必要があります。

于 2013-03-01T15:09:06.540 に答える