0

DStream から最初の n 個の RDD を削除したいと思います。変換とともに次の関数を使用しようとしましたが、機能しません (エラー OneForOneStrategy: org.apache.spark.SparkContext java.io.NotSerializableException)。RDD を削除するという私の本当の目標を達成できるとは思いません。空のものが返されるためです。

var num = 0
def dropNrdds(myRDD: RDD[(String, Int)], dropNum: Int) : RDD[(String, Int)] = {
    if (num < dropNum) {
        num = num + 1
        return myRDD
    }
    else {
        return sc.makeRDD(Seq())
    }
}
4

2 に答える 2

0

エラーが発生しています。なぜなら、あなたがこの関数を

foreachRdd

実際に実行者ノードで実行されるループと、実行者ノードで何かを実行したい場合、そのコードの一部はシリアライズ可能である必要があり、SparkContext(sc、dropNrdds メソッド内でそれを参照しています) はシリアライズ可能ではないため、それを取得しています。エラー。

そしてあなたの実際の質問に来ます。

あなたの要件についてはわかりませんが、

RDD の DataFrame を作成し、条件に一致するレコードを選択できます。残りは無視します。

また

フィルターを使用して、フィルター データで新しい RDD を作成できます。

于 2016-08-08T21:09:21.533 に答える