次のアプリケーションは、すべてのメッセージをコンソールに出力する単純なコンシューマーです。
#!/usr/bin/env python
import confluent_kafka
consumer = confluent_kafka.Consumer({
'bootstrap.servers': 'kafka05-prod01.messagehub.services.us-south.bluemix.net:9093,kafka03-prod01.messagehub.services.us-south.bluemix.net:9093,kafka01-prod01.messagehub.services.us-south.bluemix.net:9093,kafka04-prod01.messagehub.services.us-south.bluemix.net:9093,kafka02-prod01.messagehub.services.us-south.bluemix.net:9093',
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'PLAIN',
'sasl.username': 'XXX',
'sasl.password': 'XXX',
'api.version.request': True,
'client.id': 'consumer01',
'group.id': 'group01',
})
consumer.subscribe(['logs'])
while True:
msg = consumer.poll(1)
if msg is not None and msg.error() is None:
print(msg.value().decode('utf-8'))
最初は問題なく動作します。数時間後、次のエラー メッセージが表示されます。スクリプトを再起動すると、再び正常に動作します。
^C%3|1504028772.615|失敗|consumer01#consumer-1| [thrd:sasl_ssl://kafka08-prod01.messagehub.services.us-south.bluemix.]: sasl_ssl://kafka08-prod01.messagehub.services.us-south.bluemix.net:9093/7: 初期化に失敗しましたSASL 認証: SASL ハンドシェイクはブローカーでサポートされていません (メカニズム PLAIN で必要です) %3|1504028772.615|エラー|consumer01#consumer-1| [thrd:sasl_ssl://kafka08-prod01.messagehub.services.us-south.bluemix.]: sasl_ssl://kafka08-prod01.messagehub.services.us-south.bluemix.net:9093/7: 初期化に失敗しましたSASL 認証: SASL ハンドシェイクはブローカーでサポートされていません (メカニズム PLAIN で必要です)