0

私は Apache Spark Streaming に関するいくつかの自己完結型の統合テストを作成しています。シミュレートされたテスト データでコードがあらゆる種類のエッジ ケースを取り込めることをテストしたいと考えています。通常のRDD(ストリーミングではない)でこれを行っていたとき。インライン データを使用して「並列化」を呼び出し、spark RDD に変換することができます。ただし、デストリームを作成するためのそのような方法は見つかりません。理想的には、たまに「プッシュ」関数を呼び出して、タプルを魔法のように dstream に表示したいと考えています。ATM Apache Kafka を使用してこれを行っています。一時キューを作成し、それに書き込みます。しかし、これはやり過ぎのようです。Kafka をメディエーターとして使用せずに、テスト データから直接 test-dstream を作成したいと思います。

4

3 に答える 3

5

テスト目的で、RDD のキューから入力ストリームを作成できます。より多くの RDD をキューにプッシュすると、バッチ間隔でより多くのイベントを処理したことがシミュレートされます。

val sc = SparkContextHolder.sc
val ssc = new StreamingContext(sc, Seconds(1))
val inputData: mutable.Queue[RDD[Int]] = mutable.Queue()
val inputStream: InputDStream[Int] = ssc.queueStream(inputData)

inputData += sc.makeRDD(List(1, 2)) // Emulate the RDD created during the first batch interval
inputData += sc.makeRDD(List(3, 4)) // 2nd batch interval
// etc

val result = inputStream.map(x => x*x)
result.foreachRDD(rdd => assertSomething(rdd))
ssc.start() // Don't forget to start the streaming context
于 2016-06-07T19:04:16.823 に答える
0

この基本例を見つけました: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/CustomReceiver.scala

ここで重要なのは、「store」コマンドを呼び出すことです。store の内容を必要なものに置き換えます。

于 2015-10-22T13:10:41.370 に答える