サンプルのスケルトン コードは次のようなものです。ここでは、基本的に BigQuery から RDD を読み取り、my_field_name 値が null であるすべてのデータ ポイントを選択しています。
JavaPairRDD<String, GenericData.Record> input = sc
.newAPIHadoopRDD(hadoopConfig, AvroBigQueryInputFormat.class, LongWritable.class, GenericData.Record.class)
.mapToPair( tuple -> {
GenericData.Record record = tuple._2;
Object rawValue = record.get(my_field_name); // Problematic !! want to get my_field_name of this bq row, but just gave something not making sense
String partitionValue = rawValue == null ? "EMPTY" : rawValue.toString();
return new Tuple2<String, GenericData.Record>(partitionValue, record);
}).cache();
JavaPairRDD<String, GenericData.Record> emptyData =
input.filter(tuple -> StringUtils.equals("EMPTY", tuple._1));
emptyData.values().saveAsTextFile(my_file_path)
ただし、出力RDDは完全に予想外のようです。特に my_field_name の値は完全にランダムに見えます。少しデバッグした後、フィルタリングは期待どおりに行われているようですが、問題は抽出した値にありますGenericData.Record
(基本的にrecord.get(my_field_name)
) 完全にランダムに見えます。
したがって、AvroBigQueryInputFormat から GsonBigQueryInputFormat に切り替えて、代わりに json で bq を読み取った後、このコードは正しく機能しているようです。
ただし、理想的には、代わりに Avro を使用したいのですが (json を処理するよりもはるかに高速である必要があります)、コードでの現在の動作は完全に邪魔です。AvroBigQueryInputFormat を間違って使用しているだけですか?