1

私のdjangoプロジェクトには、セロリのタスクを実行するビューがあります。セロリタスク自体がサブプロセス/ファブリックを介していくつかのmap/reduceジョブをトリガーし、hadoopジョブの結果がディスクに保存されます---データベースには実際には何も保存されません。Hadoopジョブが完了すると、セロリタスクは次のように完了したことを示すdjangoシグナルを送信します。

# tasks.py
from models import MyModel
import signals

from fabric.operations import local

from celery.task import Task

class Hadoopification(Task):
    def run(self, my_model_id, other_args):
        my_model = MyModel.objects.get(pk=my_model_id)
        self.hadoopify_function(my_model, other_args)
        signals.complete_signal.send(
            sender=self,
            my_model_id=my_model_id,
            complete=True,
        )

    def hadoopify_function(self, my_model, other_args):
        local("""hadoop jar /usr/lib/hadoop/hadoop-streaming.jar -D mapred.reduce.tasks=0 -file hadoopify.py -mapper "parse_mapper.py 0 0" -input /user/me/input.csv -output /user/me/output.csv""")

本当に困惑しているのは、セロリタスクの実行時にdjango runserverがリロードされていることです。まるで、djangoプロジェクトのどこかでコードを変更したかのようです(まだ変更していません)。時々、これはrunserverコマンドでエラーを引き起こし、runserverコマンドがリロードして再びOKになる前に次のような出力が表示されます(注:このエラーメッセージはここで説明する問題と非常によく似ています)。

Unhandled exception in thread started by <function inner_run at 0xa18cd14>
Error in sys.excepthook:
Traceback (most recent call last):
  File "/usr/lib/python2.6/dist-packages/apport_python_hook.py", line 48, in apport_excepthook
    if not enabled():
TypeError: 'NoneType' object is not callable

Original exception was:
Traceback (most recent call last):
  File "/home/rdm/Biz/Projects/Daegis/Server_Development/tar/env/lib/python2.6/site-packages/django/core/management/commands/runserver.py", line 60, in inner_run
    run(addr, int(port), handler)
  File "/home/rdm/Biz/Projects/Daegis/Server_Development/tar/env/lib/python2.6/site-packages/django/core/servers/basehttp.py", line 721, in run
    httpd.serve_forever()
  File "/usr/lib/python2.6/SocketServer.py", line 224, in serve_forever
    r, w, e = select.select([self], [], [], poll_interval)
AttributeError: 'NoneType' object has no attribute 'select'

local("""hadoop ...""")問題を、django runserverのリロードで問題が発生しlocal("ls")ないものに置き換えることで、hadoopへの呼び出しが行われる場合に絞り込みました。hadoopコードにバグはありません---セロリによって呼び出されない場合は、それ自体で問題なく動作します。

何がこれを引き起こしているのかについて何か考えはありますか?

4

3 に答える 3

2

そのため、ファブリックのソースコードを調べた後、fabric.operations.localコマンド内で実行されるセロリタスクが失敗したため(hadoop出力puke-fest内で検出するのが難しい)、djangoがリロードされていることがわかりました。fabric.operations.localコマンドが失敗すると、fabricはsys.exitを発行します。これにより、セロリが停止し、djangoがリロードを試みます。このエラーは、次のようなHadoopタスク内でSystemExitをキャッチすることで検出できます。

class Hadoopification(Task):
    def run(self, my_model_id, other_args):
        my_model = MyModel.objects.get(pk=my_model_id)
        self.hadoopify_function(my_model, other_args)
        signals.complete_signal.send(
            sender=self,
            my_model_id=my_model_id,
            complete=True,
        )

    def hadoopify_function(self, my_model, other_args):
        try:
            local("""hadoop jar /usr/lib/hadoop/hadoop-streaming.jar -D mapred.reduce.tasks=0 -file hadoopify.py -mapper "parse_mapper.py 0 0" -input /user/me/input.csv -output /user/me/output.csv""")
        except SystemExit, e:
            # print some useful debugging information about exception e here!
            raise
于 2012-06-08T14:31:52.137 に答える
1

これについては、ファブリックのgithubページ、ここここここでいくつかの議論があります。エラーを発生させる別のオプションは、設定コンテキストマネージャーを使用することです。

from fabric.api import settings

class Hadoopification(Task):
    ...
    def hadoopify_function(self, my_model, other_args):
        with settings(warn_only=True):
            result = local(...)
        if result.failed:
            # access result.return_code, result.stdout, result.stderr
            raise UsefulException(...)

これには、結果の戻りコードと他のすべての属性へのアクセスを許可するという利点があります。

于 2012-06-09T16:55:25.940 に答える
0

私の推測では、セロリとファブリックの両方でタスクの名前に衝突があると思います。次のようなものを使用することをお勧めします。

import celery
class Hadoopification(celery.task.Task):
    ...

そして、その予感が良ければ、それ以上の衝突を避けてください。

しかし実際には、ファブリックのローカルはかなり素晴らしく、本質的には単なるサブプロセスです。Popenは、rawを呼び出して、pythonstdlib以外のものを分離することもできます。

于 2012-06-02T02:21:20.407 に答える