私は、温度値を受け取り、すべての時間の平均温度を計算する Spark Streaming アプリケーションを作成しました。そのために、JavaPairDStream.updateStateByKey
トランザクションを使用してデバイスごとに計算しました(ペアのキーで区切られています)。状態の追跡には、すべての温度値を double として保持し、メソッドStatCounter
を呼び出して各ストリームの平均を再計算するクラスを使用します。StatCounter.mean
ここに私のプログラム:
コード全体を編集: StatCounter を使用するようになりました
JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(1));
streamingContext.checkpoint("hdfs://server:8020/spark-history/checkpointing");
JavaReceiverInputDStream<String> ingoingStream = streamingContext.socketTextStream(serverIp, 11833);
JavaDStream<SensorData> sensorDStream = ingoingStream.map(new Function<String, SensorData>() {
public SensorData call(String json) throws Exception {
ObjectMapper om = new ObjectMapper();
return (SensorData)om.readValue(json, SensorData.class);
}
});
JavaPairDStream<String, Float> temperatureDStream = sensorDStream.mapToPair(new PairFunction<SensorData, String, Float>() {
public Tuple2<String, Float> call(SensorData sensorData) throws Exception {
return new Tuple2<String, Float>(sensorData.getIdSensor(), sensorData.getValTemp());
}
});
JavaPairDStream<String, StatCounter> statCounterDStream = temperatureDStream.updateStateByKey(new Function2<List<Float>, Optional<StatCounter>, Optional<StatCounter>>() {
public Optional<StatCounter> call(List<Float> newTemperatures, Optional<StatCounter> statsYet) throws Exception {
StatCounter stats = statsYet.or(new StatCounter());
for(float temp : newTemperatures) {
stats.merge(temp);
}
return Optional.of(stats);
}
});
JavaPairDStream<String, Double> avgTemperatureDStream = statCounterDStream.mapToPair(new PairFunction<Tuple2<String,StatCounter>, String, Double>() {
public Tuple2<String, Double> call(Tuple2<String, StatCounter> statCounterTuple) throws Exception {
String key = statCounterTuple._1();
double avgValue = statCounterTuple._2().mean();
return new Tuple2<String, Double>(key, avgValue);
}
});
avgTemperatureDStream.print();
これはうまくいくようです。しかし今、質問に:
ここですべての時間の平均を計算する方法も示すオンラインの例を見つけました: https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter1/total.html
AtmoicLongs
「ステートフルな値」を格納するために etc を使用し、forEachRDD
メソッドでそれらを更新します。
私の質問は次のとおりです。Spark ストリーミングですべての時間をステートフルに計算するためのより良いソリューションは何ですか? いずれかの方法を使用することの利点/欠点はありますか? ありがとうございました!