0

poll()SourceTask 実装のメソッドに次のコードを含む Kafka コネクタがあります。

@Override
public List<SourceRecord> poll() throws InterruptedException 
{
    SomeType item = mQueue.take();
    List<SourceRecord> records = new ArrayList<>();
    SourceRecord[] sourceRecords = new SourceRecord[]{
        new SourceRecord(null, null, "data", null,
                         Schema.STRING_SCHEMA, "foo",
                         Schema.STRING_SCHEMA, "bar")
    };
    Collections.addAll(records, sourceRecords);

    return records;
}

コンシューマーをデータ トピックにアタッチすると、コネクタから次のメッセージが送信されます。

{"schema":{"type":"string","optional":false},"payload":"foo"}   {"schema":{"type":"string","optional":false},"payload":"bar"}

次のコマンドを使用して、メッセージをトピックに直接公開すると:

echo -e 'foo,bar' > /tmp/test_kafka.txt
cat /tmp/test_kafka.txt | kafka-console-producer.sh --broker-list kafka-host:9092 --topic data --property parse.key=true --property key.separator=,

次に、同じコンシューマーをアタッチすると、次のメッセージが表示されます。

foo bar

{"schema":...これは、受け取ったメッセージではなく、コネクタの実装からの出力として期待していたものです。

poll()メッセージの実際のキーと値にスキーマ メタ データが表示されずにメッセージが送信されるように、 の実装を変更するにはどうすればよいですか?

4

1 に答える 1