問題タブ [pyspark]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
apache-spark - PySpark には、Word2VecModel でアクセス可能な getVectors メソッドがありません
getVectors()
pyspark 1.2.0 Spark バージョンのメソッドにアクセスしようとしていますが、pyspark の状態 -
Scala/Javaを使用してこれにアクセスする方法しかないのでしょうか、それとも何かできることがありますか。
apache-spark - pyspark によって起動される jvm のオプションを指定する
指定された接続先のjvmを起動するときに、pysparkスクリプトによって使用されるjvmオプションはどのように/どこにありますか?
jvm デバッグ オプションの指定に特に関心があります。
ありがとう。
python - Spark データフレームの udf() 用の Python パッケージの関数
pyspark を介した Spark データフレームの場合pyspark.sql.functions.udf
、user defined function (UDF)
.
numpyudf()
などのPython パッケージの関数を使用できるのだろうか?np.random.normal
java - Java のインストールに関する問題 (Spark の使用が困難になる)
Spark をローカルで実行しようとしています。起動すると、次のエラーが表示されます。これは、Java インストール (またはそのパス) に何か問題があることを示唆しています。これは Mac OS Yosemite 上にあります。
これを修正する方法はありますか?さらに情報が必要な場合はお知らせください(どこから始めればよいかわかりません)
編集:明らかにどこかに設定が間違っています。どこにあるのかわかりません。私のbashプロファイルには、次の行があります
また、 java -version を実行すると、次のようになります。
python - PythonシェルでPySparkをインポートできません
$SPARK_HOME/bin/pyspark ファイルに以下を追加してみました:
これは正しい方法ですか?輸入できないからです。何か不足していますか?
amazon-s3 - Spark : 多数のファイルに対して Spark を使用したデータ処理で SocketException : Read timed out と表示される
これらの構成を持つ2台のマシンでSparkをスタンドアロンモードで実行しています
- 500GB メモリ、4 コア、7.5 RAM
- 250GB メモリ、8 コア、15 RAM
8 コア マシンでマスターとスレーブを作成し、ワーカーに 7 コアを与えました。3 つのワーカー コアを備えた 4 コア マシンで別のスレーブを作成しました。UI には、8 コアと 4 コアでそれぞれ 13.7 G と 6.5 G の使用可能な RAM が表示されます。
ここで、15 日間にわたるユーザー評価の集計を処理する必要があります。Pyspark を使用してこれを実行しようとしています。このデータは、s3 バケットの日単位のディレクトリにある時間単位のファイルに保存されます。たとえば、すべてのファイルは約 100MB である必要があります。
s3://some_bucket/2015-04/2015-04-09/data_files_hour1
私はこのようなファイルを読んでいます
files は、「s3://some_bucket/2015-04/2015-04-09/*,s3://some_bucket/2015-04/2015-04-09/*」という形式の文字列です。
次に、一連のマップとフィルターを実行し、結果を保持します
次に、reduceByKey を実行して、数日間の集計スコアを取得する必要があります。
次に、ユーザーが評価したアイテムの実際の用語を redis 呼び出しする必要があるため、このように mapPartitions を呼び出します。
get_tags関数は、呼び出しのたびに redis 接続を作成し、redis を呼び出して(ユーザー、アイテム、レート)タプルを生成します (redis ハッシュは 4core に保存されます)。
SparkConf の設定を微調整しました
ここではクラスター モードがサポートされていないようなので、クライアント モードで 2g のドライバー メモリを使用してジョブを実行します。上記のプロセスは、2 日間のデータ (約 2.5 時間) に長い時間がかかり、14 日間で完全に断念されます。
ここで何を改善する必要がありますか?
- このインフラストラクチャは、RAM とコアの点で不十分ですか (これはオフラインであり、数時間かかる場合がありますが、5 時間程度で終了する必要があります)
- パーティションの数を増減する必要がありますか?
- Redis はシステムを遅くしている可能性がありますが、キーの数が多すぎて 1 回の呼び出しを行うことができません。
- ファイルの読み取り中または縮小中のどこでタスクが失敗しているのかわかりません。
- Scala でより優れた Spark API を使用する場合、Python を使用しない方がよいでしょうか? それによって効率も向上しますか?
これは例外トレースです
事前に感謝します。
ここに私のメインコードがどのように見えるかがあります
def main(sc):
f=get_files()
a=sc.textFile(f, 15)
.coalesce(7*sc.defaultParallelism)
.map(lambda line: line.split(","))
.filter(len(line)>0)
.map(lambda line: (line[18], line[2], line[13], line[15])).map(scoring)
.map(lambda line: ((line[0], line[1]), line[2])).persist(StorageLevel.MEMORY_ONLY_SER)
b=a.reduceByKey(lambda x, y: x+y).map(aggregate)
b.persist(StorageLevel.MEMORY_ONLY_SER)
c=taggings.mapPartitions(get_tags)
c.saveAsTextFile("f")
a.unpersist()
b.unpersist()
get_tags 関数は
get_files 関数は次のとおりです。
get_path_from_dates(DAYS) は
python - グループへの Spark 適用関数
ASV (chr(1) で区切られたハイブ データファイル) 形式のテーブルがあります。そして、特定の列を抽出し、2 つの列の組み合わせでグループ化し、各グループ内で何かをしたいと考えています。
出力を次のようにしたい
これまでにpysparkで行ったこと:
エラーは次のようになります。
しかし、以下のようなエラー メッセージが表示されました。これが構文エラーなのか、システム セットアップ エラーなのかわかりません。
追加情報:
Redhat ボックスの上で CDH を実行しています。ご存知のように、Redhat はデフォルトの Python バージョンとして Python2.6 を使用します。iPythonnotebook を使用するために、namenode で Python2.6 と互換性のある iPython の古いバージョンを作成し、virtualenv を使用して iPythonnotebook を起動しました... (このソーセージの作り方の詳細については、ここをクリックしてください) 。