私のアプリケーションは、あるマシンで実行されている Kafka サーバーからのメッセージを消費し、別のインスタンスで実行されている別のリモート Kafka に転送します。アプリを Cloud Foundry にデプロイし、最初の Kafka サーバーにメッセージを送信すると、アプリケーションは期待どおりに動作します。メッセージは消費され、リモートの Kafka に転送されます。
ただし、その後、Cloud Foundry で以下の例外の無限ループが発生します (ローカル マシンでもより遅いペースで):
スタックトレース:
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT 2016-06-03 18:20:34.900 WARN 29 --- [ad | producer-1] o.apache.kafka.common.network.Selector : Error in I/O with localhost/127.0.0.1
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_65-]
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT java.net.ConnectException: Connection refused
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[na:1.8.0_65-]
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) [kafka-clients-0.8.2.2.jar!/:na]
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) [kafka-clients-0.8.2.2.jar!/:na]
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at org.apache.kafka.common.network.Selector.poll(Selector.java:238) ~[kafka-clients-0.8.2.2.jar!/:na]
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) [kafka-clients-0.8.2.2.jar!/:na]
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at java.lang.Thread.run(Thread.java:745) [na:1.8.0_65-]
私のアプリケーションyamlファイルはこのようなものです
アプリケーション YML :
spring:
cloud:
stream:
bindings:
activationMsgQueue:
binder: kafka1
destination: test
contentType: application/json
consumer:
resetOffsets: true
startOffset: latest
input:
binder: kafka2
content-type: application/x-java-object;type=com.comcast.activation.message.vo.ActivationDataInfo
destination: test
group: prac
binders:
kafka1:
type: kafka
environment:
spring:
kafka:
host: caapmsg-as-a1p.sys.comcast.net
kafka2:
type: kafka
environment:
spring:
kafka:
host: caapmsg-as-a3p.sys.comcast.net
default-binder: kafka2
kafka:
binder:
zk-nodes: caapmsg-as-a1p.sys.comcast.net, caapmsg-as-a3p.sys.comcast.net
以下の構成を含めるとエラーが消えることを確認しましたが、メッセージが消費されて送信される無限ループが発生しました。
スニペット:
kafka:
binder:
brokers: caapmsg-as-a1p.sys.comcast.net, caapmsg-as-a3p.sys.comcast.net
zk-nodes: caapmsg-as-a1p.sys.comcast.net, caapmsg-as-a3p.sys.comcast.net
この無限ループを止めるにはどうすればいいですか?
こんにちはマリウス、SOS コールに応答していただきありがとうございます。上記の問題を強化しました。フローは、a1p(topic:test) から消費し、メッセージが有効な場合は a3p(topic:test) に転送し、そうでない場合はエラー メッセージを a1p(topic:errorMsgQueue) に送信します。以下のアプリケーションがあります。yml ファイル
春: クラウド: ストリーム: バインディング: errorMsgQueue: バインダー: kafka1 宛先: errorMsgQueue contentType: アプリケーション/json 入力: バインダー: kafka2 コンテンツ タイプ: application/x-java-object;type=com.comcast.activation.message.vo. ActivationDataInfo 宛先: テスト グループ: prac
activationMsgQueue: バインダー: kafka3 宛先: テスト contentType: アプリケーション/json バインダー: kafka1: タイプ: kafka 環境: 春: クラウド: ストリーム: kafka: バインダー: ブローカー: caapmsg-as-a1p.sys.comcast.net zk-nodes: caapmsg -as-a1p.sys.comcast.net kafka2: タイプ: kafka 環境: 春: クラウド: ストリーム: kafka: バインダー: ブローカー: caapmsg-as-a3p.sys.comcast.net zk-nodes: caapmsg-as-a3p. sys.comcast.net kafka3: タイプ: カフカ 環境: 春: クラウド:ストリーム: kafka: バインダー: ブローカー: caapmsg-as-a1p.sys.comcast.net zk ノード: caapmsg-as-a1p.sys.comcast.net デフォルト バインダー: kafka2
私はまだ無限ループを取得しています。私は何を間違っていますか?