問題タブ [apache-spark-1.3]
For questions regarding programming in ECMAScript (JavaScript/JS) and its various dialects/implementations (excluding ActionScript). Note JavaScript is NOT the same as Java! Please include all relevant tags on your question; e.g., [node.js], [jquery], [json], [reactjs], [angular], [ember.js], [vue.js], [typescript], [svelte], etc.
python-2.7 - スパークストリーミング。Py4j に関する問題: 新しい通信チャネルを取得中にエラーが発生しました
現在、Spark 1.3 と Python 2.7 の 50 ノードのクラスターでリアルタイムの Spark ストリーミング ジョブを実行しています。Spark ストリーミング コンテキストは、180 秒のバッチ間隔で HDFS のディレクトリから読み取ります。以下は、Spark ジョブの構成です。
spark-submit --master yarn-client --executor-cores 5 --num-executors 10 --driver-memory 10g --conf spark.yarn.executor.memoryOverhead=2048 --conf spark.yarn.driver.memoryOverhead= 2048 --conf spark.network.timeout=300 --executor-memory 10g
ほとんどの場合、ジョブは正常に実行されます。ただし、通信チャネルを取得できないことを理由に、約 15 時間後に Py4j Exception をスローします。
バッチ間隔のサイズを小さくしようとしましたが、処理時間がバッチ間隔よりも長くなるという問題が発生します。
以下はエラーのスクリーンショットです
いくつかの調査を行ったところ、ここからのソケット記述子のリークの問題である可能性があることがわかりましたSPARK-12617
ただし、エラーを回避して解決することはできません。ポートの提供を妨げている可能性のある開いている接続を手動で閉じる方法はありますか。または、これを解決するためにコードに特定の変更を加える必要がありますか。
ティア
scala - Scala + Spark 1.3 を使用して Hive テーブルに段階的に追加する
私たちのクラスターには Spark 1.3 と Hive があり、ランダムに選択された行を追加する必要がある大きな Hive テーブルがあります。条件を読み取ってチェックする小さなテーブルがあり、その条件が真の場合は、必要な変数を取得してから、ランダムな行を埋めるためにクエリを実行します。私がしたことは、その条件でクエリを実行し、table.where(value<number)
を使用して配列にしtake(num rows)
ました。次に、これらすべての行には、大規模なハイブ テーブルから必要なランダムな行に関する必要な情報が含まれているため、配列を反復処理します。
クエリで使用するクエリを実行するときORDER BY RAND()
(を使用sqlContext
)。var Hive table
大きなテーブルから列を追加して(変更可能にする)を作成しました。ループでは、unionAll を実行しますnewHiveTable = newHiveTable.unionAll(random_rows)
これを行うためにさまざまな方法を試しましたが、CPU と一時ディスクの使用を避けるための最善の方法がわかりません。データフレームは増分追加を目的としていないことを知っています。私が今試みなければならないことの1つは、cvsファイルを作成し、そのファイルにランダムな行をループでインクリメンタルに書き込み、ループが終了したら、cvsファイルをテーブルとしてロードし、1つのunionAllを実行して最終的な結果を取得することですテーブル。
どんなフィードバックも素晴らしいでしょう。ありがとう
scala - Scala と Spark を使用して個別の Hive パーティションでタスクを並行して実行し、Hive のロードと Hive または Parquet への結果の書き込みを高速化する
この質問は [this one] (行のリストを pyspark の Hive テーブルに保存する) から派生したものです。
編集この投稿の下部にある私の更新の編集を参照してください
Scala と現在は Pyspark の両方を使用して同じタスクを実行しましたが、データフレームを寄木細工または csv に保存したり、データフレームをリストまたは配列型のデータ構造に変換したりするのが非常に遅いという問題があります。以下は、関連する python/pyspark コードと情報です。
上記を Scala で実行しようとしましたが、同様の問題がありました。ハイブ テーブルまたはハイブ テーブルのクエリを簡単にロードできますが、ランダム シャッフルを実行したり、大きなデータフレームを格納したりする必要があると、メモリの問題が発生します。また、列を 2 つ追加できるという課題もありました。
行を追加したい Hive テーブル (hiveTemp) には、5,570,000 ~ 550 万行と 120 列があります。
for ループで繰り返し処理している Hive テーブルには、5000 行と 3 列があります。25 の固有val1
(hiveTemp の列) と、3000 の組み合わせがval1
ありval2
ます。Val2 は、5 つの列の 1 つとその特定のセル値である可能性があります。これは、コードを微調整した場合、行のルックアップを 5000 から 26 に減らすことができることを意味しますが、取得、保存、およびランダム シャッフルを行う必要がある行の数はかなり大きくなり、メモリの問題が発生します (誰かがいない限り)。これに関する提案があります)
テーブルに追加する必要がある合計行数は、約 100,000 です。
最終的な目標は、550 万行の元のテーブルに、ハイブまたは寄木細工のテーブルとして記述された 10 万行以上を追加することです。簡単であれば、後で 5.5 ミル テーブルにマージできる独自のテーブルに 100k 行を書き込んでも問題ありません。
Scala または Python は問題ありませんが、Scala の方がより好ましいです。
これに関するアドバイスと最適なオプションは素晴らしいでしょう。
どうもありがとう!
編集この問題について私が考えた追加の考え:ハッシュパーティショナーを使用して、ハイブテーブルを26のパーティションに分割しました。これは、26 個の異なる値がある列値に基づいています。for ループで実行したい操作は、これらの各パーティションでのみ実行する必要があるように一般化できます。そうは言っても、これを行うための scala コードを記述し、各パーティションでこれらの各ループを個別の実行者が実行できるようにするために、オンラインでどのガイドを参照できますか? これにより、物事がはるかに高速になると思います。
マルチスレッドを使用してこのようなことを行う方法は知っていますが、scala/spark パラダイムでの方法はわかりません。
apache-spark - 完了してコンテキストが閉じられた後、spark ジョブのログを表示するにはどうすればよいですか?
pyspark
、spark 1.3
、standalone mode
、を実行していclient mode
ます。
過去のジョブを見て比較することで、自分のスパーク ジョブを調査しようとしています。それらのログ、ジョブが送信された構成設定などを表示したいのですが、コンテキストが閉じられた後にジョブのログを表示する際に問題が発生しています。
もちろん、ジョブを送信するときは、spark コンテキストを開きます。ジョブの実行中に、ssh トンネリングを使用して Spark Web UIを開くことができます。そして、転送されたポートに でアクセスできますlocalhost:<port no>
。次に、現在実行中のジョブと完了したジョブを次のように表示できます。
次に、特定のジョブのログを表示したい場合は、ssh トンネル ポート フォワーディングを使用して、そのジョブの特定のマシンの特定のポートのログを表示します。
その後、ジョブが失敗することがありますが、コンテキストはまだ開いています。このような場合でも、上記の方法でログを表示できます。
ただし、これらすべてのコンテキストを一度に開きたくないので、ジョブが失敗したときにコンテキストを閉じます。コンテキストを閉じると、上の画像の「完了したアプリケーション」の下にジョブが表示されます。ここで、ssh トンネル ポート フォワーディングを使用してログを表示しようとすると、以前のように ( localhost:<port no>
) page not found
、.
コンテキストが閉じられた後にジョブのログを表示するにはどうすればよいですか? spark context
そして、これはと ログが保存される場所との関係について何を意味するのでしょうか? ありがとうございました。
繰り返しますが、私は、、、、を実行していpyspark
ます。spark 1.3
standalone mode
client mode