1

https://github.com/aio-libs/aiokafka/blob/master/examples/ssl_consume_produce.pyのリポジトリにあるサンプルConfluent Cloudの修正版を使用して、Kafka クラスターに接続しようとしています。私は正しいパラメータであると信じているもので自分のとを設定しましたが、以下のランタイムエラーが発生しています:AIOKafka ssl_consume_produce.pyAIOKafkaAIOKafkaConsumerAIOKafkaProducer

/Users/galen/opt/anaconda3/envs/ds/bin/python /Volumes/ThunderBlade/github/kafka/aiokafka/examples/ssl_consume_produce.py
Traceback (most recent call last):
  File "/Volumes/ThunderBlade/github/kafka/aiokafka/examples/ssl_consume_produce.py", line 57, in <module>
    loop.run_until_complete(task)
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/asyncio/base_events.py", line 579, in run_until_complete
    return future.result()
  File "/Volumes/ThunderBlade/github/kafka/aiokafka/examples/ssl_consume_produce.py", line 52, in <module>
    loop.run_until_complete(task)
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/asyncio/base_events.py", line 579, in run_until_complete
    return future.result()
  File "/Volumes/ThunderBlade/github/kafka/aiokafka/examples/ssl_consume_produce.py", line 23, in produce_and_consume
    await producer.start()
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/producer/producer.py", line 171, in start
    await self.client.bootstrap()
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/client.py", line 203, in bootstrap
    version_hint=version_hint)
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/conn.py", line 90, in create_conn
    await conn.connect()
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/conn.py", line 214, in connect
    await self._do_sasl_handshake()
  File "/Users/galen/opt/anaconda3/envs/ds/lib/python3.7/site-packages/aiokafka/conn.py", line 281, in _do_sasl_handshake
    payload, expect_response = res
RuntimeError: await wasn't used with future
Unclosed AIOKafkaProducer
producer: <aiokafka.producer.producer.AIOKafkaProducer object at 0x7f9bc818d350>

コードの私の適応バージョンは次のとおりです。

import asyncio
from ssl import create_default_context, Purpose
from aiokafka import AIOKafkaProducer, AIOKafkaConsumer
from kafka.common import TopicPartition

import ccloud_lib

ssl_context = create_default_context(Purpose.SERVER_AUTH, cafile='cacert.pem')
conf = ccloud_lib.read_ccloud_config('kafka_config.conf')

async def produce_and_consume(loop):
    # Produce
    producer = AIOKafkaProducer(
        bootstrap_servers=conf['bootstrap.servers'],
        loop = loop,
        security_protocol='SASL_SSL',
        ssl_context=ssl_context,
        sasl_mechanism='PLAIN',
        sasl_plain_password=conf['sasl.password'],
        sasl_plain_username=conf['sasl.username']
    )

    await producer.start()
    try:
        msg = await producer.send_and_wait(
            'my_topic', b"Super Message", partition=0)
    finally:
        await producer.stop()

    consumer = AIOKafkaConsumer(
        bootstrap_servers=conf['bootstrap.servers'],
        loop=loop,
        ssl_context=ssl_context,
        security_protocol='SASL_SSL',
        sasl_mechanism='PLAIN',
        sasl_plain_password=conf['sasl.password'],
        sasl_plain_username=conf['sasl.username']
    )
    await consumer.start()
    try:
        consumer.seek(TopicPartition('my_topic', 0), msg.offset)
        fetch_msg = await consumer.getone()
    finally:
        await consumer.stop()

    print("Success", msg, fetch_msg)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    task = loop.create_task(produce_and_consume(loop))
    try:
        loop.run_until_complete(task)
    finally:
        loop.run_until_complete(asyncio.sleep(0, loop=loop))
        task.cancel()
        try:
            loop.run_until_complete(task)
        except asyncio.CancelledError:
            pass

私の難読化された構成confは次のようになります。

bootstrap.servers=*****.us-central1.gcp.confluent.cloud:9092
ssl.endpoint.identification.algorithm=https
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username\="********************" password\="****************************************";
sasl.username=********************
sasl.password=********************************************************************************
basic.auth.credentials.source=USER_INFO
schema.registry.basic.auth.user.info=********************:********************
schema.registry.url=https://********************.us-central1.gcp.confluent.cloud

Confluent CloudAIOKafka クライアントを使用して接続することは可能ですか? 私の構成に間違っているものはありますか?

4

0 に答える 0