4

Failed to lock the state directory: /tmp/kafka-streams/string-monitor/0_1kafka ストリーム アプリケーションで状態ストアを作成しているときに、このエラーが発生します。アプリケーションの完全なスタック トレースは次のとおりです。

[2016-08-30 12:43:09,408] ERROR [StreamThread-1] User provided listener org.apache.kafka.streams.processor.internals.StreamThread$1 for group string-monitor failed on partition assignment (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
org.apache.kafka.streams.errors.ProcessorStateException: Error while creating the state manager
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Caused by: java.io.IOException: Failed to lock the state directory: /tmp/kafka-streams/string-monitor/0_1
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:95)
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
    ... 32 more
Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to rebalance
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:299)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while creating the state manager
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:71)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:86)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:550)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:577)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$000(StreamThread.java:68)
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:123)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:222)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:232)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$1.onSuccess(AbstractCoordinator.java:227)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$2.onSuccess(RequestFuture.java:182)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:436)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$SyncGroupResponseHandler.handle(AbstractCoordinator.java:422)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
    at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
    at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:192)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:163)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:243)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.ensurePartitionAssignment(ConsumerCoordinator.java:345)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:977)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:937)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:295)
    ... 1 more
Caused by: java.io.IOException: Failed to lock the state directory: /tmp/kafka-streams/string-monitor/0_1
    at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:95)
    at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:69)
    ... 32 more

そして、以下のようにステートストアを作成します

 StateStoreSupplier avgStore = Stores.create("avgStore")
          .withKeys(Serdes.String())
          .withValues(Serdes.String())
          .persistent()
          .build();

これを修正する方法はありますか?

4

3 に答える 3

4

アプリケーション インスタンス内で複数のスレッドを構成しましたか? はいの場合、それは Kafka の古いバージョンの既知の問題が原因である可能性があります。この問題では、Kafka Streams (アプリ インスタンス内) によって使用される基になるコンシューマーが再調整に時間がかかりすぎて、それ自体がコンシューマー グループから追い出される可能性があります (およびしたがって、別のコンシューマー グループのリバランスがトリガーされます) まだ最初のリバランス プロセス中です。

スタックトレースの次のエラー メッセージは、上記の問題が実際に発生していることを示しています。

Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Failed to rebalance
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:299)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:218)
Caused by: org.apache.kafka.streams.errors.ProcessorStateException: Error while creating the state manager

この問題は、次の Apache Kafka チケットにまとめられています。

https://issues.apache.org/jira/browse/KAFKA-3758

基礎となる Kafka コンシューマ クライアントに対する最近の変更により、この問題が修正されました。

https://cwiki.apache.org/confluence/display/KAFKA/KIP-62%3A+Allow+consumer+to+send+heartbeats+from+a+background+threadを許可

ただし、公式の Kafka リリースにはまだ含まれておらず、現在のところ、Apache Kafka でのみ使用できますtrunk。アプリを Kafka でtrunk実行できる場合は、この問題がすでに解消されているかどうかを確認できます。

于 2016-08-31T21:27:36.433 に答える
0

同じFailed to lock the state directory例外を受け取って使用する人のためのメモですThe Spring for Apache Kafka (spring-kafka)春のドキュメントの例):

Spring がビルドされた Steam を自動的に開始することをソース コードで理解するのにしばらく時間がかかりました。そのため、次のようなことを手動で行う必要はありません。

KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();

私はそれを無視し、前述の例外に直面しました。これは、2 つのアプリケーション インスタンスが実行されていたためです。1 つは Spring によって作成され、もう 1 つは私によって作成されました。

于 2018-08-15T14:56:10.227 に答える