問題タブ [spark-streaming]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
scala - フィルタリング ロジックを使用した HBase への Spark Streaming
Spark Streaming と hbase の接続方法を理解しようとしてきましたが、うまくいきませんでした。私がやろうとしているのは、スパーク ストリームを与え、そのストリームを処理し、結果を hbase テーブルに格納することです。これまでのところ、これは私が持っているものです:
現在、上記のコードを spark-shell で実行しています。何が間違っているのかわかりません。
シェルで次のエラーが表示されます。
念のため、hbase テーブルも再確認しましたが、新しいものは何も書き込まれていません。
別の端末で nc -lk 9999 を実行して、テストのためにデータを spark-shell にフィードしています。
apache-spark - Spark ストリーミング アプリケーションの開発
だから私が取り組もうとしている問題は次のとおりです。
- 特定の頻度でメッセージを送信するデータ ソースが必要です
- 各メッセージを個別に処理する必要がある N 個のニューラル ネットワークがあります。
- すべてのニューラル ネットワークからの出力が集約され、各メッセージの N 個の出力がすべて収集された場合にのみ、メッセージが完全に処理されたと宣言されます。
- 最後に、メッセージが完全に処理されるまでにかかった時間を測定する必要があります (メッセージが送信されてから、そのメッセージからの N 個のニューラル ネットワーク出力がすべて収集されるまでの時間)。
スパーク ストリーミングを使用して、このようなタスクにどのようにアプローチするのか興味があります。
私の現在の実装では、3 種類のコンポーネントを使用しています。1 つはカスタム レシーバー、もう 1 つは Function を実装する 2 つのクラスで、1 つはニューラル ネット用、もう 1 つはエンド アグリゲーター用です。
大まかに言えば、私のアプリケーションは次のように構築されています。
ただし、これに関する主な問題は、4 ノード クラスターにサブミットされた場合よりもローカル モードの方が高速に実行されることです。
私の実装はそもそも間違っていますか、それともここで何か他のことが起こっていますか?
ここにも完全な投稿がありますhttp://apache-spark-user-list.1001560.n3.nabble.com/Developing-a-spark-streaming-application-td12893.html 3つのそれぞれの実装に関する詳細が記載されています前述のコンポーネント。
intellij-idea - spark-1.0 での kafka コンシューマーの実装
spark 1.0 の spark ストリーミングで kafka consumer を実装する必要があります。私はカフカプロデューサーを書きました。カフカからメッセージをプルするためのスパークレシーバーの書き方について、誰か助けてもらえますか? また、Intellij IDEA で kafka spark ストリーミング プロジェクトを実行する方法を教えてください。
scala - Spark Streaming でカスタム レシーバーがワーカーを失速させる
カスタム レシーバーを使用して Spark ストリーミング アプリケーションを作成しようとしています。事前定義された間隔でランダムな値を提供することにより、リアルタイムの入力データをシミュレートすることになっています。(単純化された) レシーバーは次のようになります。Spark Streaming アプリのコードは次のとおりです。
このコードを実行すると、レシーバーが機能していることがわかります (アイテムの保存、単一のログ エントリの受信)。ただし、saveAsTextFiles
値を出力することはありません。
マスターを 2 つのスレッドで実行するように変更することで問題を回避できます ( local[2]
) が、レシーバーの別のインスタンスを登録すると (これを行う予定です)、再び表示されます。より具体的には、出力を取得するには、登録されているカスタム レシーバーの数よりも多くのスレッドが少なくとも 1 つ必要です。
ワーカースレッドがレシーバーによって停止されているように思えます。
誰でもこの効果を説明できますか?おそらく私のコードを修正する方法はありますか?
scala - 2 つのデータセットを結合してフィルターを適用すると、「エラー: コンストラクターを期待される型にインスタンス化できません」が発生するのはなぜですか?
私は 2 つのデータセットに参加しています。1 つ目はストリームからのもので、2 つ目は HDFS のものです。
スパークでスカラを使用しています。2 つのデータセットを結合した後、結合したデータセットにフィルターを適用する必要がありますが、ここで問題に直面しています。解決にご協力ください。
以下のコードを使用していますが、
フィルターを使用すると、次のエラーが発生します
apache-spark - DStreams で SparkSQL を実行すると、org.apache.spark.rdd.ShuffledRDDPartition で ClassCastException が発生するのはなぜですか?
DStream の各 RDD で SparkSQL を実行すると、ClassCastException が発生します。
amazon-ec2 - Spark Streaming アプリは、既にストリーミングされたファイルをストリーミングします
1 つの名前ノードと 2 つのデータ ノードを持つ YARN ec2 クラスターにデプロイされた Spark ストリーミング アプリがあります。それぞれ 1 つのコアと 588 MB の RAM を備えた 11 のエグゼキューターでアプリを提出します。アプリは、常に書き込まれている S3 のディレクトリからストリーミングします。これは、それを実現するコード行です。
textFileStream の代わりに fileStream を使用する目的は、プロセスの開始時に Spark が既存のファイルを処理する方法をカスタマイズすることです。プロセスの起動後に追加された新しいファイルのみを処理し、既存のファイルを省略したいと考えています。10 秒のバッチ期間を構成しました。
s3 に少数のファイル (4 つまたは 5 つとしましょう) を追加している間、プロセスはうまくいきます。ストリーミング UI で、処理されるファイルごとに 1 つずつ、エグゼキューターでステージが正常に実行される様子を確認できます。しかし、より多くのファイルを追加しようとすると、奇妙な動作に直面することがあります。アプリケーションは、既にストリーミングされたファイルのストリーミングを開始します。
たとえば、s3 に 20 個のファイルを追加します。ファイルは 3 つのバッチで処理されます。最初のバッチは 7 個のファイルを処理し、2 回目は 8 個、3 回目は 5 個のファイルを処理します。この時点で S3 に追加されるファイルはありませんが、Spark は同じファイルを使用してこれらのフェーズを際限なく繰り返します。 これを引き起こしている可能性のある考えはありますか?
この問題の Jira チケットを投稿しました: https://issues.apache.org/jira/browse/SPARK-3553
scala - Apache Spark で人気のある Twitter のタグ
私はApache Sparkが初めてです。https://github.com/prabeesh/SparkTwitterAnalysis/tree/0.2.0の例を実行しようとしましたが、コンソールで次のエラーが表示されます:
を使用してサーバーを起動し、nc -lk 9999
sbt/sbt パッケージでコードをコンパイルし、/
を使用してコードを実行しました。sbt
sbt 'run spark://localhost:9999 <keys as specifies> hashtag'
このエラーの理由と解決方法
前もって感謝します。