9

コンテキスト: Apache Spark を使用して、ログからさまざまな種類のイベントの実行中のカウントを集計しています。ログは、履歴分析用の Cassandra とリアルタイム分析用の Kafka の両方に保存されます。各ログには、日付とイベント タイプがあります。簡単にするために、1 日ごとに 1 つのタイプのログの数を追跡したいとします。

Cassandra からのバッチ データの RDD と、Kafka からの別のストリーミング RDD の 2 つの RDD があります。擬似コード:

CassandraJavaRDD<CassandraRow> cassandraRowsRDD = CassandraJavaUtil.javaFunctions(sc).cassandraTable(KEYSPACE, TABLE).select("date", "type");

JavaPairRDD<String, Integer> batchRDD = cassandraRowsRDD.mapToPair(new PairFunction<CassandraRow, String, Integer>() {
    @Override
    public Tuple2<String, Integer> call(CassandraRow row) {
        return new Tuple2<String, Integer>(row.getString("date"), 1);
    }
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer count1, Integer count2) {
        return count1 + count2;
    }
});

save(batchRDD) // Assume this saves the batch RDD somewhere

...

// Assume we read a chunk of logs from the Kafka stream every x seconds.
JavaPairReceiverInputDStream<String, String> kafkaStream =  KafkaUtils.createStream(...);
JavaPairDStream<String, Integer> streamRDD = kafkaStream.flatMapToPair(new PairFlatMapFunction<Tuple2<String, String>, String, Integer>() {
    @Override
    public Iterator<Tuple2<String, Integer> call(Tuple2<String, String> data) {
        String jsonString = data._2;
        JSON jsonObj = JSON.parse(jsonString);
        Date eventDate = ... // get date from json object
        // Assume startTime is broadcast variable that is set to the time when the job started.
        if (eventDate.after(startTime.value())) { 
            ArrayList<Tuple2<String, Integer>> pairs = new ArrayList<Tuple2<String, Integer>>();
            pairs.add(new Tuple2<String, Integer>(jsonObj.get("date"), 1));
            return pairs;
        } else {
            return new ArrayList<Tuple2<String, Integer>>(0); // Return empty list when we ignore some logs
        }
    }
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
    @Override
    public Integer call(Integer count1, Integer count2) {
        return count1 + count2;
    }
}).updateStateByKey(new Function2<List<Integer>, Optional<List<Integer>>, Optional<Integer>>() {
    @Override
    public Optional<Integer> call(List<Integer> counts, Optional<Integer> state) {
        Integer previousValue = state.or(0l);
        Integer currentValue = ... // Sum of counts
        return Optional.of(previousValue + currentValue);
    }
});
save(streamRDD); // Assume this saves the stream RDD somewhere

sc.start();
sc.awaitTermination();

質問: streamRDD の結果を batchRDD と組み合わせるにはどうすればよいですか? batchRDD次のデータがあり、このジョブが 2014 年 10 月 16 日に実行されたとします 。

("2014-10-15", 1000000)
("2014-10-16", 2000000)

Cassandra クエリにはバッチ クエリの開始時刻までのすべてのデータしか含まれていないため、ジョブの開始時刻以降のログのみを考慮して、クエリが終了したときに Kafka から読み取る必要があります。クエリには長い時間がかかると想定しています。これは、履歴結果とストリーミング結果を組み合わせる必要があることを意味します。

説明のために:

    |------------------------|-------------|--------------|--------->
tBatchStart             tStreamStart   streamBatch1  streamBatch2

次に、最初のストリーム バッチで次のデータを取得したとします。

("2014-10-19", 1000)

次に、バッチ RDD をこのストリーム RDD と組み合わせて、ストリーム RDD が次の値を持つようにします。

("2014-10-19", 2001000)

次に、2 番目のストリーム バッチで次のデータを取得したとします。

("2014-10-19", 4000)

次に、ストリーム RDD を更新して、次の値を取得する必要があります。

("2014-10-19", 2005000)

等々...

を使用streamRDD.transformToPair(...)して streamRDD データを batchRDD データと結合するために使用することは可能joinですが、これをストリーム チャンクごとに行うと、ストリーム チャンクごとに batchRDD からのカウントを追加することになり、状態値が「ダブル カウント」されます。最初のストリーム チャンクにのみ追加する必要があります。

4

2 に答える 2

5

このケースに対処するには、ベース rdd をStateDStream、ストリーミング データの合計を保持する集計の結果と結合します。これにより、ストリーミング間隔ごとにレポートされるデータのベースラインが効果的に提供されます。ベースライン x 回はカウントされません。

サンプルの WordCount を使用してそのアイデアを試してみましたが、うまくいきました。ライブの例として、これを REPL にドロップします。

(nc -lk 9876別のシェルで使用して、に入力を提供しますsocketTextStream)

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.storage.StorageLevel

@transient val defaults = List("magic" -> 2, "face" -> 5, "dust" -> 7 )
val defaultRdd = sc.parallelize(defaults)

@transient val ssc = new StreamingContext(sc, Seconds(10))
ssc.checkpoint("/tmp/spark")

val lines = ssc.socketTextStream("localhost", 9876, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCount = words.map(x => (x, 1)).reduceByKey(_ + _)
val historicCount = wordCount.updateStateByKey[Int]{(newValues: Seq[Int], runningCount: Option[Int]) => 
    Some(newValues.sum + runningCount.getOrElse(0))
}
val runningTotal = historicCount.transform{ rdd => rdd.union(defaultRdd)}.reduceByKey( _+_ )

wordCount.print()
historicCount.print()
runningTotal.print()
ssc.start()
于 2014-10-24T16:53:22.790 に答える
1

あなたはupdateStateByKey試してみることができます:

def main(args: Array[String]) {

    val updateFunc = (values: Seq[Int], state: Option[Int]) => {
        val currentCount = values.foldLeft(0)(_ + _)
        val previousCount = state.getOrElse(0)
        Some(currentCount + previousCount)
    }

    // stream
    val ssc = new StreamingContext("local[2]", "NetworkWordCount", Seconds(1))
    ssc.checkpoint(".")
    val lines = ssc.socketTextStream("127.0.0.1", 9999)
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map(word => (word, 1))
    val stateWordCounts = pairs.updateStateByKey[Int](updateFunc)
    stateWordCounts.print()
    ssc.start()
    ssc.awaitTermination()
}
于 2014-10-23T11:14:52.543 に答える