問題タブ [spark-streaming]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
3 に答える
22937 参照

java - StreamingContext の開始が「IllegalArgumentException: 要件が失敗しました: 出力操作が登録されていないため、実行するものがありません」で失敗するのはなぜですか?

次のように、ソースとして Twitter を使用して Spark Streaming の例を実行しようとしています。

しかし、私は次の例外を受けています

この問題を解決する方法について何か提案はありますか?

0 投票する
1 に答える
1239 参照

hadoop - Spark のステートフル操作 updateStateByKey を使用してリアルタイムを維持する方法

まず架空のユースケース。tuples のストリームがあるとしましょう(user_id, time_stamp, login_ip)。各ユーザーの最終ログイン IP を 5 秒の粒度で維持したいと考えています。

Spark ストリーミングを使用すると、updateStateByKeyメソッドを使用してこのマップを更新できます。問題は、データのストリームが継続的に発生するにつれて、より多くのデータが表示されるため、各時間間隔の RDD がますます大きくなることuser_idsです。しばらくすると、マップが非常に大きくなり、維持に時間がかかり、結果のリアルタイム配信が実現できなくなります。

これは、問題を示すために思いついた単純な例にすぎないことに注意してください。実際の問題はより複雑になる可能性があり、リアルタイムで配信する必要があります。

この問題を解決する方法についてのアイデアはありますか (Spark だけでなく、他のソリューションもすべて適切です)。

0 投票する
1 に答える
1124 参照

scala - SPARK でレシーバーを実装する

SPARK 0.9 の受信機を実装しようとしています。Jnetpcap ライブラリを使用してパケットをキャプチャしたので、それを Scala の spark に渡す必要があります。「def receive()」メソッドでパケットのキャプチャ部分を記述するだけで十分ですか?

編集: Jnetpcap ライブラリを使用してパケットをキャプチャするこのリンクのコードは次のとおりです。

このコードを使用してキャプチャされたパケットのスパーク レシーバーを実装する方法は?

0 投票する
1 に答える
2862 参照

scala - Spark ストリーミング ステートフル ネットワーク ワード カウント

これは、Spark に付属のサンプル コードです。ここにコードをコピーしました。これがリンクです: https://github.com/apache/spark/blob/master/examples/src/main/scala/org/apache/spark/examples/streaming/StatefulNetworkWordCount.scala . ただし、コマンド「bin/run-example org.apache.spark.examples.streaming.StatefulNetworkWordCount localhost 9999」を使用してプログラムを実行しようとすると、次のエラーが発生しました。

****************コード********************

ファイルが Hadoop と互換性のないファイルであるのに、コマンド「ssc.checkpoint(".")」を実行してローカル ファイル システムにチェックポイントを設定しようとしているからでしょうか? (チェックポイントを設定するには、ファイルが Hadoop と互換性がある必要があります) 互換性がある場合、どうすれば修正できますか? ありがとう!

0 投票する
1 に答える
4377 参照

scala - Spark ストリーミング ウィンドウの操作

以下は、30 秒のウィンドウ サイズと 10 秒のスライド サイズで単語数を取得する簡単なコードです。

ただし、次の行からエラーが発生します。

. 特に、から_ + _。エラーは

誰が問題が何であるか教えてもらえますか? ありがとう!

0 投票する
0 に答える
1212 参照

amazon-ec2 - Spark を使用して、常に更新される S3 バケットのコンテンツをストリーミングする

一定時間ごとにファイルを S3 バケットにエクスポートするアプリがあります。このバケットからストリーミングし、30 秒ごとに新しいファイルの行を配信する Spark Streaming アプリを開発する必要があります。

資格情報について理解するのに役立つこの投稿を読みましたが、それでも私のニーズには対応していません。

Q1. これを行う方法について、誰かがコードやヒントを提供できますか? Twitter の例を見たことがありますが、それを自分のシナリオに適用する方法がわかりませんでした。

Q2. Spark Streaming は、次のファイルを取得する前に最後にストリーミングされたファイルをどのように認識しますか? これは、ファイルの LastModified ヘッダーまたはある種のタイムスタンプに基づいていますか?

Q3. クラスターがダウンした場合、停止した場所からストリーミングを開始するにはどうすればよいですか?

前もって感謝します!!

0 投票する
1 に答える
605 参照

scala - ネットワーク トラフィックをキャプチャするための Spark レシーバ

Spark Streaming アプリケーションを書きたかったのです。Java ライブラリを使用してパケットをキャプチャする scala コードを作成しました。ここで、これらのパケットをスパーク ストリーミング レシーバー プログラム (0.9) に渡したいと思いました。新しい Spark レシーバー コードを作成する必要がありますか?それとも、パケットをキャプチャするために既に作成された scala コードでレシーバーを作成することは可能ですか? どうすればそれができますか?誰でも私を助けてください。

0 投票する
1 に答える
4771 参照

sql - Spark Streaming で SparkSQL を実行しようとしています

Spark でストリーミング データに対して SQL クエリを実行しようとしています。これは非常に簡単に見えますが、試してみると、エラーtable not found : tablename >が表示されます。登録したテーブルが見つかりません。

バッチ データで Spark SQL を使用すると問題なく動作するので、streamingcontext.start() の呼び出し方法に関係していると考えています。問題は何ですか?コードは次のとおりです。

どんな提案でも大歓迎です。ありがとう。

0 投票する
3 に答える
1573 参照

scala - Spark に来る Flume の Avro イベントをデシリアライズする方法は?

Flume Avro シンクと、シンクを読み取る SparkStreaming プログラムがあります。CDH 5.1、Flume 1.5.0、Spark 1.0、Spark のプログラム言語として Scala を使用

Spark の例を作成し、Flume Avro イベントをカウントすることができました。

ただし、Flume Avro イベントを文字列\テキストにシリアル化し、構造行を解析することはできませんでした。

Scalaを使用してこれを行う方法の例はありますか?

0 投票する
2 に答える
12322 参照

apache-spark - SQL over Spark ストリーミング

これは、Spark Streaming で単純な SQL クエリを実行するためのコードです。

ご覧のとおり、ストリーミングで SQL を実行するには、foreachRDD メソッド内でクエリを作成する必要があります。2 つの異なるストリームから受信したデータに対して SQL 結合を実行したいと考えています。できる方法はありますか?