0

logstash から IBM Message Hub を使用しようとしています。私のlogstash.conf:

input {
    kafka {
        bootstrap_servers =>  "kafka04-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka05-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka01-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka03-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka02-prod01.messagehub.services.eu-de.bluemix.net:9093"
        security_protocol => "SASL_SSL"
        sasl_mechanism => "PLAIN"
        jaas_path => "kafka_jaas.conf"
        ssl => true
        topics => [
                "transactions_load"
            ]
    }
}
output {
    stdout { }
}

Message Hub Consumer exampleに基づくと、次の kafka 構成プロパティがありません。

ssl.protocol=TLSv1.2
ssl.enabled.protocols=TLSv1.2
ssl.endpoint.identification.algorithm=HTTPS

ただし、logstash のドキュメントから、これらを設定する方法を確認できませんでした。

私の kafka_jaas.conf ファイルは次のようになります。

KafkaClient {
  org.apache.kafka.common.security.plain.PlainLoginModule required
  serviceName="kafka"
  username="****"
  password="****";
  };

結果は

[2017-10-08T10:00:48,325][ERROR][logstash.inputs.kafka    ] Unable to create Kafka consumer from given configuration {:kafka_error_message=>java.lang.NullPointerException, :cause=>nil}
....
[2017-10-08T10:00:52,717][ERROR][logstash.pipeline        ] A plugin had an unrecoverable error. Will restart this plugin.
  Plugin: <LogStash::Inputs::Kafka bootstrap_servers=>"kafka04-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka05-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka01-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka03-prod01.messagehub.services.eu-de.bluemix.net:9093,kafka02-prod01.messagehub.services.eu-de.bluemix.net:9093", 
security_protocol=>"SASL_SSL", 
sasl_mechanism=>"PLAIN", 
jaas_path=>"kafka_jaas.conf", 
ssl=>true, 
topics=>["transactions_load"], 
id=>"xxxxx",  
enable_metric=>true, 
codec=><LogStash::Codecs::Plain id=>"xxxxx", 
enable_metric=>true, 
charset=>"UTF-8">, 
auto_commit_interval_ms=>"5000", 
client_id=>"logstash", 
consumer_threads=>1, 
enable_auto_commit=>"true", 
group_id=>"logstash",
key_deserializer_class=>
   "org.apache.kafka.common.serialization.StringDeserializer", 
value_deserializer_class=>
   "org.apache.kafka.common.serialization.StringDeserializer", 
poll_timeout_ms=>100, 
decorate_events=>false>
  Error: uncaught throw  in thread 0x301b0

Message Hub アカウントをお持ちの場合は、私の問題を簡単に再現できます。

wget https://artifacts.elastic.co/downloads/logstash/logstash-5.4.2.tar.gz
tar xvzf logstash-5.4.2.tar.gz
# create logstash.conf from above
# create kafka_jaas.conf from above
./logstash-5.4.2/bin/logstash -f logstash.conf

(最新のlogstash、現在5.6.2でも同じ問題があります)

4

1 に答える 1