0

私は 2 つのデータセットに参加しています。1 つ目はストリームからのもので、2 つ目は HDFS のものです。

スパークでスカラを使用しています。2 つのデータセットを結合した後、結合したデータセットにフィルターを適用する必要がありますが、ここで問題に直面しています。解決にご協力ください。

以下のコードを使用していますが、

 val streamkv = streamrecs.map(_.split("~")).map(r => ( r(0), (r(5), r(6)))) 
val HDFSlines = sc.textFile("/user/Rest/sample.dat").map(_.split("~")).map(r => ( r(1), (r(0) r(3),r(4),))) 
val streamwindow = streamkv.window(Minutes(1)) 

val join1 = streamwindow.transform(joinRDD => { joinRDD.join(HDFSlines)} ) 

フィルターを使用すると、次のエラーが発生します

val tofilter = join1.filter {
     | case (r(0), (r(5), r(6)),(r(0),r(3),r(4))) =>
     | r(4).contains("iPhone")
     | }.count()
<console>:48: error: constructor cannot be instantiated to expected type;
found   : (T1, T2, T3)
required: (String, ((String, String), (String, String, String)))
       case (r(0), (r(5), r(6)),(r(0),r(3),r(4))) =>
4

2 に答える 2

0

エラーの理由は、丸かっこのペアが欠落していることです。

の行case (r(0), (r(5), r(6)),(r(0),r(3),r(4)))はむしろcase (_, ((_, _),(_,_,device))) =>- 最後の 2 つの要素を囲む括弧に注意してください -Tuple2およびTuple3オブジェクトです。

于 2015-12-07T20:51:28.403 に答える
0

scala (型付き言語) では、パターン マッチング ( case ) により、ローカルで使用するためにブロック内の変数を「抽出」できます。したがって

case (r(0), (r(5), r(6)),(r(0),r(3),r(4))) =>

ここでr関数を呼び出しているのではなく、値を抽出しているためです(関数宣言を考えてください)。

結果のストリーム/コレクション join1 のオブジェクトがこの型シグネチャ(_, (_, _), (_, _, String))に従うと仮定すると、次のことを試してください。

val tofilter = join1.filter {
 | case (_, (_, _),(_,_,device)) =>
 | device.contains("iPhone")
 | }.count()
于 2014-09-11T16:54:59.940 に答える