問題タブ [apache-spark-2.0]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
java - org.apache.spark.sql.cassandra.CassandraSQLContext をインポートできません
パスに spark-cassandra-connector_2.11-2.0.0-M3.jar がありますが、それでも org.apache.spark.sql.cassandra.CassandraSQLContext をインポートできません。これは他のパッケージに含まれていますか? すべてのドキュメントはこのパッケージを指しています (または非推奨)。
ありがとう
scala - スパークでjsonを解析する
私はスパークジョブでローカルドライブからjsonを解析するためにjsonスカラライブラリを使用していました:
しかし、hdfs ファイルの場所を指定して同じパーサーを使用しようとすると、機能しません。
エラーが表示されます:
Json.parseFull ライブラリを使用して hdfs ファイルの場所からデータを取得するにはどうすればよいですか?
ありがとう
streaming - Zeppelin 6.5 + 構造化ストリーミング 2.0.2 用の Apache Kafka コネクタ
Spark の構造化ストリーミングの例と Kafka コネクタを含む zeppelin ノートブックを実行しようとしています。
ここに私の環境があります:
私のツェッペリンノートブックのコードは次のとおりです。
ノートブックを実行すると、次のエラーが表示されます。
import org.apache.spark.sql.functions.{explode, split} java.lang.ClassNotFoundException: データ ソースが見つかりませんでした: kafka。https://cwiki.apache.org/confluence/display/SPARK/Third+Party+Projectsでパッケージを見つけて ください。 org.apache.spark.sql.execution.datasources.DataSource.lookupDataSource(DataSource.scala:148) で org.apache.spark.sql.execution.datasources.DataSource.providingClass$lzycompute(DataSource.scala:79) で org .apache.spark.sql.execution.datasources.DataSource.providingClass(DataSource.scala:79) org.apache.spark.sql.execution.datasources.DataSource.sourceSchema(DataSource.scala:218) org.apache.spark で.sql.execution.datasources.DataSource.sourceInfo$lzycompute(DataSource.scala:80) at org.apache.spark.sql.execution.datasources.DataSource.sourceInfo(DataSource.scala:80) at org.apache.spark.sql .execution.streaming.StreamingRelation$.apply(StreamingRelation.scala:30) at org.apache.spark.sql.streaming.DataStreamReader.load(DataStreamReader.scala:124) ... 86 省略原因: java.lang.ClassNotFoundException :カフカ。DefaultSource の scala.reflect.internal.util.AbstractFileClassLoader.findClass(AbstractFileClassLoader.scala:62) の java.lang.ClassLoader.loadClass(ClassLoader.java:424) の java.lang.ClassLoader.loadClass(ClassLoader.java:357) org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132) で org.apache.spark.sql.execution.datasources.DataSource$$anonfun で$5$$anonfun$apply$1.apply(DataSource.scala:132) で scala.util.Try$.apply(Try.scala:192)132) org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132) で scala.util.Try$.apply(Try.scala:192) )132) org.apache.spark.sql.execution.datasources.DataSource$$anonfun$5$$anonfun$apply$1.apply(DataSource.scala:132) で scala.util.Try$.apply(Try.scala:192) )
ヘルプアドバイスをいただければ幸いです。
thnx
apache-spark - DAG をタスクに変換する際の大幅な遅延
これは私のステップです:
- Spark アプリを EMR クラスターに送信する
- ドライバーが起動し、Spark-ui が表示されます (ステージはまだ作成されていません)
- ドライバーは、s3 から約 3000 個のパーツを含む orc ファイルを読み取り、いくつかの変換を行って、それを s3 に保存します。
- 保存を実行すると、spark-ui にいくつかのステージが作成されますが、ステージが spark-ui に表示されるまでに非常に時間がかかります。
- ステージが出現し実行開始
ステップ 4 で大幅な遅延が発生するのはなぜですか? この間、クラスターは明らかに何かを待っており、CPU 使用率は 0% です。
ありがとう
apache-spark - 複数の Kafka トピックを複数の Spark ジョブに並行して実行する方法
Spark を使い始めたばかりで理解しようとしているので、この質問が意味をなさない場合はご容赦ください。
私が読んだことから、Spark はストリーミング データのリアルタイム分析を行うための良いユース ケースであり、hdfs/hive/hbase などのダウンストリーム シンクにプッシュできます。
それについて2つ質問があります。実行中の Spark ストリーミング ジョブが 1 つだけなのか、それとも常に複数なのかは明確ではありません。Kafka からのトピックごと、または Kafka にストリーミングしているソースごとに実行する必要があるさまざまな分析があり、それらの結果を下流にプッシュするとします。
Spark では、複数のストリーミング ジョブを並行して実行できるので、集計分析をストリームごと、この場合は Kafka トピックごとに分けて保持できますか。もしそうなら、それはどのように行われますか、あなたが私に指摘できるドキュメントはありますか?
明確にするために、私のユースケースはさまざまなソースからストリーミングすることであり、各ソースには、実行する必要がある異なる分析と異なるデータ構造が潜在的に含まれる可能性があります。複数の Kafka トピックとパーティションを使用できるようにしたいと考えています。各 Kafka パーティションは Spark パーティションにマップされ、並列化できることを理解しています。
ただし、複数のSparkストリーミングジョブを並行して実行して、複数のKafkaトピックから読み取り、それらのトピック/ストリームに関する個別の分析を集計する方法はわかりません.
Spark でない場合、これは Flink で実行できるものですか?
2 つ目は、Spark をどのように使い始めるかです。Confluent-Kafka、Databricks-Spark、Hadoop-HW/CDH/MAPR など、コンポーネントごとに選択できる会社やディストリビューションがあるようです。これらすべてが本当に必要なのか、それともベンダーの数を制限しながらビッグ データ パイプラインを使用するための最小限で最も簡単な方法は何ですか? POC を開始するだけでも、非常に大きな作業のように思えます。
java - Spark は機能インターフェースをシリアライズできません
Spark が私の機能的インターフェースのシリアル化に失敗する理由を理解するのを手伝ってくれませんSerializablePredicate
か?
例外を引き起こすコード
MyKryoRegistrator.java
SerializablePredicate
java - spark-sql - ネストされたクエリを使用してデータをフィルタリングする
いくつかの列を持つ巨大な .csv ファイルがありますが、私にとって重要な列は .csv ですUSER_ID(User Identifier), DURATION(Duration of Call), TYPE(Incoming or Outgoing), DATE, NUMBER(Mobile No.)
。
だから私がやろうとしているのは、列のすべてのnull
値をに置き換えることです。DURATION
average of duration of all the calls of same type by the same user(i.e. of same USER_ID)
次のように平均を見つけました:
以下のクエリでは、同じユーザーによる同じタイプのすべての通話時間の平均を調べています。
filteredData.show() は次のようになります:
以下のクエリでは、データをフィルタリングしnull
、ステップ 2 ですべての発生を 0 に置き換えています。
filteredData2.show() は次を与えます:
これら2つを組み合わせるか、これら2つを使用して必要な結果を得るのを手伝ってください。Spark と SparkSQL は初めてです。
ありがとう。