0

ファイル拡張子が .gz であるため、圧縮された GCS のファイルから json データを解析する必要があるため、再編成してデータフローによって適切に処理する必要がありますが、ジョブ ログには判読できない文字が出力され、データは処理されません。非圧縮データを処理すると、正常に機能しました。次の方法を使用して、json をマップ/解析しました。

        ObjectMapper mapper = new ObjectMapper();
        Map<String, String> eventDetails = mapper.readValue(c.element(),
                    new TypeReference<Map<String, String>>() {
                    });

何が原因でしょうか?

===================================

入力ファイルから読み取る方法の詳細を追加するには:

  1. パイプラインを作成するには:

    Poptions pOptions = PipelineOptionsFactory.fromArgs(args).withValidation().as(Poptions.class);
    Pipeline p = Pipeline.create(pOptions);
    p.apply(TextIO.Read.named("ReadLines").from(pOptions.getInput()))                                          
     .apply(new Pimpression())
     .apply(BigQueryIO.Write
    .to(pOptions.getOutput())
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER)
    .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND));
    p.run();
    
  2. 実行時の構成:

    PROJECT="myProjectId"
    DATASET="myDataSetId"
    INPUT="gs://foldername/input/*"
    STAGING1="gs://foldername/staging" 
    TABLE1="myTableName"
    mvn exec:java -pl example \
    -Dexec.mainClass=com.google.cloud.dataflow.examples.Example1 \
    -Dexec.args="--project=${PROJECT} --output=${PROJECT}:${DATASET}.${TABLE1}   --input=${INPUT} --stagingLocation=${STAGING1} --runner=BlockingDataflowPipelineRunner"
    
  3. 入力ファイル名の例: file.gz、およびコマンド gsutil ls -L gs://bucket/input/file.gz | の出力 grep コンテンツは次のとおりです。

    Content-Length:     483100
    Content-Type:       application/octet-stream
    
4

1 に答える 1