11

ftpでファイルをダウンロードし、変更して、アップロードし直す必要があります。私はこれを行うためにセロリを使用していますが、チェーンを使用しようとすると問題が発生します。

TypeError:upload_ftp_image()は正確に5つの引数を取ります(6つ指定)

また、チェーンを使用して、ステップが順番に実行されることを保証できますか?そうでない場合、代替手段は何ですか?

res = chain(download_ftp_image.s(server, username , password, "/test_app_2/model.dae" ,"tmp/test_app_2/"), upload_ftp_image.s(server, username , password, "tmp/test_app_2/model.dae" ,"tmp/test_app_2/")).apply_async()
print res.get()

タスク:

@task()
def download_ftp_image(ftp_server, username , password , filename, directory):
    try:
        ftp = FTP(ftp_server)
        ftp.login(username, password)
        if not os.path.exists(directory):
            os.makedirs(directory)
            ftp.retrbinary("RETR /default_app/model.dae" , open(directory + 'model.dae', 'wb').write)
        else:
            ftp.retrbinary("RETR /default_app/model.dae" , open(directory + 'model.dae', 'wb').write)
        ftp.quit()
    except error_perm, resp:
        raise download_ftp_image.retry(countdown=15)

    return "SUCCESS: "  

@task()
def upload_ftp_image(ftp_server, username , password , file , directory):
    try:
        ftp = FTP(ftp_server)
        ftp.login(username, password)
        new_file= file.replace(directory, "")
        directory = directory.replace("tmp","")
        try:
            ftp.storbinary("STOR " + directory + new_file , open(file, "rb"))
        except:
            ftp.mkd(directory)
            ftp.storbinary("STOR " + directory + new_file, open(file, "rb"))
        ftp.quit()
    except error_perm, resp:
        raise upload_ftp_image.retry(countdown=15)

    return "SUCCESS: "

これは私の特定のケースにとって良い習慣ですか、それとも悪い習慣ですか?:

result = download_ftp_image.apply_async((server, username , password, "/test_app_2/model.dae" ,"tmp/test_app_2/",), queue='rep_data')
result.get()
result = upload_ftp_image.apply_async((server, username , password, "tmp/test_app_2/model.dae" ,"tmp/test_app_2/",), queue='rep_data')
#result.get()
4

2 に答える 2

25

前のタスクの戻り値を引数として使用したくない場合の別のオプションは、「不変性」を使用することです。

http://docs.celeryproject.org/en/latest/userguide/canvas.html#immutability

サブタスクを次のように定義する代わりに:

download_ftp_image.s(...) and upload_ftp_image.s(...)

それらを次のように定義します。

download_ftp_image.si(...) and upload_ftp_image.si(...)

これで、チェーン内の通常の数の引数でタスクを使用できるようになりました。

于 2013-03-06T08:55:01.433 に答える
17

チェーンには、常にの結果が最初の引数として渡されます。チェーンのドキュメントから:

リンクされたタスクは、親タスクの結果を最初の引数として適用されます。上記の場合mul(4, 16)、結果は4であるため結果になります。

あなたのupload_ftp_imageタスクはこの余分な引数を受け入れないため、失敗します。

連鎖の優れたユースケースがあります。2番目のタスクは、最初のタスクが完了したに呼び出されることが保証されています(そうでない場合、結果を渡すことができませんでした)。

前のタスクの結果の引数を追加するだけです。

def upload_ftp_image(download_result, ftp_server, username , password , file , directory):

その結果値を利用することができます。たぶん、ダウンロードメソッドがダウンロードされたファイルのパスを返すようにして、アップロードメソッドが何をアップロードするかを認識できるようにしますか?

于 2013-03-05T13:03:03.373 に答える