1
JavaRDD<String> history_ = sc.emptyRDD();

java.util.Queue<JavaRDD<String> > queue = new LinkedList<JavaRDD<String>>();
queue.add(history_);
JavaDStream<String> history_dstream = ssc.queueStream(queue);

JavaPairDStream<String,ArrayList<String>> history = history_dstream.mapToPair(r -> {
  return new Tuple2< String,ArrayList<String> >(null,null);
});  

 JavaPairInputDStream<String, GenericData.Record> stream_1 =
    KafkaUtils.createDirectStream(ssc, String.class, GenericData.Record.class, StringDecoder.class,
        GenericDataRecordDecoder.class, props, topicsSet_1);


JavaPairInputDStream<String, GenericData.Record> stream_2 =
    KafkaUtils.createDirectStream(ssc, String.class, GenericData.Record.class, StringDecoder.class,
        GenericDataRecordDecoder.class, props, topicsSet_2);

次に、いくつかの変換を行い、タイプの twp DStream Data_1 および Data_2 を作成します

JavaPairDStream<String, <ArrayList<String>>

以下のように結合を行い、結合キーのないレコードを除外し、Data_1 との結合を行うことで、次のバッチで使用するために履歴に保存します。

 Data_1 = Data_1.union(history);

JavaPairDStream<String, Tuple2<ArrayList<String>, Optional<ArrayList<String>>>> joined =
    Data_1.leftOuterJoin(Data_2).cache();


JavaPairDStream<String, Tuple2<ArrayList<String>, Optional<ArrayList<String>>>> notNULL_join = joined.filter(r -> r._2._2().isPresent());
JavaPairDStream<String, Tuple2<ArrayList<String>, Optional<ArrayList<String>>>> dstream_filtered = joined.filter(r -> !r._2._2().isPresent());

history = dstream_filtered.mapToPair(r -> {
  return new Tuple2<>(r._1,r._2._1);
}).persist;

前のステップの後に履歴を取得します(hdfsに保存して確認しました)が、ユニオンを実行している間、この履歴はまだバッチで空です。

4

1 に答える 1