私は 2 つのデータセットに参加しています。1 つ目はストリームからのもので、2 つ目は HDFS のものです。
スパークでスカラを使用しています。2 つのデータセットを結合した後、結合したデータセットにフィルターを適用する必要がありますが、ここで問題に直面しています。解決にご協力ください。
以下のコードを使用していますが、
val streamkv = streamrecs.map(_.split("~")).map(r => ( r(0), (r(5), r(6))))
val HDFSlines = sc.textFile("/user/Rest/sample.dat").map(_.split("~")).map(r => ( r(1), (r(0) r(3),r(4),)))
val streamwindow = streamkv.window(Minutes(1))
val join1 = streamwindow.transform(joinRDD => { joinRDD.join(HDFSlines)} )
フィルターを使用すると、次のエラーが発生します
val tofilter = join1.filter {
| case (r(0), (r(5), r(6)),(r(0),r(3),r(4))) =>
| r(4).contains("iPhone")
| }.count()
<console>:48: error: constructor cannot be instantiated to expected type;
found : (T1, T2, T3)
required: (String, ((String, String), (String, String, String)))
case (r(0), (r(5), r(6)),(r(0),r(3),r(4))) =>