ファイル拡張子が .gz であるため、圧縮された GCS のファイルから json データを解析する必要があるため、再編成してデータフローによって適切に処理する必要がありますが、ジョブ ログには判読できない文字が出力され、データは処理されません。非圧縮データを処理すると、正常に機能しました。次の方法を使用して、json をマップ/解析しました。
ObjectMapper mapper = new ObjectMapper();
Map<String, String> eventDetails = mapper.readValue(c.element(),
new TypeReference<Map<String, String>>() {
});
何が原因でしょうか?
===================================
入力ファイルから読み取る方法の詳細を追加するには:
パイプラインを作成するには:
Poptions pOptions = PipelineOptionsFactory.fromArgs(args).withValidation().as(Poptions.class); Pipeline p = Pipeline.create(pOptions); p.apply(TextIO.Read.named("ReadLines").from(pOptions.getInput())) .apply(new Pimpression()) .apply(BigQueryIO.Write .to(pOptions.getOutput()) .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_NEVER) .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_APPEND)); p.run();
実行時の構成:
PROJECT="myProjectId" DATASET="myDataSetId" INPUT="gs://foldername/input/*" STAGING1="gs://foldername/staging" TABLE1="myTableName" mvn exec:java -pl example \ -Dexec.mainClass=com.google.cloud.dataflow.examples.Example1 \ -Dexec.args="--project=${PROJECT} --output=${PROJECT}:${DATASET}.${TABLE1} --input=${INPUT} --stagingLocation=${STAGING1} --runner=BlockingDataflowPipelineRunner"
入力ファイル名の例: file.gz、およびコマンド gsutil ls -L gs://bucket/input/file.gz | の出力 grep コンテンツは次のとおりです。
Content-Length: 483100 Content-Type: application/octet-stream