15

私の初期ファイルはAWS S3. 誰かがこれをどのように設定する必要があるか教えてもらえますLuigi Taskか?

ドキュメントを確認して見つけましluigi.S3たが、それをどうするかが明確ではありません。次に、Webで検索して、mortar-luigiルイージの上部からのリンクと実装のみを取得しました。

アップデート

@matagus に提供された例に従った後 (私~/.botoも提案どおりにファイルを作成しました):

# coding: utf-8

import luigi

from luigi.s3 import S3Target, S3Client

class MyS3File(luigi.ExternalTask):
    def output(self):
        return S3Target('s3://my-bucket/19170205.txt')

class ProcessS3File(luigi.Task):

    def requieres(self):
        return MyS3File()

    def output(self):
        return luigi.LocalTarget('/tmp/resultado.txt')

    def run(self):
        result = None

        for input in self.input():
           print("Doing something ...")
           with input.open('r') as f:
               for line in f:
                   result = 'This is a line'

        if result:
            out_file = self.output().open('w')
            out_file.write(result)

実行しても何も起こらない

DEBUG: Checking if ProcessS3File() is complete
INFO: Informed scheduler that task   ProcessS3File()   has status   PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 21171] Worker Worker(salt=226574718, workers=1, host=heliodromus, username=nanounanue, pid=21171) running   ProcessS3File()
INFO: [pid 21171] Worker Worker(salt=226574718, workers=1, host=heliodromus, username=nanounanue, pid=21171) done      ProcessS3File()
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task   ProcessS3File()   has status   DONE
DEBUG: Asking scheduler for work...
INFO: Done
INFO: There are no more tasks to run at this time
INFO: Worker Worker(salt=226574718, workers=1, host=heliodromus, username=nanounanue, pid=21171) was stopped. Shutting down Keep-Alive thread

ご覧のとおり、メッセージDoing something...は印刷されません。なにが問題ですか?

4

1 に答える 1

21

ここで重要なのは、入力を持たず、S3 に既にあるファイルを出力する外部タスクを定義することです。ルイージのドキュメントでは、別のタスクの要求でこれについて言及しています。

requires() は Target オブジェクトを返すことができないことに注意してください。外部で作成された単純な Target オブジェクトがある場合は、それを Task クラスでラップできます

したがって、基本的には次のようになります。

import luigi

from luigi.s3 import S3Target

from somewhere import do_something_with


class MyS3File(luigi.ExternalTask):

    def output(self):
        return luigi.S3Target('s3://my-bucket/path/to/file')

class ProcessS3File(luigi.Task):

    def requires(self):
        return MyS3File()

    def output(self):
        return luigi.S3Target('s3://my-bucket/path/to/output-file')

    def run(self):
        result = None
        # this will return a file stream that reads the file from your aws s3 bucket
        with self.input().open('r') as f:
            result = do_something_with(f)

        # and the you 
        out_file = self.output().open('w')
        # it'd better to serialize this result before writing it to a file, but this is a pretty simple example
        out_file.write(result)

アップデート:

Luigi はbotoを使用して AWS S3 からファイルを読み取ったり、AWS S3 にファイルを書き込んだりするため、このコードを機能させるには、boto 構成ファイルで資格情報を提供する必要があります~/boto(他の可能な構成ファイルの場所をここで探してください)。

[Credentials]
aws_access_key_id = <your_access_key_here>
aws_secret_access_key = <your_secret_key_here>
于 2015-10-26T17:08:45.243 に答える