Apache Spark Streaming 1.6.1 を使用して、2 つのキー/値データ ストリームを結合し、出力を HDFS に書き込む Java アプリケーションを作成しています。2 つのデータ ストリームには K/V 文字列が含まれており、textFileStream() を使用して HDFS から Spark に定期的に取り込まれます。
2 つのデータ ストリームは同期されていません。つまり、時刻 t0 でストリーム 1 にあった一部のキーが、時刻 t1 でストリーム 2 に表示されるか、またはその逆になる可能性があります。したがって、私の目標は、2 つのストリームを結合し、「残りの」キーを計算することです。これは、次のバッチ間隔での結合操作で考慮する必要があります。
これをより明確にするために、次のアルゴリズムを見てください。
variables:
stream1 = <String, String> input stream at time t1
stream2 = <String, String> input stream at time t1
left_keys_s1 = <String, String> records of stream1 that didn't appear in the join at time t0
left_keys_s2 = <String, String> records of stream2 that didn't appear in the join at time t0
operations at time t1:
out_stream = (stream1 + left_keys_s1) join (stream2 + left_keys_s2)
write out_stream to HDFS
left_keys_s1 = left_keys_s1 + records of stream1 not in out_stream (should be used at time t2)
left_keys_s2 = left_keys_s2 + records of stream2 not in out_stream (should be used at time t2)
このアルゴリズムを Spark Streaming で実装しようとしましたが、うまくいきませんでした。最初に、この方法で残りのキーに対して 2 つの空のストリームを作成します (これは 1 つのストリームにすぎませんが、2 つ目のストリームを生成するコードは似ています)。
JavaRDD<String> empty_rdd = sc.emptyRDD(); //sc = Java Spark Context
Queue<JavaRDD<String>> q = new LinkedList<JavaRDD<String>>();
q.add(empty_rdd);
JavaDStream<String> empty_dstream = jssc.queueStream(q);
JavaPairDStream<String, String> k1 = empty_dstream.mapToPair(new PairFunction<String, String, String> () {
@Override
public scala.Tuple2<String, String> call(String s) {
return new scala.Tuple2(s, s);
}
});
後で、この空のストリームはストリーム 1 と統合 (つまり、union()) され、最後に結合後、ストリーム 1 の残りのキーを追加し、window() を呼び出します。stream2 でも同じことが起こります。
問題は、left_keys_s1 と left_keys_s2 を生成する操作がアクションのない変換であることです。これは、Spark が RDD フロー グラフを作成しないため、決して実行されないことを意味します。私が今得ているのは、キーが同じ時間間隔で stream1 と stream2 にあるレコードのみを出力する結合です。
これをSparkで正しく実装するための提案はありますか?
ありがとう、マルコ