5

Apache Spark Streaming 1.6.1 を使用して、2 つのキー/値データ ストリームを結合し、出力を HDFS に書き込む Java アプリケーションを作成しています。2 つのデータ ストリームには K/V 文字列が含まれており、textFileStream() を使用して HDFS から Spark に定期的に取り込まれます。

2 つのデータ ストリームは同期されていません。つまり、時刻 t0 でストリーム 1 にあった一部のキーが、時刻 t1 でストリーム 2 に表示されるか、またはその逆になる可能性があります。したがって、私の目標は、2 つのストリームを結合し、「残りの」キーを計算することです。これは、次のバッチ間隔での結合操作で考慮する必要があります。

これをより明確にするために、次のアルゴリズムを見てください。

variables:
stream1 = <String, String> input stream at time t1
stream2 = <String, String> input stream at time t1
left_keys_s1 = <String, String> records of stream1 that didn't appear in the join at time t0
left_keys_s2 = <String, String> records of stream2 that didn't appear in the join at time t0

operations at time t1:
out_stream = (stream1 + left_keys_s1) join (stream2 + left_keys_s2)
write out_stream to HDFS
left_keys_s1 = left_keys_s1 + records of stream1 not in out_stream (should be used at time t2)
left_keys_s2 = left_keys_s2 + records of stream2 not in out_stream (should be used at time t2)

このアルゴリズムを Spark Streaming で実装しようとしましたが、うまくいきませんでした。最初に、この方法で残りのキーに対して 2 つの空のストリームを作成します (これは 1 つのストリームにすぎませんが、2 つ目のストリームを生成するコードは似ています)。

JavaRDD<String> empty_rdd = sc.emptyRDD(); //sc = Java Spark Context
Queue<JavaRDD<String>> q = new LinkedList<JavaRDD<String>>();
q.add(empty_rdd);
JavaDStream<String> empty_dstream = jssc.queueStream(q);
JavaPairDStream<String, String> k1 = empty_dstream.mapToPair(new PairFunction<String, String, String> () {
                                 @Override
                                 public scala.Tuple2<String, String> call(String s) {
                                   return new scala.Tuple2(s, s);
                                 }
                               });

後で、この空のストリームはストリーム 1 と統合 (つまり、union()) され、最後に結合後、ストリーム 1 の残りのキーを追加し、window() を呼び出します。stream2 でも同じことが起こります。

問題は、left_keys_s1 と left_keys_s2 を生成する操作がアクションのない変換であることです。これは、Spark が RDD フロー グラフを作成しないため、決して実行されないことを意味します。私が今得ているのは、キーが同じ時間間隔で stream1 と stream2 にあるレコードのみを出力する結合です。

これをSparkで正しく実装するための提案はありますか?

ありがとう、マルコ

4

1 に答える 1

1

それらの値が保持されているRDDへの参照を保持することにより、あるバッチから次のバッチに値を持ち越すことができるはずです。

を使用してストリームをマージしようとしないでくださいqueueDStream。代わりに、各ストリーミング間隔で更新できる変更可能な RDD 参照を宣言してください。

これは例です:

100このストリーミング ジョブでは、整数を運ぶ RDD から始めます。各間隔で10乱数が生成され、最初の 100 個の整数が減算されます。このプロセスは、100 個の要素を持つ最初の RDD が空になるまで続きます。この例は、ある区間から次の区間に要素を引き継ぐ方法を示しています。

  import scala.util.Random
  import org.apache.spark.streaming.dstream._

  val ssc = new StreamingContext(sparkContext, Seconds(2))

  var targetInts:RDD[Int] = sc.parallelize(0 until 100)

  var loops = 0

  // we create an rdd of functions that generate random data. 
  // evaluating this RDD at each interval will generate new random data points.
  val randomDataRdd = sc.parallelize(1 to 10).map(_ => () => Random.nextInt(100))

  val dstream = new ConstantInputDStream(ssc, randomDataRdd)

  // create values from the random func rdd

  dataDStream.foreachRDD{rdd => 
                        loops += 1
                        targetInts = targetInts.subtract(rdd)
                        if (targetInts.isEmpty) {println(loops); ssc.stop(false)}
                       }


  ssc.start()

この例を実行してプロットloopsするとtargetInts.count、次のグラフが得られます。

乱数を生成して 100 int を削除する

これにより、完全なユースケースを実装するための十分なガイダンスが得られることを願っています。

于 2016-05-22T23:06:57.783 に答える