1

私がこれまでに得た唯一の答えは、スキーマとトピックに同じ名前を付ける必要があるということです。これにより、それらがリンクされるはずです。しかし、次のような名前でスキーマを登録した後test_topic:

{
  "type": "record",
  "name": "test_topic",
  "namespace": "com.test",
  "doc": "My test schema",
  "fields": [
    {
      "name": "name",
      "type": "string"
    }
  ]
}

次のコマンドを実行すると、問題なく挿入されます。

curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" -H "Accept: application/vnd.kafka.v2+json" --data '{"records":[{"value":{"name": "My first name"}}]}' "http://localhost/topics/test_topic"

しかし、次のコマンドも実行すると、エラーが発生せずに挿入されます(注、プロパティ名を変更しました)

curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" -H "Accept: application/vnd.kafka.v2+json" --data '{"records":[{"value":{"test": "My first name"}}]}' "http://localhost/topics/test_topic"

私のデータがこのトピックのスキーマと一致しないというエラー メッセージが表示されるのではないかと思いました...

私のスキーマ ID は 10 であるため、機能して登録されていることはわかっていますが、現時点ではあまり役に立ちません。

Python コード:

from confluent_kafka import Producer
import socket
import json
conf = {'bootstrap.servers': 'localhost:9092', 'client.id': socket.gethostname()}
producer = Producer(conf)

def acked(err, msg):
    if err is not None:
        print(f'Failed to deliver message: {str(msg)}, {str(err)}')
    else:
        print(f'Message produced: {str(msg)}')

    producer.produce("test_topic", key="key", value=json.dumps({"test": name}).encode('ascii') , callback=acked)

producer.poll(5)
4

1 に答える 1