socketTextStream から単語をカウントする次のコードがあります。累積ワード カウントと時間ウィンドウ ワード カウントの両方が必要です。プログラムには、cumulateCounts が常にウィンドウ カウントと同じであるという問題があります。この問題が発生する理由 ウィンドウ化されたカウントに基づいて累積カウントを計算する正しい方法は何ですか?
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final HashMap<String, Integer> cumulateCounts = new HashMap<String, Integer>();
final DataStream<Tuple2<String, Integer>> counts = env
.socketTextStream("localhost", 9999)
.flatMap(new Splitter())
.window(Time.of(5, TimeUnit.SECONDS))
.groupBy(0).sum(1)
.flatten();
counts.print();
counts.addSink(new SinkFunction<Tuple2<String, Integer>>() {
@Override
public void invoke(Tuple2<String, Integer> value) throws Exception {
String word = value.f0;
Integer delta_count = value.f1;
Integer count = cumulateCounts.get(word);
if (count == null)
count = 0;
count = count + delta_count;
cumulateCounts.put(word, count);
System.out.println("(" + word + "," + count.toString() + ")");
}
});