1

kafka 7.2を使用して、プロデューサーを使用してメッセージを送信すると、メッセージを消費すると、メッセージの先頭に追加のセクションが追加されて到着することがわかります。

たとえば、単純な文字列「King Daniel」を kafka に送信する場合、バイト配列では次のようになります。

4B 69 6E 67 20 44 61 6E 69 65 6C

しかし、何らかの理由でそれを消費すると、次のようになります。

00 00 00 00 00 11 01 00 C2 C4 1E 7C 4B 69 6E 67 20 44 61 6E 69 65 6C

「………………|King Daniel」という文字列はどれ?

したがって、メッセージの先頭に 12 文字が追加されます。これは何かのヘッダーですか?元のメッセージを取得するにはどうすればよいですか?

これが私の消費者コードです:

public void start() {
initConsumer();
LOG.info("Starting kafka consumer for topic " + topic);
try {
    long offset = 0;
    while (true) {
    // create a fetch request for partition 0, current offset, and
    // fetch size of 1MB
    FetchRequest fetchRequest = new FetchRequest(topic, 0, offset, 1000000);
    ByteBufferMessageSet messages = consumer.fetch(fetchRequest);

    for (MessageAndOffset msg : messages) {
        ByteBuffer payload = msg.message().payload();
        writer.writeToFile(payload.array());
        // advance the offset after consuming each message
        offset = msg.offset();
    }
    }
} catch (Exception e) {
    LOG.error("Error occured while consuming from kafka", e);
}
}

をファイルに書き込んでいmsg.message().payload().array()て、このファイルを開くと、最初に 12 文字が追加された元のコンテンツが表示されます。

元のメッセージを正確に取得するにはどうすればよいですか?

4

1 に答える 1

2

問題は、ByteBuffer.array()メソッドがこのバッファをサポートする配列を返すことです ( http://docs.oracle.com/javase/7/docs/api/java/nio/ByteBuffer.html#array()を参照)。

ByteBuffer は、バッキング配列の一部のみを占有する場合があります。さらに、このメソッドは読み取り専用の ByteBuffer と直接の ByteBufferでは機能しません。ReadOnlyBufferException配列が読み取り専用のUnsupportedOperationException場合、またはByteBufferバッキング配列がない場合はスローされます。

次のコード スニペットを使用ByteBufferして、コンテンツを配列に読み込むことができます。

ByteBuffer payload = msg.message().payload();
byte[] contents = new byte[payload.remaining()];
payload.get(contents);
writer.writeToFile(contents);

writerただし、データを直接書き込みByteBuffer、余分なコピーを避けるように拡張する価値がある場合があります。

于 2013-11-30T11:48:46.243 に答える