0

私のアプリケーションは、あるマシンで実行されている Kafka サーバーからのメッセージを消費し、別のインスタンスで実行されている別のリモート Kafka に転送します。アプリを Cloud Foundry にデプロイし、最初の Kafka サーバーにメッセージを送信すると、アプリケーションは期待どおりに動作します。メッセージは消費され、リモートの Kafka に転送されます。

ただし、その後、Cloud Foundry で以下の例外の無限ループが発生します (ローカル マシンでもより遅いペースで):

スタックトレース:

Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT 2016-06-03 18:20:34.900 WARN 29 --- [ad | producer-1] o.apache.kafka.common.network.Selector : Error in I/O with localhost/127.0.0.1
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method) ~[na:1.8.0_65-]
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT java.net.ConnectException: Connection refused
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717) ~[na:1.8.0_65-]
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122) [kafka-clients-0.8.2.2.jar!/:na]
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191) [kafka-clients-0.8.2.2.jar!/:na]
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at org.apache.kafka.common.network.Selector.poll(Selector.java:238) ~[kafka-clients-0.8.2.2.jar!/:na]
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192) [kafka-clients-0.8.2.2.jar!/:na]
Fri Jun 03 2016 12:20:34 GMT-0600 (Mountain Daylight Time) [App/0] OUT at java.lang.Thread.run(Thread.java:745) [na:1.8.0_65-]

私のアプリケーションyamlファイルはこのようなものです

アプリケーション YML :

spring:
  cloud:
    stream:
      bindings:
        activationMsgQueue:
          binder: kafka1
          destination: test
          contentType: application/json
          consumer:
            resetOffsets: true
            startOffset: latest
        input:
          binder: kafka2
          content-type: application/x-java-object;type=com.comcast.activation.message.vo.ActivationDataInfo
          destination: test
          group: prac  
      binders:
        kafka1:
          type: kafka
          environment:
            spring:
              kafka:
                host: caapmsg-as-a1p.sys.comcast.net
        kafka2:
          type: kafka
          environment:
            spring:
              kafka:
                host: caapmsg-as-a3p.sys.comcast.net
      default-binder: kafka2                    
      kafka:
        binder:
          zk-nodes: caapmsg-as-a1p.sys.comcast.net, caapmsg-as-a3p.sys.comcast.net

以下の構成を含めるとエラーが消えることを確認しましたが、メッセージが消費されて送信される無限ループが発生しました。

スニペット:

kafka:
        binder:
           brokers: caapmsg-as-a1p.sys.comcast.net, caapmsg-as-a3p.sys.comcast.net
          zk-nodes: caapmsg-as-a1p.sys.comcast.net, caapmsg-as-a3p.sys.comcast.net

この無限ループを止めるにはどうすればいいですか?

こんにちはマリウス、SOS コールに応答していただきありがとうございます。上記の問題を強化しました。フローは、a1p(topic:test) から消費し、メッセージが有効な場合は a3p(topic:test) に転送し、そうでない場合はエラー メッセージを a1p(topic:errorMsgQueue) に送信します。以下のアプリケーションがあります。yml ファイル

春: クラウド: ストリーム: バインディング: errorMsgQueue: バインダー: kafka1 宛先: errorMsgQueue contentType: アプリケーション/json 入力: バインダー: kafka2 コンテンツ タイプ: application/x-java-object;type=com.comcast.activation.message.vo. ActivationDataInfo 宛先: テスト グループ: prac
activationMsgQueue: バインダー: kafka3 宛先: テスト contentType: アプリケーション/json バインダー: kafka1: タイプ: kafka 環境: 春: クラウド: ストリーム: kafka: バインダー: ブローカー: caapmsg-as-a1p.sys.comcast.net zk-nodes: caapmsg -as-a1p.sys.comcast.net kafka2: タイプ: kafka 環境: 春: クラウド: ストリーム: kafka: バインダー: ブローカー: caapmsg-as-a3p.sys.comcast.net zk-nodes: caapmsg-as-a3p. sys.comcast.net kafka3: タイプ: カフカ 環境: 春: クラウド:ストリーム: kafka: バインダー: ブローカー: caapmsg-as-a1p.sys.comcast.net zk ノード: caapmsg-as-a1p.sys.comcast.net デフォルト バインダー: kafka2

私はまだ無限ループを取得しています。私は何を間違っていますか?

4

1 に答える 1

0

spring.kafka.hostSpring Cloud Stream の有効な構成オプションではありません。http://docs.spring.io/spring-cloud-stream/docs/1.0.0.RELEASE/reference/htmlsingle/index.html#_kafka_binder_propertiesは、バインダーがサポートする唯一のプロパティです。

また、アプリケーションが 2 つのクラスターの構成を混在させているようです。(私はそれらが別々のクラスターだと思いますか?)

それはむしろ次のようなものであるべきです:

spring: cloud: stream: bindings: activationMsgQueue: binder: kafka1 destination: test contentType: application/json consumer: resetOffsets: true startOffset: latest input: binder: kafka2 content-type: application/x-java-object;type=com.comcast.activation.message.vo.ActivationDataInfo destination: test group: prac
binders: kafka1: type: kafka environment: spring: cloud: stream: kafka: binder: brokers: caapmsg-as-a1p.sys.comcast.net zk-nodes: caapmsg-as-a1p.sys.comcast.net kafka2: type: kafka environment: spring: cloud: stream: kafka: binder: brokers: caapmsg-as-a3p.sys.comcast.net zk-nodes: caapmsg-as-a3p.sys.comcast.net default-binder: kafka2

詳細については、この例https://github.com/spring-cloud/spring-cloud-stream-samples/blob/master/multibinder-differentsystems/src/main/resources/application.ymlを参照してください。

無限ループは、同じトピックへのメッセージの送受信によって何らかの形で発生していると思われます。

于 2016-06-07T02:58:42.417 に答える