5

嵐のトポロジを書きました。私は基本的に avro スキーマのタプルをバイト配列の形式で kafka トピックに送信したいと考えています。

これは私がボルトを設定する方法です:

  builder.setBolt(KAFKA_AVRO_BOLT_NAME, new KafkaBolt<String, byte[]>())
            .fieldsGrouping(BOLT1, new Fields("key"));

そして、これは私がバイト配列に変換する方法です

Schema schema = avroObject.getSchema();

        DatumWriter<GenericRecord> writer = new GenericDatumWriter<GenericRecord>(schema);
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        Encoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        writer.write(ping, encoder);
        encoder.flush();
        byte[] message = out.toByteArray();
        String key = new String(message, "UTF-8");

次の方法でタプルを発行すると、kafka トピックに何も表示されません (バイトストリームを kafka に送信します):

collector.emit(tuple, new Values(Obj.hashMD5(key), message));

しかし、代わりに、バイト配列を文字列に変換してからkafkaトピックに変換すると、機能します:

以下のようなもの:

 builder.setBolt(KAFKA_AVRO_BOLT_NAME, new KafkaBolt<String, String>())
            .fieldsGrouping(BOLT1, new Fields("key"));

collector.emit(tuple, new Values(Obj.hashMD5(key), key));

私は何を間違っていますか?ストームカフカボルトを使用してバイトストリームをカフカトピックに送信するにはどうすればよいですか?

4

1 に答える 1

5

MD5 ハッシュが正しくないため、問題が発生しています。

bytearray を Java String に変換すると機能するとおっしゃっています。これは、MD5 の値が String に従って正しいためです。

collector.emit(tuple, new Values(Obj.hashMD5(key), key));

ご覧のとおり、MD5 は文字列パラメーターで計算され、MD5 に対応する文字列を送信します。すべて問題ありません!

ただし、bytearray を送信する場合は、bytearray で MD5 を計算する必要があり、結果として String ではなく bytearray になります。あなたのコード:

collector.emit(tuple, new Values(Obj.hashMD5(key), message));

MD5 はメッセージに対応していませんが、損失のある文字列としての UTF-8 のメッセージの変換された値に対応しているため、正しくありません (以下を参照)。

これは、バイト配列形式で MD5 を正しく計算するための SO に関する別の質問へのリンクです。

MD5 ハッシュを生成するにはどうすればよいですか?

これは、Java では bytearray を String に変換すると (C とは逆に) 損失が発生し、一部のバイトが Java エンコーディングの char に対応しないため、プロセスでデータが失われるためです (これらのいくつかは明らかにデータに含まれています)。

したがって、あなたの KafkaBolt は

KafkaBolt<byte[], byte[]>

カフカ ストームで bytearray と一緒に bytearray MD5 を送信するだけで十分かどうかはわかりません。そうでない場合は、バイト配列と BASE64 などの Java 文字列の間でロスレスなエンコ​​ーディングを使用する必要があります。

Java での Base64 エンコーディング

次を使用して、bytearray を base64 文字列に変換する必要があります。

KafkaBolt<String, String>

その後、通常どおりデータを送信します

collector.emit(tuple, new Values(Obj.hashMD5(keyInBase64), keyInBase64));

また、kafka からデータを取得すると、バイト配列を取得するためにデコードする必要がある base64 の文字列になることも意味します。

それが役立つことを願っています。

于 2015-03-18T09:14:57.133 に答える