0
// Asynchronous response from Message Hub / Kafka.
kafkaProducer.send(record,
new Callback() {
   public void onCompletion(RecordMetadata m, Exception e) {
       if(e != null) {
       e.printStackTrace();
       } else { 
    log.debug(" **** Message sent, offset: " + m.offset() + 
    " @ partition " + m.partition());
    log.debug(" <<<< " +
    " document_id " + key +
    " @ " + account.getActivityId());
    }
   }
});

上記のコードを使用してメッセージ ハブにメッセージを公開しようとすると、常に次のエラーが発生します。

2016-06-21 18:38:22-[INFO] com.ibm.cloudant.streaming.messageHub.Client.send(476): >>>> document_id julia30 @ my_database 2016-06-21 18:38:22 を送信中-[デバッグ] org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(623): メタデータ要求を送信するためにノード -1 への接続を初期化します 2016-06-21 18:38:22-[デバッグ] org.apache.kafka .clients.NetworkClient.initiateConnect(487): kafka01-prod01.messagehub.services.us-south.bluemix.net:9093 でノード -1 への接続を開始しています。2016-06-21 18:38:22-[デバッグ] org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$1.run(105): SaslClient の作成: client=multiuser-adapter@multiuser.messagehub.ibm.com ;service=kafka;serviceHostname=kafka01-prod01.messagehub.services.us-south.bluemix.net;mechs=[GSSAPI] 2016-06-21 18:38:22-[DEBUG] com.

プロデューサーの設定は次のようになります。

```

 compression.type = none
 metric.reporters = []
 metadata.max.age.ms = 300000
 metadata.fetch.timeout.ms = 60000
 reconnect.backoff.ms = 50
 sasl.kerberos.ticket.renew.window.factor = 0.8
 bootstrap.servers = [kafka01-prod01.messagehub.services.us-south.bluemix.net:9093]
 retry.backoff.ms = 100
 sasl.kerberos.kinit.cmd = /usr/bin/kinit
 buffer.memory = 33554432
 timeout.ms = 30000
 key.serializer = class org.apache.kafka.common.serialization.StringSerializer
 sasl.kerberos.service.name = null
 sasl.kerberos.ticket.renew.jitter = 0.05
 ssl.keystore.type = JKS
 ssl.trustmanager.algorithm = PKIX
 block.on.buffer.full = false
 ssl.key.password = null
 max.block.ms = 60000
 sasl.kerberos.min.time.before.relogin = 60000
 connections.max.idle.ms = 540000
 ssl.truststore.password = [hidden]
 max.in.flight.requests.per.connection = 5
 metrics.num.samples = 2
 client.id = kafka01-prod01.messagehub.services.us-south.bluemix.net%3A9093_8qp87X32V6PK5epv.1
 ssl.endpoint.identification.algorithm = HTTPS
 ssl.protocol = TLSv1.2
 request.timeout.ms = 30000
 ssl.provider = null
 ssl.enabled.protocols = [TLSv1.2]
 acks = -1
 batch.size = 16384
 ssl.keystore.location = null
 receive.buffer.bytes = 32768
 ssl.cipher.suites = null
 ssl.truststore.type = JKS
 security.protocol = SASL_SSL
 retries = 1
 max.request.size = 1048576
 value.serializer = class  org.apache.kafka.common.serialization.StringSerializer
 ssl.truststore.location = /Users/jiangph/tools/liberty/usr/shared/resources/keystore.jks
 ssl.keystore.password = null
 ssl.keymanager.algorithm = SunX509
 metrics.sample.window.ms = 30000
 send.buffer.bytes = 131072
 linger.ms = 0

```

以上の設定で、Message Hub REST API を使ってトピックを作成するのに問題はありません。メッセージを公開しようとすると問題が発生します。

どんな考えでも大歓迎です。

4

3 に答える 3

3

MessageHub REST クライアント API は、Java Kafka クライアントとは異なる方法で認証します。

ログに認証エラーがあることがわかりました。 javax.security.sasl.SaslException: PLAIN: authorization ID and password must be specified at com.sun.security.sasl.PlainClient.

MessageHub SASL 認証用に Java クライアントを構成するには、次の Java の例を参照してください。

https://github.com/ibm-messaging/message-hub-samples/tree/master/java/message-hub-kafka-ssl

プロデューサ プロパティには次のものが含まれている必要があることに注意してください。

https://github.com/ibm-messaging/message-hub-samples/blob/master/java/message-hub-kafka-ssl/resources/producer.properties

jaas.conf ファイルは次のようになります。

KafkaClient { com.ibm.messagehub.login.MessageHubLoginModule required serviceName="kafka" username="your-username" password="your-password"; }; また、クラスパスに MessageHub ログイン jar が必要です。

HTH、江戸

于 2016-06-21T15:00:51.657 に答える
1

StringSerializer を使用してコンシューマー サンプルとプロデューサー サンプルの両方を実行でき、切り替える必要はありませんでした。シリアライザーは SASL 認証には適用されません。

于 2016-06-22T09:29:49.743 に答える
1

問題は認証とはまったく関係がなく、キー/値のシリアル化に関係していたことが判明しました。からの変更

    props.setProperty("key.serializer", 
            "org.apache.kafka.common.serialization.StringSerializer");
    props.setProperty("value.serializer", 
            "org.apache.kafka.common.serialization.StringSerializer")

    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");

javax.security.sasl.SaslException を解決しました

奇妙なことに、ByteArraySerialization を正常に使用した後は、StringSerialization に戻すこともでき、引き続き機能します。

于 2016-06-22T08:07:44.057 に答える