I am trying to use ContinuousEventTimeTrigger with timeWindow:
val timed_stamped_stream = likes_stream.map({
  p => {
    val x = p.split(",")
    (get_timestamp(x(0)), x(1).trim.toLong, x(2).trim.toLong)
  }}).assignTimestamps(new TimestampExtractor[(Long, Long, Long)] {
    override def getCurrentWatermark: Long = Long.MinValue
    override def extractWatermark(t: (Long, Long, Long), l: Long): Long = t._1
    override def extractTimestamp(t: (Long, Long, Long), l: Long): Long = t._1
  }).keyBy(2).timeWindow(Time.of(5, TimeUnit.SECONDS))

timed_stamped_stream.trigger(ContinuousEventTimeTrigger.of(Time.of(5, TimeUnit.SECONDS))).sum(1).print()
However, when I run it on a Flink streaming cluster, I get a java.lang.StackOverflowError with the following stack trace:
java.lang.StackOverflowError
at java.util.HashMap.putVal(HashMap.java:628)
at java.util.HashMap.put(HashMap.java:611)
at java.util.HashSet.add(HashSet.java:219)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.registerEventTimeTimer(WindowOperator.java:532)
at org.apache.flink.streaming.api.windowing.triggers.ContinuousEventTimeTrigger.onEventTime(ContinuousEventTimeTrigger.java:61)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onEventTime(WindowOperator.java:558)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onEventTime(WindowOperator.java:562)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onEventTime(WindowOperator.java:562)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onEventTime(WindowOperator.java:562)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onEventTime(WindowOperator.java:562)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onEventTime(WindowOperator.java:562)
at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onEventTime(WindowOperator.java:562)
Can anyone help?