1

この質問は [this one] (行のリストを pyspark の Hive テーブルに保存する) から派生したものです。

編集この投稿の下部にある私の更新の編集を参照してください

Scala と現在は Pyspark の両方を使用して同じタスクを実行しましたが、データフレームを寄木細工または csv に保存したり、データフレームをリストまたは配列型のデータ構造に変換したりするのが非常に遅いという問題があります。以下は、関連する python/pyspark コードと情報です。

#Table is a List of Rows from small Hive table I loaded using
#query = "SELECT * FROM Table"
#Table = sqlContext.sql(query).collect()

for i in range(len(Table)):

    rows = sqlContext.sql(qry)
    val1 = Table[i][0]
    val2 = Table[i][1]
    count = Table[i][2]
    x = 100 - count

#hivetemp is a table that I copied from Hive to my hfs using:
#create external table IF NOT EXISTS hive temp LIKE hivetableIwant2copy LOCATION "/user/name/hiveBackup";
#INSERT OVERWRITE TABLE hivetemp SELECT * FROM hivetableIwant2copy;

    query = "SELECT * FROM hivetemp WHERE col1<>\""+val1+"\" AND col2 ==\""+val2+"\" ORDER BY RAND() LIMIT "+str(x)

    rows = sqlContext.sql(query)
    rows = rows.withColumn("col4", lit(10))
    rows = rows.withColumn("col5", lit(some_string))
#writing to parquet is heck slow AND I can't work with pandas due to the library not installed on the server
    rows.saveAsParquetFile("rows"+str(i)+".parquet")
#tried this before and heck slow also
    #rows_list = rows.collect()
    #shuffle(rows_list)

上記を Scala で実行しようとしましたが、同様の問題がありました。ハイブ テーブルまたはハイブ テーブルのクエリを簡単にロードできますが、ランダム シャッフルを実行したり、大きなデータフレームを格納したりする必要があると、メモリの問題が発生します。また、列を 2 つ追加できるという課題もありました。

行を追加したい Hive テーブル (hiveTemp) には、5,570,000 ~ 550 万行と 120 列があります。

for ループで繰り返し処理している Hive テーブルには、5000 行と 3 列があります。25 の固有val1(hiveTemp の列) と、3000 の組み合わせがval1ありval2ます。Val2 は、5 つの列の 1 つとその特定のセル値である可能性があります。これは、コードを微調整した場合、行のルックアップを 5000 から 26 に減らすことができることを意味しますが、取得、保存、およびランダム シャッフルを行う必要がある行の数はかなり大きくなり、メモリの問題が発生します (誰かがいない限り)。これに関する提案があります)

テーブルに追加する必要がある合計行数は、約 100,000 です。

最終的な目標は、550 万行の元のテーブルに、ハイブまたは寄木細工のテーブルとして記述された 10 万行以上を追加することです。簡単であれば、後で 5.5 ミル テーブルにマージできる独自のテーブルに 100k 行を書き込んでも問題ありません。

Scala または Python は問題ありませんが、Scala の方がより好ましいです。

これに関するアドバイスと最適なオプションは素晴らしいでしょう。

どうもありがとう!

編集この問題について私が考えた追加の考え:ハッシュパーティショナーを使用して、ハイブテーブルを26のパーティションに分割しました。これは、26 個の異なる値がある列値に基づいています。for ループで実行したい操作は、これらの各パーティションでのみ実行する必要があるように一般化できます。そうは言っても、これを行うための scala コードを記述し、各パーティションでこれらの各ループを個別の実行者が実行できるようにするために、オンラインでどのガイドを参照できますか? これにより、物事がはるかに高速になると思います。
マルチスレッドを使用してこのようなことを行う方法は知っていますが、scala/spark パラダイムでの方法はわかりません。

4

0 に答える 0