問題タブ [spark-structured-streaming]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
1 に答える
363 参照

apache-spark - すべてのワーカーを使用しない構造化ストリーミング ジョブ

ソースとして Apache Kafka データ ストリームに接続する Spark 2.0.2 構造化ストリーミング ジョブがあります。このジョブは、Kafka から Twitter データ (JSON) を取り込み、CoreNLP を使用して、センチメント、品詞のタグ付けなどでデータに注釈を付けますlocal[*]。マスターとうまく連携します。ただし、スタンドアロンの Spark クラスターをセットアップすると、データの処理に使用されるワーカーは 1 つだけになります。同じ能力を持つ 2 人のワーカーがいます。

欠落しているジョブを送信するときに設定する必要があるものはありますか? コマンド--num-executorsで を設定しようとしましたが、うまくいきませんでした。spark-submit

正しい方向へのポインタを前もってありがとう。

0 投票する
8 に答える
12372 参照

apache-spark - Spark 構造化ストリーミングでの複数の集計

Spark Structured Streaming で複数の集計を行いたいと考えています。

このようなもの:

  • 入力ファイルのストリームを (フォルダーから) 読み取る
  • 集計 1 を実行します (一部の変換あり)
  • 集計 2 (およびその他の変換) を実行する

これを構造化ストリーミングで実行すると、「ストリーミング データフレーム/データセットでは複数のストリーミング アグリゲーションはサポートされていません」というエラーが表示されます。

構造化ストリーミングでこのような複数の集計を行う方法はありますか?

0 投票する
1 に答える
523 参照

apache-spark - Spark Structured Streaming で削除 (または更新) された行を処理するにはどうすればよいですか?

countでアクティブに働いている人の数を知りたい"Coca-Cola"場合は、次のクエリを使用します。

これは、バッチ モードで正常に動作します。

ただし、時間の経過とともに変化するcompanyフィールドを想定するpersonか、人々が完全に削除されると仮定するDatasetと、構造化ストリーミングでこれを機能させるにはどうすればよいので、count正しいままですか?

AFAIK 構造化ストリーミングは、データ ソースが追加専用であると想定しています。それは、削除と更新を別々のデータ ソースとして追跡し、それらを自分でマージする必要があるということですか?

0 投票する
1 に答える
788 参照

apache-spark - Spark 構造化ストリーミング

1 つのジョブから複数のストリーミング SQL クエリを Kafka ストリームで実行できるようにする方法。構造化されたストリーミングは、確実に前進できる方法ですか。たとえば、1 つのジョブでストリームに対して 10 個のクエリを実行しています。9 つのクエリのみを実行したいとします。実行ごとにストアから実行するクエリを動的に変更する方法はありますか。ストリーミング クエリ (別名連続クエリ) を実行するたびに、クエリを実行してストアから動的に選択する必要があります。

0 投票する
1 に答える
1219 参照

scala - kafka からの Spark 構造化ストリーミング - チェックポイントからの再開後に再度処理された最後のメッセージ

私は、Spark 2.0.2 の真新しい (そして「アルファ」とタグ付けされた) 構造化ストリーミングを使用して、kafka トピックからメッセージを読み取り、そこからいくつかの cassandra テーブルを更新しています。

また、sparkSession でチェックポイントの場所 ("spark.sql.streaming.checkpointLocation") を構成しました。これにより、ストリーミング アプリがダウンしている間に届いたメッセージを、再開するとすぐに受け取ることができます。

ただし、このチェックポイントの場所を構成して以来、再開時に、以前のバッチの最後のメッセージが失敗することなく正しく処理されていても、一貫して処理されることに気付きました。

ここで何が間違っているのか分かりますか?これは非常に一般的な使用例のようです。

より詳しい情報:

関連するログを参照してください (トピック 5876 は、前のバッチで正常に処理された最後のトピックです)。

また、ストリームを強制終了するときは、データの損失を避けるために適切に停止するようにします。

0 投票する
1 に答える
6386 参照

apache-spark - TypeError: 'Builder' オブジェクトは呼び出し可能ではありません Spark 構造化ストリーミング

Python spark 構造化ストリーミング
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.htmlのプログラミング ガイド [リンク] に示されている例の実行について

以下のエラーが表示されます:
TypeError: 'Builder' object is not callable

エラー :

0 投票する
1 に答える
640 参照

apache-spark - Spark DataFrames がスキーマを変更しないのはなぜですか? また、それに対して何をすべきですか?

Spark 2.1 の構造化ストリーミングを使用して、コンテンツがバイナリ avro エンコードされた Kafka トピックから読み取ります。

したがって、次のように設定した後DataFrame:

このDataFrame( messages.printSchema()) のスキーマを出力すると、次のようになります。

この質問は avro-decoding の問題と直交しているはずですがvalue、メッセージの内容をDataFrame関数Dataset[BusinessObject]によって何らかの形で , に変換したいと仮定しましょうArray[Byte] => BusinessObject。完全性の例として、関数は次のようになります (avro4s を使用):

もちろん、miguno がこの関連する質問で言っているようにDataFrame.map()、で変換を適用することはできませんBusinessObject

それは次のように定義できます。

次に、マップを実行します。

しかし、新しいスキーマをクエリすると、次のようになります。

BusinessObjectデータセットはケースクラスの製品プロパティを使用して正しい値を取得する必要があるため、それは意味がないと思います。

.schema(StructType)リーダーで使用する Spark SQL の例をいくつか見てきましたが、 を使用しているという理由だけでなくreadStream、そのようなフィールドで操作できるようになる前に実際に列を変換する必要があるため、それを行うことはできません。

私は、transformedMessagesデータセット スキーマがStructFieldケース クラスのフィールドを持つものであることを Spark SQL エンジンに伝えたいと考えています。