Spark 1.6 で DataFrames API を使用して Spark Streaming アプリケーションを構築しようとしています。うさぎの穴に深く入り込む前に、誰かが DataFrames が異なるスキーマを持つデータをどのように処理するかを理解するのを手伝ってくれることを望んでいました。
アイデアは、メッセージが Avro スキーマを使用して Kafka に流れ込むというものです。ストリーミング アプリケーションを再起動しなくても、下位互換性のある方法でスキーマを進化させることができるはずです (アプリケーション ロジックは引き続き機能します)。
KafkaUtils を使用してダイレクト ストリームを作成し、AvroKafkaDecoder (Confluent から) を使用して、スキーマ レジストリとメッセージに埋め込まれたスキーマ ID を使用して、新しいバージョンのメッセージを逆シリアル化するのは簡単なようです。それは、DStream を持っている限り私を取得します。
問題 #1: その DStream 内に、異なるバージョンのスキーマを持つオブジェクトが存在します。したがって、それぞれを Row オブジェクトに変換するときは、データを適切に移行するために最新のリーダー スキーマを渡す必要があり、最新のスキーマを sqlContext.createDataFrame(rowRdd, schema) 呼び出しに渡す必要があります。DStream 内のオブジェクトは GenericData.Record 型であり、私が知る限り、どれが最新バージョンかを簡単に判断する方法はありません。考えられる解決策は 2 つあります。1 つは、スキーマ レジストリを呼び出して、すべてのマイクロバッチでスキーマの最新バージョンを取得することです。もう 1 つは、スキーマ ID をアタッチするようにデコーダーを変更することです。次に、rdd を反復処理して最大の ID を見つけ、ローカル キャッシュからスキーマを取得します。
誰かがすでにこれを再利用可能な方法でうまく解決していることを願っていました。
問題/質問 #2: Spark は、パーティションごとに Kafka からプルする異なるエグゼキュータを持つ予定です。あるエグゼキュータが他のエグゼキュータとは異なる「最新」のスキーマを受け取ると、アプリケーションはどうなりますか。あるエグゼキューターによって作成された DataFrame は、同じ時間枠に対して別のエグゼキューターとは異なるスキーマを持ちます。これが本当の問題かどうかは実際にはわかりません。データの流れと、どのような操作で問題が発生するのかを視覚化するのに苦労しています。それが問題である場合、executor 間でデータを共有する必要があることを意味し、それは複雑で非効率的に聞こえます。
これについて心配する必要がありますか?その場合、スキーマの違いを解決するにはどうすればよいですか?
ありがとう、 -- ベン