問題タブ [spark-structured-streaming]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-spark - すべてのワーカーを使用しない構造化ストリーミング ジョブ
ソースとして Apache Kafka データ ストリームに接続する Spark 2.0.2 構造化ストリーミング ジョブがあります。このジョブは、Kafka から Twitter データ (JSON) を取り込み、CoreNLP を使用して、センチメント、品詞のタグ付けなどでデータに注釈を付けますlocal[*]
。マスターとうまく連携します。ただし、スタンドアロンの Spark クラスターをセットアップすると、データの処理に使用されるワーカーは 1 つだけになります。同じ能力を持つ 2 人のワーカーがいます。
欠落しているジョブを送信するときに設定する必要があるものはありますか? コマンド--num-executors
で を設定しようとしましたが、うまくいきませんでした。spark-submit
正しい方向へのポインタを前もってありがとう。
apache-spark - Spark 構造化ストリーミングでの複数の集計
Spark Structured Streaming で複数の集計を行いたいと考えています。
このようなもの:
- 入力ファイルのストリームを (フォルダーから) 読み取る
- 集計 1 を実行します (一部の変換あり)
- 集計 2 (およびその他の変換) を実行する
これを構造化ストリーミングで実行すると、「ストリーミング データフレーム/データセットでは複数のストリーミング アグリゲーションはサポートされていません」というエラーが表示されます。
構造化ストリーミングでこのような複数の集計を行う方法はありますか?
apache-spark - Spark Structured Streaming で削除 (または更新) された行を処理するにはどうすればよいですか?
count
でアクティブに働いている人の数を知りたい"Coca-Cola"
場合は、次のクエリを使用します。
これは、バッチ モードで正常に動作します。
ただし、時間の経過とともに変化するcompany
フィールドを想定するperson
か、人々が完全に削除されると仮定するDataset
と、構造化ストリーミングでこれを機能させるにはどうすればよいので、count
正しいままですか?
AFAIK 構造化ストリーミングは、データ ソースが追加専用であると想定しています。それは、削除と更新を別々のデータ ソースとして追跡し、それらを自分でマージする必要があるということですか?
apache-spark - Spark 構造化ストリーミング
1 つのジョブから複数のストリーミング SQL クエリを Kafka ストリームで実行できるようにする方法。構造化されたストリーミングは、確実に前進できる方法ですか。たとえば、1 つのジョブでストリームに対して 10 個のクエリを実行しています。9 つのクエリのみを実行したいとします。実行ごとにストアから実行するクエリを動的に変更する方法はありますか。ストリーミング クエリ (別名連続クエリ) を実行するたびに、クエリを実行してストアから動的に選択する必要があります。
scala - kafka からの Spark 構造化ストリーミング - チェックポイントからの再開後に再度処理された最後のメッセージ
私は、Spark 2.0.2 の真新しい (そして「アルファ」とタグ付けされた) 構造化ストリーミングを使用して、kafka トピックからメッセージを読み取り、そこからいくつかの cassandra テーブルを更新しています。
また、sparkSession でチェックポイントの場所 ("spark.sql.streaming.checkpointLocation") を構成しました。これにより、ストリーミング アプリがダウンしている間に届いたメッセージを、再開するとすぐに受け取ることができます。
ただし、このチェックポイントの場所を構成して以来、再開時に、以前のバッチの最後のメッセージが失敗することなく正しく処理されていても、一貫して処理されることに気付きました。
ここで何が間違っているのか分かりますか?これは非常に一般的な使用例のようです。
より詳しい情報:
関連するログを参照してください (トピック 5876 は、前のバッチで正常に処理された最後のトピックです)。
また、ストリームを強制終了するときは、データの損失を避けるために適切に停止するようにします。
apache-spark - TypeError: 'Builder' オブジェクトは呼び出し可能ではありません Spark 構造化ストリーミング
Python spark 構造化ストリーミング
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.htmlのプログラミング ガイド [リンク] に示されている例の実行について
以下のエラーが表示されます:
TypeError: 'Builder' object is not callable
エラー :
apache-spark - Spark DataFrames がスキーマを変更しないのはなぜですか? また、それに対して何をすべきですか?
Spark 2.1 の構造化ストリーミングを使用して、コンテンツがバイナリ avro エンコードされた Kafka トピックから読み取ります。
したがって、次のように設定した後DataFrame
:
このDataFrame
( messages.printSchema()
) のスキーマを出力すると、次のようになります。
この質問は avro-decoding の問題と直交しているはずですがvalue
、メッセージの内容をDataFrame
関数Dataset[BusinessObject]
によって何らかの形で , に変換したいと仮定しましょうArray[Byte] => BusinessObject
。完全性の例として、関数は次のようになります (avro4s を使用):
もちろん、miguno がこの関連する質問で言っているようにDataFrame.map()
、で変換を適用することはできませんBusinessObject
。
それは次のように定義できます。
次に、マップを実行します。
しかし、新しいスキーマをクエリすると、次のようになります。
BusinessObject
データセットはケースクラスの製品プロパティを使用して正しい値を取得する必要があるため、それは意味がないと思います。
.schema(StructType)
リーダーで使用する Spark SQL の例をいくつか見てきましたが、 を使用しているという理由だけでなくreadStream
、そのようなフィールドで操作できるようになる前に実際に列を変換する必要があるため、それを行うことはできません。
私は、transformedMessages
データセット スキーマがStructField
ケース クラスのフィールドを持つものであることを Spark SQL エンジンに伝えたいと考えています。