問題タブ [apache-spark-1.6]

For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.

0 投票する
1 に答える
900 参照

apache-spark - Spark Streaming コードを単体テストする方法は?

最新の Spark 1.6.0 を使用しています。

別のstackoverflowの投稿を見た 単体テストでSpark Streamingにファイル内の単語をカウントさせるにはどうすればよいですか?

サンプル @ https://gist.github.com/emres/67b4eae86fa92df69f61を使用し て、spark のサンプル単体テストを作成しようとしています。その後、実際の spark アプリケーションの単体テストを作成するつもりです。ただし、例 @ https://gist.github.com/emres/67b4eae86fa92df69f61を使用できません。これは私にエラーを与えます

java.lang.IllegalStateException: コンテキストの開始後の新しい入力、変換および出力操作の追加は、oracle.security.ti.reportgenerator.test.StarterAppTest.testCountWords(StarterAppTest.java:62)ではサポートされていません

この問題を解決する方法はありますか?

0 投票する
2 に答える
2203 参照

apache-spark - Cassandra からデータを読み取り中にパーティションの数を制御するには?

私が使う:

  1. カサンドラ 2.1.12 - 3 ノード
  2. スパーク 1.6 - 3 ノード
  3. スパーク カサンドラ コネクタ 1.6

Cassandra でトークンを使用します (vnode ではありません)。

Cassandra テーブルからデータを読み取る簡単なジョブを書いています。そのカウント テーブルを表示すると、約 7,000 万行あり、15 分かかります。

データを読み取っていて、RDD のパーティションの数を確認しているときに、大きすぎる 21000 前後のどこかにあります。この数を制御するには?

を試しましsplitCountsplit.size.in.mbsが、同じ数のパーティションが表示されます。

助言がありますか?

}

これは参照用の私のコードです。nodetool compact を実行して、パーティションの数を制御できるようになりましたが、それでもプロセス全体に 6 分近くかかっています。これは高すぎると思います。

0 投票する
1 に答える
768 参照

apache-kafka - Spark Streaming アプリケーションが KafkaException: String exceeded maximum size または IllegalArgumentException で失敗する

TL;DR:

私の非常に単純な Spark Streaming アプリケーションは、ドライバーで「KafkaException: 文字列が最大サイズを超えています」で失敗します。エグゼキューターにも同じ例外が表示されますが、エグゼキューターのログのどこかで、他の情報が含まれていない IllegalArgumentException も見つかりました

完全な問題:

Spark Streaming を使用して、Kafka トピックからいくつかのメッセージを読み取ります。これは私がやっていることです:

私が Kafka データで行っているのは、次を使用して印刷することだけです。

私のアプリケーションには明らかにこれよりも多くのコードがありますが、問題を特定するために、コードから可能な限りすべてを取り除きました

このコードを YARN で実行しようとしています。これは私のスパーク送信行です:

streamconfig.properties ファイルは単なる通常のプロパティ ファイルであり、ここでの問題とはおそらく無関係です。

アプリケーションを実行しようとすると、ドライバーで次の例外が発生してすぐに失敗します。

自分のコードがスタック トレースに表示されない

エグゼキューターを調べると、ドライバーと同じ例外が見つかりましたが、深く埋もれているのは次の例外です。

情報が含まれていないため、IllegalArgument が何であるかわかりません

私のYARNが使用しているSparkのバージョンは1.6.0です。また、pom に以前のバージョンではなく Spark 1.6.0 が含まれていることも確認しました。私のスコープは「提供」されています

まったく同じトピックからデータを手動で読み取りましたが、そこにあるデータは単なる JSON です。そこにあるデータは決して巨大ではありません。32767 よりも確実に小さいです。また、通常のコマンド ライン コンシューマを使用してこのデータを読み取ることができるので、これは奇妙です

残念ながら、この例外をグーグルで検索しても、有用な情報は得られませんでした

ここで何が問題なのかを正確に理解する方法について誰か考えがありますか?

前もって感謝します

0 投票する
2 に答える
19163 参照

apache-spark - group by で最初の null 以外の値を取得する (Spark 1.6)

group by から最初の null 以外の値を取得するにはどうすればよいですか? 合体で最初に使用しようとしましたが、目的の動作が得られません (最初の行を取得したようです)。 F.first(F.coalesce("code"))

私は試した:

望ましい出力

0 投票する
0 に答える
46 参照

apache-spark - reduce() で 2x4CPU が使用可能な場合、Spark は 1CPU のみを使用します。

私は3台のマシンを持っています.4x CPU、8G RAMを備えた1xマスター; 4x CPU と 16G RAM を備えた 2x エグゼキューター。

マスターはスタンドアロン モード(YARN なし) で、pyspark を使用しています。

それが巨大なインフラストラクチャでなくても、ある程度のパフォーマンスが期待できます。操作の実行reduce時:

where tfsenthastfおよびsentimentwhich are SparseVector、およびspvecaddは追加する自家製関数ですSparseVector

これを行うと、3x 4CPU で、executor の 1 つだけが 100% 実行されます。他は0%、メモリは5G/16G前後。理解できません: * なぜこれほど長いのですか * なぜ 1x CPU しか動作していないのですか?

データを自分でパーティション分割する必要がありますか? (つまり、両方のエグゼキューターに明示的にデータを配布するということですか? 私の考えでは、それが Spark の仕事だとしても)。

ヘルプ、アイデア、ヒントをありがとう

追加情報

  • 両方のエグゼキュータがマスターに接続され、タスクに「割り当て」られています (spark Web UI を使用して確認できます)。

  • 私は約380k回線を持っています。両方のベクトル次元は 100 未満です (これは多くありません)。

  • 複雑さは、行数よりも次元に大きく依存する場合があります。

アップデート

repartition(8)RDDを分散させるために使用する必要があることがわかりました。これで問題は解決しましたが、完全に私の質問ではありませんでした:なぜこれをしなければならないのですか?

データの取り方のせいだと思います。私はデータベースから読んでいます。

これは、配布せずに保存すると思います。

0 投票する
2 に答える
5965 参照

apache-spark - 実行中の Spark ジョブが UI に表示されない

パラメータまたはパラメータbin/spark-submit --class DataSet BasicSparkJob-assembly-1.0.jarについて言及せずに、ここで述べたようにスパークジョブを送信しました。そのジョブの代わりに、私の 3 ノード Spark クラスターに送信されます。しかし、ジョブに情報が表示されていないため、どこにジョブを送信したのか疑問に思っていました--masterspark.masterRunning Applications

0 投票する
1 に答える
2129 参照

scala - Scalaを使用してSparkによって推論されたRDDタイプがどれであるかを知る方法

私は次の例を試していました

次に、シェルで次のように取得します

しかし、何らかの理由で、私はまだこの文を実行できたことを理解していません

これをシェルで取得する

だから私はいくつかの質問があります:

1.- rdd という名前の var の実際の RDD タイプは何ですか? シェルでは org.apache.spark.rdd.RDD[(String, Int)] 型であることが示されていますが、API を見ると、RDD クラスには aggregateByKey メソッドがありません。ちなみにJavaPairRDDクラスにはaggregateByKeyメソッドがあります

2.- RDDの実際のタイプを確認/知るにはどうすればよいですか

3.- ParallelCollectionRDD が表示されたのは何ですか? 私はgithubでそれを探しましたが、プライベートクラスであることがわかったので、それがscala APIに表示されない理由だと思いますが、それは何のためですか?

Spark 1.6.2を使用していました