問題タブ [spark-structured-streaming]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - 構造化ストリーミングでトランスフォームが副作用 (println) を 1 回だけ実行するのはなぜですか?
select
ステートメントがバッチごとに印刷されるのはなぜhello world
ですか?
scala - Kafka Direct Stream で Spark 構造化ストリーミングを使用するには?
Structured Streaming with Sparkに出会いました。これには、S3 バケットから継続的に消費し、処理された結果を MySQL DB に書き込む例があります。
これをSpark Kafka Streamingでどのように使用できますか?
を使用せずにこれら 2 つの例を組み合わせる方法はありstream.foreachRDD(rdd => {})
ますか?
apache-spark - ストリーミング データセットを Spark のバッチ データセットに追加する
データベースから Spark に履歴データをロードし、Spark に新しいストリーミング データを追加し続けたいという Spark のユース ケースがあり、その後、最新のデータセット全体で分析を行うことができます。
私の知る限り、Spark SQL も Spark Streaming も、履歴データをストリーミング データと組み合わせることができません。次に、この問題のために構築されていると思われるSpark 2.0のStructured Streamingを見つけました。しかし、いくつかの実験の後、私はまだそれを理解することができません. ここに私のコードがあります:
「org.apache.spark.sql.AnalysisException: Union between streaming and batch DataFrames/Datasets is not supported;」というエラーが表示されました。2 つのデータセットを union() すると。
誰でも私を助けてもらえますか?私は間違った方向に進んでいますか?
scala - Spark 構造化ストリーミング MemoryStream レポート カスタム シンクの使用時にデータが選択されていません
スパーク構造ストリーミングを使用する簡単なテスト ケースを作成しようとしています。コードはgithubのholdenkに触発されています。
これがCustomSinkコードです
MemoryStream を使用してテストケースで実行しようとしています
行がない場合はエラーを報告しますinput.addData("init")
init
ラインを追加するとシンクに届かないinput.addData("init")
行のコメントを外すと、エラーを報告せずにテスト ケースを正常に実行できますinput.addData("init")
。
しかし、値init
はシンクに届きません。値のみhi hi
が表示されます。
なぜ、どうすれば解決できますか?
postgresql - sSpark 構造化ストリーミング PostgreSQL updatestatebykey
INPUT PostgreSQL テーブルの変更によってトリガーされる Spark 構造化ストリーミング計算によってOUTPUT TABLEの状態を更新する方法は?
実際のシナリオの USERS テーブルは によって更新されましたuser_id = 0002
。そのユーザーのみに対して Spark 計算をトリガーし、結果を別のテーブルに書き込み/更新するにはどうすればよいですか?
apache-spark - Spark - Firehose を使用してパーティション分割されたフォルダーから JSON を読み取る
Kinesis firehose は、ファイル (この場合は時系列の JSON) の永続性を、YYYY/MM/DD/HH (24 の番号付けで時間まで) で分割されたフォルダー階層に管理します...素晴らしい.
Spark 2.0 を使用して、これらのネストされたサブフォルダーを読み取り、すべてのリーフ json ファイルから静的データフレームを作成するにはどうすればよいですか? データフレームリーダーに「オプション」はありますか?
私の次の目標は、これをストリーミング DF にすることです。Firehose によって s3 に保存された新しいファイルは、Spark 2.0 の新しい構造化ストリーミングを使用して、ストリーミング データフレームの一部になります。これはすべて実験的なものであることは承知しています。誰かが以前に S3 をストリーミング ファイル ソースとして使用したことがあり、データが上記のようにフォルダーに分割されていることを願っています。もちろん、Kinesis ストリームのストレートを好むでしょうが、このコネクタには 2.0 の日付がないため、Firehose->S3 が暫定的なものです。
ND: S3 を DBFS にマウントするデータブリックを使用していますが、もちろん EMR や他の Spark プロバイダーでも簡単に使用できます。例を示す共有可能なノートブックがある場合は、ノートブックも参照してください。
乾杯!
xml - Spark ストリーミング xml ファイル
S3 フォルダーにストリーミングされた xml ファイルを処理する必要があります。現在、以下のように実装しています。
まず、Spark の fileStream を使用してファイルを読み取ります
val data = ssc.fileStream[LongWritable, Text, TextInputFormat]("s3://myfolder/",(t: org.apache.hadoop.fs.Path) => true, newFilesOnly = true, hadoopConf).map(_._2.toString())
RDDごとに、ファイルが読み取られたかどうかを確認します
文字列を新しい HDFS ディレクトリに書き込みます
上記の HDFS ディレクトリから読み取る Dataframe を作成します。
Dataframe で何らかの処理を行い、JSON として保存します
どういうわけか、上記のアプローチは非常に非効率的であり、率直に言って非常に男子生徒的であると感じています. より良い解決策はありますか?どんな助けでも大歓迎です。
フォローアップの質問: データフレーム内のフィールド (列ではない) を操作するにはどうすればよいですか? 非常に複雑なネストされた xml があり、上記の方法を使用すると、9 列と 50 個の奇妙な内部構造体配列を持つデータフレームが得られます。特定のフィールド名を削除する必要があることを除けば、これで問題ありません。同じ構造を再度構築する必要があるため、データフレームを分解せずにそれを達成する方法はありますか?