問題タブ [dstream]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
1 に答える
814 参照

java - 複数のキー Java による Spark ストリーミングの削減

私は Spark Streaming の初心者で、単一の (K,V) ペアの例をたくさん見つけたので、この問題を処理する方法を見つけようとして行き詰まりました。Java で Spark の変換を使用する最善のアプローチを見つけるために、いくつかの助けをいただければ幸いです。

シナリオを簡単に説明すると、

目標は、時間枠内の一連の要素のエラー率を取得することです。

次の入力を考えると、

要素ごとに集計し、次に status で集計します(Element, (Number of Success, Number of Error))。この場合、変換の結果は次のようになります。

そして最後に (i1,i2) -> i1/(i1+i2) のような関数を使った比率計算です。

私が理解している限り、結果はreduceByKeyAndWindow()関数によって与えられます。たとえば、

アプリケーションの逆の流れに従って、私の質問は、

複数の値またはキー (おそらく のようなものJavaPairDStream<String, Tuple2<Integer,Integer>>) を持つ JavaPairDStream でペアを定義する方法は?

reduceFunc複数のキーを持つ特定のペアに最適なアプローチはどれですか?

最初の DStream をマップする最良の方法はどれですか (おそらく のようなものJavaDStream<Tuple2<String, String>> line = input.map(func))?

よろしくお願いいたします。

0 投票する
1 に答える
302 参照

java - Spark DStream の foreachDD 関数での RDD での同時変換

次のコードでは、Spark Web UI の Stages セクションにあるように、関数 fn1 と fn2 が順番に inRDD に適用されているようです。

この方法でストリーミング ジョブを実行すると、方法が異なります。以下の関数は、入力 Dstream で並行して実行されますか?

0 投票する
1 に答える
347 参照

scala - Apache Spark で DStream を使用して特徴抽出を使用する方法

DStream を介して Kafka から到着するデータがあります。いくつかのキーワードを取得するために特徴抽出を実行したいと考えています。

すべてのデータが到着するのを待ちたくない (潜在的に終了しない連続ストリームであることを意図しているため) ため、チャンクで抽出を実行したいと考えています。精度が少し低下しても問題ありません。

これまでのところ、次のようなものをまとめました。

しかし、私は受け取ったjava.lang.IllegalStateException: Haven't seen any document yet.- 私は物事を一緒に廃棄しようとしているだけなので驚かない. データの到着を待っていないので、データで使用しようとすると、生成されたモデルが空になる可能性があることを理解しています.

この問題に対する正しいアプローチは何でしょうか?

0 投票する
1 に答える
1851 参照

java - JavaPairDStream をテキスト ファイルにストリーミングする Spark

私は Spark ストリーミングの初心者で、出力の保存に行き詰まっています。

私の質問は、JavaPairDStream の出力をテキスト ファイルに保存するにはどうすればよいですか。テキスト ファイルは、DStream 内の要素のみでファイルごとに更新されますか?

たとえば、wordCount の例では、

を使用して次の出力を取得しますwordCounts.print()

最後の行をテキスト ファイルに書き込みたいと思います。テキスト ファイルは、wordCounts.

私は次のアプローチを試しました、

これにより、バッチごとにいくつかの意味のないファイルを含む一連のディレクトリが生成されます。

別のアプローチは、

助けていただければ幸いです。

ありがとうございました

0 投票する
0 に答える
72 参照

apache-spark - スパーク ストリーム カフカ パラレル レシーバー受信データの不均衡

カフカからストリームデータを並行して受信したいだけです。これが私のコードです:

yarn でコードを実行すると、numReceiver は 5 で、コードは機能します。しかし問題は、データを受信する受信機が 2 つだけであるということです。num-executor は 5、executor-core は 2

結果:

0 投票する
1 に答える
3566 参照

apache-spark - pyspark で変換された DStream は、pprint が呼び出されたときにエラーを返します

PySpark を介して Spark Streaming を調査しており、transform関数を使用しようとするとエラーが発生しtakeます。

ビアと結果sortByに対して正常に使用できます。DStreamtransformpprint

しかしtake、同じパターンに従って使用して試してみるpprintと:

仕事は失敗します

こちらのノートブックで完全なコードと出力を確認できます。

私は何を間違っていますか?