コンテキスト: Apache Spark を使用して、ログからさまざまな種類のイベントの実行中のカウントを集計しています。ログは、履歴分析用の Cassandra とリアルタイム分析用の Kafka の両方に保存されます。各ログには、日付とイベント タイプがあります。簡単にするために、1 日ごとに 1 つのタイプのログの数を追跡したいとします。
Cassandra からのバッチ データの RDD と、Kafka からの別のストリーミング RDD の 2 つの RDD があります。擬似コード:
CassandraJavaRDD<CassandraRow> cassandraRowsRDD = CassandraJavaUtil.javaFunctions(sc).cassandraTable(KEYSPACE, TABLE).select("date", "type");
JavaPairRDD<String, Integer> batchRDD = cassandraRowsRDD.mapToPair(new PairFunction<CassandraRow, String, Integer>() {
@Override
public Tuple2<String, Integer> call(CassandraRow row) {
return new Tuple2<String, Integer>(row.getString("date"), 1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer count1, Integer count2) {
return count1 + count2;
}
});
save(batchRDD) // Assume this saves the batch RDD somewhere
...
// Assume we read a chunk of logs from the Kafka stream every x seconds.
JavaPairReceiverInputDStream<String, String> kafkaStream = KafkaUtils.createStream(...);
JavaPairDStream<String, Integer> streamRDD = kafkaStream.flatMapToPair(new PairFlatMapFunction<Tuple2<String, String>, String, Integer>() {
@Override
public Iterator<Tuple2<String, Integer> call(Tuple2<String, String> data) {
String jsonString = data._2;
JSON jsonObj = JSON.parse(jsonString);
Date eventDate = ... // get date from json object
// Assume startTime is broadcast variable that is set to the time when the job started.
if (eventDate.after(startTime.value())) {
ArrayList<Tuple2<String, Integer>> pairs = new ArrayList<Tuple2<String, Integer>>();
pairs.add(new Tuple2<String, Integer>(jsonObj.get("date"), 1));
return pairs;
} else {
return new ArrayList<Tuple2<String, Integer>>(0); // Return empty list when we ignore some logs
}
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer count1, Integer count2) {
return count1 + count2;
}
}).updateStateByKey(new Function2<List<Integer>, Optional<List<Integer>>, Optional<Integer>>() {
@Override
public Optional<Integer> call(List<Integer> counts, Optional<Integer> state) {
Integer previousValue = state.or(0l);
Integer currentValue = ... // Sum of counts
return Optional.of(previousValue + currentValue);
}
});
save(streamRDD); // Assume this saves the stream RDD somewhere
sc.start();
sc.awaitTermination();
質問:
streamRDD の結果を batchRDD と組み合わせるにはどうすればよいですか? batchRDD
次のデータがあり、このジョブが 2014 年 10 月 16 日に実行されたとします
。
("2014-10-15", 1000000)
("2014-10-16", 2000000)
Cassandra クエリにはバッチ クエリの開始時刻までのすべてのデータしか含まれていないため、ジョブの開始時刻以降のログのみを考慮して、クエリが終了したときに Kafka から読み取る必要があります。クエリには長い時間がかかると想定しています。これは、履歴結果とストリーミング結果を組み合わせる必要があることを意味します。
説明のために:
|------------------------|-------------|--------------|--------->
tBatchStart tStreamStart streamBatch1 streamBatch2
次に、最初のストリーム バッチで次のデータを取得したとします。
("2014-10-19", 1000)
次に、バッチ RDD をこのストリーム RDD と組み合わせて、ストリーム RDD が次の値を持つようにします。
("2014-10-19", 2001000)
次に、2 番目のストリーム バッチで次のデータを取得したとします。
("2014-10-19", 4000)
次に、ストリーム RDD を更新して、次の値を取得する必要があります。
("2014-10-19", 2005000)
等々...
を使用streamRDD.transformToPair(...)
して streamRDD データを batchRDD データと結合するために使用することは可能join
ですが、これをストリーム チャンクごとに行うと、ストリーム チャンクごとに batchRDD からのカウントを追加することになり、状態値が「ダブル カウント」されます。最初のストリーム チャンクにのみ追加する必要があります。