kafka 7.2を使用して、プロデューサーを使用してメッセージを送信すると、メッセージを消費すると、メッセージの先頭に追加のセクションが追加されて到着することがわかります。
たとえば、単純な文字列「King Daniel」を kafka に送信する場合、バイト配列では次のようになります。
4B 69 6E 67 20 44 61 6E 69 65 6C
しかし、何らかの理由でそれを消費すると、次のようになります。
00 00 00 00 00 11 01 00 C2 C4 1E 7C 4B 69 6E 67 20 44 61 6E 69 65 6C
「………………|King Daniel」という文字列はどれ?
したがって、メッセージの先頭に 12 文字が追加されます。これは何かのヘッダーですか?元のメッセージを取得するにはどうすればよいですか?
これが私の消費者コードです:
public void start() {
initConsumer();
LOG.info("Starting kafka consumer for topic " + topic);
try {
long offset = 0;
while (true) {
// create a fetch request for partition 0, current offset, and
// fetch size of 1MB
FetchRequest fetchRequest = new FetchRequest(topic, 0, offset, 1000000);
ByteBufferMessageSet messages = consumer.fetch(fetchRequest);
for (MessageAndOffset msg : messages) {
ByteBuffer payload = msg.message().payload();
writer.writeToFile(payload.array());
// advance the offset after consuming each message
offset = msg.offset();
}
}
} catch (Exception e) {
LOG.error("Error occured while consuming from kafka", e);
}
}
をファイルに書き込んでいmsg.message().payload().array()
て、このファイルを開くと、最初に 12 文字が追加された元のコンテンツが表示されます。
元のメッセージを正確に取得するにはどうすればよいですか?