問題タブ [spark-avro]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-spark - 分割されたデータに対して Spark AVRO S3 読み取りが機能しない
特定のファイルを読み取ると、次のように機能します。
しかし、日付のパーティション分割されたデータを読み取るためにフォルダーを指定すると、失敗します。
val filePath="s3n://bucket_name/f1/f2/avro/dt=2016-10-19/"
次のエラーが表示されます。
何か不足していますか?
apache-spark - sparkを使用してavroを複数の出力ディレクトリに書き込む方法
こんにちは,MultipleTextOutputFormat を使用して 1 つの Spark ジョブで複数の出力ディレクトリにテキスト データを書き込むことについてのトピックがあります。
キー Spark による複数の出力への書き込み - 1 つの Spark ジョブ
Avroデータを複数のディレクトリに書き込む同様の方法があるかどうかを尋ねます
私が欲しいのは、avroファイルのデータを別のディレクトリに書き込むことです(タイムスタンプフィールドに基づいて、タイムスタンプの同じ日が同じディレクトリに移動します)
apache-spark - スパークからAvroBigQueryInputFormatによってbqテーブルを読み取ると、予期しない動作が発生します(Javaを使用)
サンプルのスケルトン コードは次のようなものです。ここでは、基本的に BigQuery から RDD を読み取り、my_field_name 値が null であるすべてのデータ ポイントを選択しています。
ただし、出力RDDは完全に予想外のようです。特に my_field_name の値は完全にランダムに見えます。少しデバッグした後、フィルタリングは期待どおりに行われているようですが、問題は抽出した値にありますGenericData.Record
(基本的にrecord.get(my_field_name)
) 完全にランダムに見えます。
したがって、AvroBigQueryInputFormat から GsonBigQueryInputFormat に切り替えて、代わりに json で bq を読み取った後、このコードは正しく機能しているようです。
ただし、理想的には、代わりに Avro を使用したいのですが (json を処理するよりもはるかに高速である必要があります)、コードでの現在の動作は完全に邪魔です。AvroBigQueryInputFormat を間違って使用しているだけですか?
apache-spark - 実行中の Spark Streaming アプリケーションでのスキーマ変更の処理
Spark 1.6 で DataFrames API を使用して Spark Streaming アプリケーションを構築しようとしています。うさぎの穴に深く入り込む前に、誰かが DataFrames が異なるスキーマを持つデータをどのように処理するかを理解するのを手伝ってくれることを望んでいました。
アイデアは、メッセージが Avro スキーマを使用して Kafka に流れ込むというものです。ストリーミング アプリケーションを再起動しなくても、下位互換性のある方法でスキーマを進化させることができるはずです (アプリケーション ロジックは引き続き機能します)。
KafkaUtils を使用してダイレクト ストリームを作成し、AvroKafkaDecoder (Confluent から) を使用して、スキーマ レジストリとメッセージに埋め込まれたスキーマ ID を使用して、新しいバージョンのメッセージを逆シリアル化するのは簡単なようです。それは、DStream を持っている限り私を取得します。
問題 #1: その DStream 内に、異なるバージョンのスキーマを持つオブジェクトが存在します。したがって、それぞれを Row オブジェクトに変換するときは、データを適切に移行するために最新のリーダー スキーマを渡す必要があり、最新のスキーマを sqlContext.createDataFrame(rowRdd, schema) 呼び出しに渡す必要があります。DStream 内のオブジェクトは GenericData.Record 型であり、私が知る限り、どれが最新バージョンかを簡単に判断する方法はありません。考えられる解決策は 2 つあります。1 つは、スキーマ レジストリを呼び出して、すべてのマイクロバッチでスキーマの最新バージョンを取得することです。もう 1 つは、スキーマ ID をアタッチするようにデコーダーを変更することです。次に、rdd を反復処理して最大の ID を見つけ、ローカル キャッシュからスキーマを取得します。
誰かがすでにこれを再利用可能な方法でうまく解決していることを願っていました。
問題/質問 #2: Spark は、パーティションごとに Kafka からプルする異なるエグゼキュータを持つ予定です。あるエグゼキュータが他のエグゼキュータとは異なる「最新」のスキーマを受け取ると、アプリケーションはどうなりますか。あるエグゼキューターによって作成された DataFrame は、同じ時間枠に対して別のエグゼキューターとは異なるスキーマを持ちます。これが本当の問題かどうかは実際にはわかりません。データの流れと、どのような操作で問題が発生するのかを視覚化するのに苦労しています。それが問題である場合、executor 間でデータを共有する必要があることを意味し、それは複雑で非効率的に聞こえます。
これについて心配する必要がありますか?その場合、スキーマの違いを解決するにはどうすればよいですか?
ありがとう、 -- ベン
spark-streaming - バイト[]をscalaの文字列に変換できません
** kafka からデータをストリーミングして、データ フレームに変換しようとしています。このリンクをたどった
しかし、プロデューサー アプリケーションとコンシューマー アプリケーションの両方を実行している場合、これがコンソールの出力です。**
(0,[B@370ed56a) (1,[B@2edd3e63) (2,[B@3ba2944d) (3,[B@2eb669d1) (4,[B@49dd304c) (5,[B@4f6af565) (6 ,[B@7714e29e)
これは文字通り kafka プロデューサーの出力であり、トピックはメッセージをプッシュする前は空です。
プロデューサーのコード スニペットは次のとおりです。
そしてその出力は次のとおりです。
キー=0、値=[B@680387a キー=1、値=[B@32bfb588 キー=2、値=[B@2ac2e1b1 キー=3、値=[B@606f4165 キー=4、値=[B@282e7f59
これは、scala で記述された私のコンシューマー コード スニペットです。
createStream() で StringDecoder と DefaultDecoder の両方を試してみました。プロデューサーとコンシューマーは互いに準拠していると確信しています。誰からの助けはありますか?
hadoop - 寄木細工のファイルを Avro ファイルに変換する方法は?
Hadoop とビッグデータ テクノロジは初めてです。parquet ファイルを avro ファイルに変換し、そのデータを読み取るのが好きです。いくつかのフォーラムを検索したところ、AvroParquetReader の使用が提案されました。
しかし、AvroParquetReader を含める方法がわかりません。まったくインポートできません。
spark-shell を使用してこのファイルを読み取ることができ、それを何らかの JSON に変換し、その JSON を avro に変換できます。しかし、私はより簡単な解決策を探しています。