問題タブ [dstream]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-spark - Spark Streaming で複数のバッチ間隔でデータ ストリームを運ぶ方法
Apache Spark Streaming 1.6.1 を使用して、2 つのキー/値データ ストリームを結合し、出力を HDFS に書き込む Java アプリケーションを作成しています。2 つのデータ ストリームには K/V 文字列が含まれており、textFileStream() を使用して HDFS から Spark に定期的に取り込まれます。
2 つのデータ ストリームは同期されていません。つまり、時刻 t0 でストリーム 1 にあった一部のキーが、時刻 t1 でストリーム 2 に表示されるか、またはその逆になる可能性があります。したがって、私の目標は、2 つのストリームを結合し、「残りの」キーを計算することです。これは、次のバッチ間隔での結合操作で考慮する必要があります。
これをより明確にするために、次のアルゴリズムを見てください。
このアルゴリズムを Spark Streaming で実装しようとしましたが、うまくいきませんでした。最初に、この方法で残りのキーに対して 2 つの空のストリームを作成します (これは 1 つのストリームにすぎませんが、2 つ目のストリームを生成するコードは似ています)。
後で、この空のストリームはストリーム 1 と統合 (つまり、union()) され、最後に結合後、ストリーム 1 の残りのキーを追加し、window() を呼び出します。stream2 でも同じことが起こります。
問題は、left_keys_s1 と left_keys_s2 を生成する操作がアクションのない変換であることです。これは、Spark が RDD フロー グラフを作成しないため、決して実行されないことを意味します。私が今得ているのは、キーが同じ時間間隔で stream1 と stream2 にあるレコードのみを出力する結合です。
これをSparkで正しく実装するための提案はありますか?
ありがとう、マルコ
apache-spark - Pyspark を使用して 2 つの Dstream を結合する方法 (通常の RDD の .zip に似ています)
pyspark で以下のように (R の cbind のように) 2 つの RDD を組み合わせることができることを私は知っています:
pyspark の 2 つの Dstream に対して同じことを実行したいと考えています。それは可能ですか、それとも代替手段ですか?
実際、MLlib ランダムフォレスト モデルを使用して、スパーク ストリーミングを使用して予測しています。最後に、機能 Dstream と予測 Dstream を組み合わせて、さらに下流の処理を行いたいと考えています。
前もって感謝します。
-オベイド
apache-spark - Spark Streaming: 全期間の平均
私は、温度値を受け取り、すべての時間の平均温度を計算する Spark Streaming アプリケーションを作成しました。そのために、JavaPairDStream.updateStateByKey
トランザクションを使用してデバイスごとに計算しました(ペアのキーで区切られています)。状態の追跡には、すべての温度値を double として保持し、メソッドStatCounter
を呼び出して各ストリームの平均を再計算するクラスを使用します。StatCounter.mean
ここに私のプログラム:
コード全体を編集: StatCounter を使用するようになりました
これはうまくいくようです。しかし今、質問に:
ここですべての時間の平均を計算する方法も示すオンラインの例を見つけました: https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter1/total.html
AtmoicLongs
「ステートフルな値」を格納するために etc を使用し、forEachRDD
メソッドでそれらを更新します。
私の質問は次のとおりです。Spark ストリーミングですべての時間をステートフルに計算するためのより良いソリューションは何ですか? いずれかの方法を使用することの利点/欠点はありますか? ありがとうございました!
hbase - Spark Streaming を介した HBase からのデータの読み取り
したがって、私のプロジェクト フローは Kafka -> Spark Streaming -> HBase です。
ここで、前のジョブで作成されたテーブルを調べ、集計を行い、別のテーブルに別の列形式で格納する HBase からデータを再度読み取りたいと考えています。
Kafka -> Spark Streaming (2ms) -> HBase -> Spark Streaming (10ms) -> HBase
Spark Streaming を使用して HBase からデータを読み取る方法がわかりません。SparkOnHbase( http://blog.cloudera.com/blog/2014/12/new-in-cloudera-labs-sparkonhbase/ ) ライブラリである Cloudera Lab Project を見つけましたが、取得方法がわかりません。 HBase からのストリーム処理用の inputDStream。これを行うのに役立つポインタまたはライブラリ リンクがある場合は、それを提供してください。
python - pyspark を使用して json dstream からデータフレームを作成できません
dstream で json からデータフレームを作成しようとしていますが、以下のコードはデータフレームを正しく取得していないようです -
エラーはありませんが、スクリプトを実行すると、ストリーミング コンテキストから json が正常に読み取られますが、値が要約またはデータフレーム スキーマに出力されません。
私が読もうとしているjsonの例 -
{"reviewerID": "A2IBPI20UZIR0U", "asin": "1384719342", "reviewerName": "cassandra tu \"うん、まあ、それはあなたのようだ...", "helpful": [0, 0], "reviewText": "ここで書くことはあまりありませんが、本来あるべきことを正確に行います。ポップ サウンドを除外します。今、私の録音ははるかに鮮明です。Amazon で最も低価格のポップ フィルターの 1 つです。 reviewTime": "2014 年 2 月 28 日"}
私はスパーク ストリーミングの初心者で、ドキュメントを読んでペット プロジェクトに取り組み始めました。ヘルプとガイダンスは大歓迎です。
scala - Spark JSON DStream Print() / saveAsTextFiles が機能しない
Spark-shell ローカル モードで実行するコード:
これは機能しません-出力が表示されません-スパークシェルでKafkaからストリームを読み取ることで同じことを試みました
以下も試してみました -- 動作しません:
また、データフレームに変換するスキーマを解析しようとしましたが、何も機能していないようです
通常の json 解析が機能しており、通常の //RDD/DF の内容を //Spark-shell のコンソールに出力できます
誰か助けてくれませんか?
apache-spark - Scalaで2つのDStream間で圧縮を実行するには?
RDD での通常の圧縮のように圧縮したい 2 つのウィンドウ化された dstream があります。
注 : 主な目的は、これを計算するためのより良い方法がある場合に備えて、ウィンドウ dstream の平均と stdv を計算することです。
scala - DStream のソートと topN の取得
Spark Scala にいくつかの DStream があり、それを並べ替えて上位 N を取りたいと考えています。問題は、実行しようとするたびに次NotSerializableException
の例外メッセージが表示されることです。
これは、DStream オブジェクトがクロージャ内から参照されているためです。
問題は、それを解決する方法がわからないことです:
これが私の試みです:
DStream をソートして上位 N を取得する他の方法は気にしません。