9

Kafka、シリアライゼーション、JSON は初めてです

私が望むのは、プロデューサーが kafka を介して JSON ファイルを送信し、コンシューマーが JSON ファイルを元のファイル形式で消費して操作することです。

私はそれを得ることができたので、JSONは文字列へのコンバーターであり、文字列シリアライザーを介して送信され、消費者は文字列を解析してJSONオブジェクトを再作成しますが、これは効率的ではないか、正しい方法ではないのではないかと心配しています(フィールドタイプが失われる可能性があります) JSON の場合)

そこで、JSON シリアライザーを作成し、プロデューサーの構成でそれを設定することを検討しました。

ここで JsonEncoder を使用しました: Kafka: カスタムシリアライザーの作成

しかし、今プロデューサーを実行しようとすると、エンコーダーの toBytes 関数で、try ブロックが私が望むようなものを決して返さないようです

try {
            bytes = objectMapper.writeValueAsString(object).getBytes();

        } catch (JsonProcessingException e) {
            logger.error(String.format("Json processing failed for object: %s", object.getClass().getName()), e);
        }

らしいobjectMapper.writeValueAsString(object).getBytes()。JSON obj ( {"name":"Kate","age":25}) を受け取り、何も変換しません。

これは私のプロデューサーの実行関数です

List<KeyedMessage<String,JSONObject>> msgList=new ArrayList<KeyedMessage<String,JSONObject>>();   

    JSONObject record = new JSONObject();

    record.put("name", "Kate");
    record.put("age", 25);

    msgList.add(new KeyedMessage<String, JSONObject>(topic, record));

    producer.send(msgList);

私は何が欠けていますか?元の方法 (文字列に変換して送信し、JSON obj を再構築する) は問題ありませんか? それとも正しい方法ではありませんか?

ありがとう!

4

2 に答える 2

6

うーん、シリアル化/逆シリアル化の手順でデータが失われるのではないかと恐れているのはなぜですか?

選択肢の 1 つは、無料のオープン ソース ソフトウェアであるConfluent の Schema Registryに含まれている Kafka JSON シリアライザを使用することです (免責事項: 私は Confluent で働いています)。そのテスト スイートは、開始するためのいくつかの例を提供します。詳細については、serializers and formattersで説明されています。この JSON シリアライザーとスキーマ レジストリ自体の利点は、Kafka のプロデューサー クライアントとコンシューマー クライアントとの透過的な統合を提供することです。JSON とは別に、必要に応じて Apache Avro もサポートされています。

私見では、このセットアップは、JSON で Kafka と対話する際の開発者の利便性と使いやすさの点で最良のオプションの 1 つですが、もちろん YMMV です!

于 2015-10-06T06:50:05.493 に答える
1

JSON であるイベント文字列を次のようなバイト配列に変換することをお勧めします。

byte[] eventBody = event.getBody();

これによりパフォーマンスが向上し、Kafka Consumer は JSON を取り戻すのに役立つ JSON パーサーも提供します。
さらに情報が必要な場合はお知らせください。

于 2015-10-05T11:32:19.613 に答える