2

Google の App Engine の log2bq の例をモデルにしたマッパー パイプラインがあります。

内訳:

  1. 処理するファイルを Blobstore にアップロードします
  2. Mapper パイプラインを使用してファイルを反復処理し、ファイルを処理し、出力ライターを使用して結果を CSV 形式で Google ストレージに書き込みます
  3. 次に、BigQuery ジョブを送信して、新しく作成されたファイルを使用してテーブルに入力します
  4. テーブルにクエリを実行し、クエリの出力を Blobstore に書き込みます
  5. ユーザーは最終結果をダウンロードします

これはすべて、多数のパイプラインを使用して行われます。問題は、ステップ 2 で作成したファイルをステップ 4 の後に削除できるようにすると、BigQuery テーブルが更新されないことです。私は2つの方法を試しました:

  1. 作成されたファイルのみを削除する 2 つのメイン パイプラインの最後に実行するクリーンアップ パイプラインを作成しました。
  2. ステップ 4 の最後にファイルを削除しようとしました (同じパイプライン内で)

以下の私のコードでは、両方のメソッドがコメントアウトされていることがわかります。ファイルを削除しようとしても BigQuery が更新されないのはなぜですか? 削除するのが早すぎますか?そのように聞こえるかもしれませんが、delete 関数を実行するコードは、出力ファイルが Blobstore に書き込まれた後にのみ実行されると確信しています。どんな助けでも大歓迎です!

私の主なパイプラインはBlobstore2BigQueryMaster以下です

class Blobstore2GoogleStorage(base_handler.PipelineBase):
    def run(self, blobkey, bucket_name):
        yield mapreduce_pipeline.MapperPipeline(
            "<NAME>",
            "<MAPPER>",
            "mapreduce.input_readers.BlobstoreLineInputReader",
            "mapreduce.output_writers.FileOutputWriter",
            params={
                "input_reader" : {
                    "blob_keys": blobkey,
                },
                "output_writer": {
                    "filesystem": "gs",
                    "gs_bucket_name": bucket_name,
                    "mime_type": "text/csv",
                }
            },
            shards=24)

class GoogleStorage2BigQuery(base_handler.PipelineBase):
    def run(self, file_names, filekey):
        bq = bigquery.BigQueryApi()
        gspaths = [f.replace('/gs/', 'gs://') for f in file_names]

        result = bq.submit_job(jobData(<TABLE_NAME>, gspaths))
        yield BigQueryImportCheck(result['jobReference']['jobId'], filekey, file_names)


class BigQueryImportCheck(base_handler.PipelineBase):
    def run(self, job, filekey, file_names):
        bq = bigquery.BigQueryApi()
        status = bq.get_job(job_id=job)

        if status['status']['state'] == 'PENDING' or status['status']['state'] == 'RUNNING':
            delay = yield pipeline.common.Delay(seconds=1)
            with pipeline.After(delay):
                yield BigQueryImportCheck(job, filekey, file_names)

        yield BigQueryExport(filekey, file_names)  


class QueryCompletionCheck(base_handler.PipelineBase):
    def run(self, job):  
        bq = bigquery.BigQueryApi()
        status = bq.get_job(job_id=job)
        if status['status']['state'] == 'PENDING' or status['status']['state'] == 'RUNNING':
            delay = yield pipeline.common.Delay(seconds=1)
            with pipeline.After(delay):
                yield QueryCompletionCheck(job)

        yield pipeline.common.Return(status)


class BigQueryExport(base_handler.PipelineBase):
    def run(self, filekey, file_names):        
        bq = bigquery.BigQueryApi()
        #Submit Job to BigQuery Here
        #<REMOVED>

        response = yield QueryCompletionCheck(insertResponse['jobReference']['jobId'])

        #WRITE QUERY RESPONSE TO BLOBSTORE HERE
        #<REMOVED>

        #files.delete(",".join(file_names))
        yield pipeline.common.Return(response)

class Blobstore2BigQueryMaster(base_handler.PipelineBase):
    def run(self, filekey, blobkey, bucket_name):
        file_names = yield Blobstore2GoogleStorage(blobkey, bucket_name)
        synchronize = yield GoogleStorage2BigQuery(file_names, filekey)
        yield CleanupCloudStorage(synchronize, file_names)


class CleanupCloudStorage(base_handler.PipelineBase):
    def run(self, synchronize, file_names):
        #files.delete(",".join(file_names))
        yield pipeline.common.Return('Done')

def jobData(tableId, sourceUris):
    #Configuration for the BigQuery Job here including
    #'createDisposition':'CREATE_IF_NEEDED'
    #'writeDisposition':'WRITE_TRUNCATE'
4

1 に答える 1

1

BigQuery の読み込みジョブが完了したらすぐに、Google Cloud Storage から CSV ファイルを安全に削除できます。

完了したジョブのエラーをチェックして、インポートが失敗した理由が明らかになるかどうかを確認することをお勧めします。可能であれば、インポート完了チェックが時期尚早に返されていないことを確認してください。

于 2013-01-18T02:02:35.803 に答える