問題タブ [spark-structured-streaming]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
1 に答える
1302 参照

apache-spark - Spark ストリーミングで Kafka を使用する (Spark 2.0)

0 投票する
1 に答える
322 参照

scala - 構造化ストリーミングでトランスフォームが副作用 (println) を 1 回だけ実行するのはなぜですか?

selectステートメントがバッチごとに印刷されるのはなぜhello worldですか?

0 投票する
2 に答える
4408 参照

scala - Kafka Direct Stream で Spark 構造化ストリーミングを使用するには?

Structured Streaming with Sparkに出会いました。これには、S3 バケットから継続的に消費し、処理された結果を MySQL DB に書き込む例があります。

これをSpark Kafka Streamingでどのように使用できますか?

を使用せずにこれら 2 つの例を組み合わせる方法はありstream.foreachRDD(rdd => {})ますか?

0 投票する
1 に答える
1563 参照

apache-spark - ストリーミング データセットを Spark のバッチ データセットに追加する

データベースから Spark に履歴データをロードし、Spark に新しいストリーミング データを追加し続けたいという Spark のユース ケースがあり、その後、最新のデータセット全体で分析を行うことができます。

私の知る限り、Spark SQL も Spark Streaming も、履歴データをストリーミング データと組み合わせることができません。次に、この問題のために構築されていると思われるSpark 2.0のStructured Streamingを見つけました。しかし、いくつかの実験の後、私はまだそれを理解することができません. ここに私のコードがあります:

「org.apache.spark.sql.AnalysisException: Union between streaming and batch DataFrames/Datasets is not supported;」というエラーが表示されました。2 つのデータセットを union() すると。

誰でも私を助けてもらえますか?私は間違った方向に進んでいますか?

0 投票する
1 に答える
923 参照

scala - Spark 構造化ストリーミング MemoryStream レポート カスタム シンクの使用時にデータが選択されていません

スパーク構造ストリーミングを使用する簡単なテスト ケースを作成しようとしています。コードはgithubのholdenkに触発されています。

これがCustomSinkコードです

MemoryStream を使用してテストケースで実行しようとしています

行がない場合はエラーを報告しますinput.addData("init")

initラインを追加するとシンクに届かないinput.addData("init")

行のコメントを外すと、エラーを報告せずにテスト ケースを正常に実行できますinput.addData("init")

しかし、値initはシンクに届きません。値のみhi hiが表示されます。

なぜ、どうすれば解決できますか?

0 投票する
1 に答える
497 参照

postgresql - sSpark 構造化ストリーミング PostgreSQL updatestatebykey

INPUT PostgreSQL テーブルの変更によってトリガーされる Spark 構造化ストリーミング計算によってOUTPUT TABLEの状態を更新する方法は?

実際のシナリオの USERS テーブルは によって更新されましたuser_id = 0002。そのユーザーのみに対して Spark 計算をトリガーし、結果を別のテーブルに書き込み/更新するにはどうすればよいですか?

0 投票する
2 に答える
5198 参照

apache-spark - Spark - Firehose を使用してパーティション分割されたフォルダーから JSON を読み取る

Kinesis firehose は、ファイル (この場合は時系列の JSON) の永続性を、YYYY/MM/DD/HH (24 の番号付けで時間まで) で分割されたフォルダー階層に管理します...素晴らしい.

Spark 2.0 を使用して、これらのネストされたサブフォルダーを読み取り、すべてのリーフ json ファイルから静的データフレームを作成するにはどうすればよいですか? データフレームリーダーに「オプション」はありますか?

私の次の目標は、これをストリーミング DF にすることです。Firehose によって s3 に保存された新しいファイルは、Spark 2.0 の新しい構造化ストリーミングを使用して、ストリーミング データフレームの一部になります。これはすべて実験的なものであることは承知しています。誰かが以前に S3 をストリーミング ファイル ソースとして使用したことがあり、データが上記のようにフォルダーに分割されていることを願っています。もちろん、Kinesis ストリームのストレートを好むでしょうが、このコネクタには 2.0 の日付がないため、Firehose->S3 が暫定的なものです。

ND: S3 を DBFS にマウントするデータブリックを使用していますが、もちろん EMR や他の Spark プロバイダーでも簡単に使用できます。例を示す共有可能なノートブックがある場合は、ノートブックも参照してください。

乾杯!

0 投票する
1 に答える
2078 参照

xml - Spark ストリーミング xml ファイル

S3 フォルダーにストリーミングされた xml ファイルを処理する必要があります。現在、以下のように実装しています。

まず、Spark の fileStream を使用してファイルを読み取ります

val data = ssc.fileStream[LongWritable, Text, TextInputFormat]("s3://myfolder/",(t: org.apache.hadoop.fs.Path) => true, newFilesOnly = true, hadoopConf).map(_._2.toString())

RDDごとに、ファイルが読み取られたかどうかを確認します

文字列を新しい HDFS ディレクトリに書き込みます

上記の HDFS ディレクトリから読み取る Dataframe を作成します。

Dataframe で何らかの処理を行い、JSON として保存します

どういうわけか、上記のアプローチは非常に非効率的であり、率直に言って非常に男子生徒的であると感じています. より良い解決策はありますか?どんな助けでも大歓迎です。

フォローアップの質問: データフレーム内のフィールド (列ではない) を操作するにはどうすればよいですか? 非常に複雑なネストされた xml があり、上記の方法を使用すると、9 列と 50 個の奇妙な内部構造体配列を持つデータフレームが得られます。特定のフィールド名を削除する必要があることを除けば、これで問題ありません。同じ構造を再度構築する必要があるため、データフレームを分解せずにそれを達成する方法はありますか?