0

作業サイクル中にpython-gearmanワーカーで使用可能なタスクを変更しようとしています。これを行う理由は、ワーカープロセスを少し制御し、データベースからリロードできるようにするためです。すべてのワーカーを定期的にリロードする必要がありますが、単にプロセスを強制終了したくはありません。また、サービスを常に利用できるようにしたいので、バッチでリロードする必要があります。したがって、4人のワーカーをリロードし、別の4人のワーカーを処理できるようにしてから、次の4人のワーカーをリロードします。

プロセス:

  1. リロードプロセスを4回開始します。
    1. reloadプロセスの登録を解除します
    2. データセットをリロードする
    3. finishReloadタスクを登録する
    4. 戻る
  2. reloadタスクが登録されているワーカーがなくなるまで、手順1を繰り返します。
  3. finishReload使用可能なタスクを持つワーカーがなくなるまで、(1)タスクを開始しfinishReloadます。

(1)finishReloadタスクは、タスクの登録を解除し、finishReloadタスクを登録してreloadから戻ります。

今、私が遭遇している問題は、ワーカープロセスで使用可能なタスクを変更するとジョブが失敗することです。エラーメッセージや例外はなく、ギアマンドログの「エラー」だけです。これが問題を再現する簡単なプログラムです。

ワーカー

import gearman 
def reversify(gmWorker, gmJob): 
        return "".join(gmJob.data[::-1]) 
def strcount(gmWorker, gmJob): 
        gmWorker.unregister_task('reversify')  # problem line 
        return str(len(gmJob.data)) 
worker = gearman.GearmanWorker(['localhost:4730']) 
worker.register_task('reversify', reversify) 
worker.register_task('strcount', strcount) 
while True: 
        worker.work() 

クライアント

import gearman 
client = gearman.GearmanClient(['localhost:4730']) 
a = client.submit_job('reversify', 'spam and eggs') 
print a.result 
>>> sgge dna maps 

a = client.submit_job('strcount', 'spam and eggs') 
...

私が解明できることがあれば教えてください。

編集:誰かが私が言及したログを見るように頼むことを私は知っています。この質問をGoogleのギアマングループにも投稿しました。ログはそこで利用できます

4

2 に答える 2

1

GearmanWorker クラスをサブクラス化し、いくつかのフラグを追加すると、この問題を回避できるようです。ワーカーからサーバーに新しいコマンドを発行する前に、ジョブを完了させる必要があります。これにより、現在のジョブが中断されているように見えます。そのため、関数を上書きするon_job_completeと、有効化/無効化フラグを確認し、send_job_completeコマンドを発行した後にそれらに対処できます。新しいワーカー プログラムは次のとおりです。

ワーカー

import gearman

def reversify(gmWorker, gmJob):
        return "".join(gmJob.data[::-1])

def enable_reversify(gmWorker, gmJob):
        myWorker.enableReversify = 1
        return 'OK'

def strcount(gmWorker, gmJob):
        myWorker.enableReversify = -1
        return str(len(gmJob.data))

class myWorker(gearman.GearmanWorker):

        enableReversify = 0 # 0 = do nothing, -1 = turn off, 1 = turn on

        def on_job_complete(self, current_job, job_result):
                self.send_job_complete(current_job, job_result)
                ### check the flag here and enable or disable tasks ###
                if myWorker.enableReversify == -1:
                        self.unregister_task('reversify')
                if myWorker.enableReversify == 1:
                        self.register_task('reversify', reversify)
                myWorker.enableReversify = 0 # reset the flag
                return True

worker = myWorker(['localhost:4730']) 
worker.register_task('reversify', reversify)
worker.register_task('strcount', strcount)
worker.register_task('enableReversify', enable_reversify)

while True:
        worker.work() 
于 2011-03-21T15:46:50.483 に答える
0

一見すると、問題は、ジョブを開始していて、そのジョブが完了する前に、そのジョブを実行するワーカーの能力をジョブ サーバーから登録解除していることにあるように見えます。

于 2011-03-21T01:56:25.543 に答える