39

サードパーティの Kafka および ZooKeeper サーバー用の Java クライアントを作成しようとしています。トピックを一覧表示して説明することはできますが、読み込もうとすると aClosedChannelExceptionが発生します。ここでは、コマンド ライン クライアントを使用してそれらを再現します。

$ bin/kafka-console-consumer.sh --zookeeper 255.255.255.255:2181 --topic eventbustopic
[2015-06-02 16:23:04,375] WARN Fetching topic metadata with correlation id 0 for topics [Set(eventbustopic)] from broker [id:1,host:SOME_HOST,port:9092] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException                                       
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)           
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)        
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)                
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)        
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)        
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)         
[2015-06-02 16:23:04,515] WARN Fetching topic metadata with correlation id 0 for topics [Set(eventbustopic)] from broker [id:0,host:SOME_HOST,port:9092] failed (kafka.client.ClientUtils$)
java.nio.channels.ClosedChannelException                                       
    at kafka.network.BlockingChannel.send(BlockingChannel.scala:100)           
    at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:73)        
    at kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:72)
    at kafka.producer.SyncProducer.send(SyncProducer.scala:113)                
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:58)        
    at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:93)        
    at kafka.consumer.ConsumerFetcherManager$LeaderFinderThread.doWork(ConsumerFetcherManager.scala:66)
    at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:60)         

代替コマンドは成功します:

$ bin/kafka-topics.sh --describe --zookeeper 255.255.255.255:2181 --topic eventbustopic
Topic:eventbustopic   PartitionCount:2        ReplicationFactor:1     Configs:
    Topic: eventbustopic  Partition: 0    Leader: 1       Replicas: 1     Isr: 1
    Topic: eventbustopic  Partition: 1    Leader: 0       Replicas: 0     Isr: 0

$ bin/kafka-topics.sh --list --zookeeper 255.255.255.255:2181 --topic eventbustopic
eventbustopic

(ips は編集され、255.255.255.255 に置き換えられました)

この例外をグーグルで検索すると、プロデューサー側の問題が表示されます。実際、ソースは、ClientUtils.fetchTopicMetadataこれが主にプロデューサーによって使用されていることを示唆しています。

私が懸念していることの 1 つは、これがネットワーク レイアウトの産物である可能性があることです。パケットは Haproxy によって破壊され、VPN 経由で送信されます。

ここで正確に何が働いているのですか?

4

4 に答える 4

45

ブローカーは、メッセージの生成/消費にどのホスト名を使用する必要があるかをクライアントに伝えます。デフォルトでは、Kafka はそれが実行されているシステムのホスト名を使用します。このホスト名がクライアント側で解決できない場合、この例外が発生します。

advertised.host.nameKafka 構成で、クライアントが使用する必要があるホスト名/アドレスに設定してみることができます。

于 2015-06-10T11:10:12.303 に答える
11

この問題を解決する私の方法は次のとおりです。

  1. runを実行bin/kafka-server-stop.shして、kafka サーバーの実行を停止します。
  2. config/server.properties次の行を追加して 、プロパティ ファイルを変更します。listeners=PLAINTEXT://{ip.of.your.kafka.server}:9092
  3. カフカサーバーを再起動します。

リスナー設定がないため、kafkajava.net.InetAddress.getCanonicalHostName()はソケット サーバーがリッスンするアドレスを取得するために使用します。

于 2016-07-27T03:45:12.673 に答える
0

Zookeeper に問題があります。255.255.255.255:2181有効な Zookeeper アドレスではありません。これは、ネットワーク上のブロードキャスト アドレスまたはサブネット マスクです。これを機能させるには、Zookeeper を実行しているマシンの IP アドレスまたはホスト名を見つけます。

于 2015-07-25T01:18:09.850 に答える