Spark 2.1.1 と Scala 2.11.8 を使用しています
この質問は、私の以前の質問の拡張です。
変更点は、CSV ファイルからデータを読み取るのではなく、avro ファイルからデータを読み取るようになったことです。これは、データを読み取っている avro ファイルの形式です。
var ttime: Long = 0;
var eTime: Long = 0;
var tids: String = "";
var tlevel: Integer = 0;
var tboot: Long = 0;
var rNo: Integer = 0;
var varType: String = "";
var uids: List[TRUEntry] = Nil;
avro ファイルを別のクラスで解析しています。
上記のリンクの受け入れられた回答で述べたのと同じ方法で、tids 列をすべての uid にマップする必要があります。ただし、今回は適切にフォーマットされた csv ファイルではなく avro ファイルからのものです。これどうやってするの?
これは私がそれをやろうとしているコードです:
val avroRow = spark.read.avro(inputString).rdd
val avroParsed = avroRow
.map(x => new TRParser(x))
.map((obj: TRParser) => ((obj.tids, obj.uId ),1))
.reduceByKey(_+_)
.saveAsTextFile(outputString)
obj.tids の後、すべての uids 列を個別にマップして、上記のリンクの受け入れられた回答に記載されているのと同じ最終出力を得る必要があります。
これは、avro ファイル解析クラスですべての uid を解析する方法です。
this.uids = Nil
row.getAs[Seq[Row]]("uids")
.foreach((objRow: Row) =>
this.uids ::= (new TRUEntry(objRow))
)
this.uids
.foreach((obj:TRUEntry) => {
uInfo += obj.uId + " , " + obj.initM.toString() + " , "
})
PS : 質問がばかげているように思われる場合は申し訳ありませんが、これはavroファイルとの最初の出会いです