3

kafka-streams 0.10.0.0 を使用すると、メッセージの転送時に StreamTask で null ポインター例外が定期的に発生します。呼び出しの 10% から 50% の間で変化します。NPE は次の方法で発生します。

public <K, V> void forward(K key, V value) {
    ProcessorNode thisNode = currNode;
    try {
        for (ProcessorNode childNode : (List<ProcessorNode<K, V>>) thisNode.children()) {
            currNode = childNode;
            childNode.process(key, value);
        }
    } finally {
        currNode = thisNode;
    }
}

場合によっては、thisNodeフィールドが null になっているようです。これを引き起こしている可能性のある考えはありますか?スタックトレースは以下です。

[ERROR] 2016-08-21 14:50:39.288 [StreamThread-1] StreamedMetricMeter - Forwarding failed
java.lang.NullPointerException
    at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:336) ~[kafka-streams-0.10.0.0.jar:?]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187) ~[kafka-streams-0.10.0.0.jar:?]
    at com.heliosapm.streams.metrics.processors.AbstractStreamedMetricProcessor.forward(AbstractStreamedMetricProcessor.java:552) [classes/:?]
    at com.heliosapm.streams.metrics.processors.impl.StreamedMetricMeter.doProcess(StreamedMetricMeter.java:89) [classes/:?]
    at com.heliosapm.streams.metrics.processors.impl.StreamedMetricMeter.doProcess(StreamedMetricMeter.java:1) [classes/:?]
    at com.heliosapm.streams.metrics.processors.AbstractStreamedMetricProcessor.process(AbstractStreamedMetricProcessor.java:166) [classes/:?]
    at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:68) [kafka-streams-0.10.0.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.forward(StreamTask.java:338) [kafka-streams-0.10.0.0.jar:?]
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:187) [kafka-streams-0.10.0.0.jar:?]
    at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:64) [kafka-streams-0.10.0.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:174) [kafka-streams-0.10.0.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:320) [kafka-streams-0.10.0.0.jar:?]
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218) [kafka-streams-0.10.0.0.jar:?]
4

1 に答える 1

8

問題は、私のProcessorSupplier s がgetへの呼び出しごとに Processor の同じインスタンスを返すことでした。次に、Kafka Streams エンジンは複数のプロセッサ インスタンスを作成しようとしましたが、これがマルチスレッドのゴミ箱の火災を引き起こしたことは間違いありません。同様に不注意な点に注意してください.... ProcessorSupplier.get()は、呼び出しごとにプロセッサの新しいインスタンスを返す必要があります。

于 2016-08-24T00:13:10.457 に答える