6

pyspark (Apache Spark) の DataFrame API を使用していますが、次の問題が発生しています。

同じソース DataFrame に由来する 2 つの DataFrame を結合すると、結果の DF は膨大な数の行に爆発します。簡単な例:

nディスクから行を含む DataFrame をロードします。

df = sql_context.parquetFile('data.parquet')

次に、そのソースから 2 つの DataFrame を作成します。

df_one = df.select('col1', 'col2')
df_two = df.select('col1', 'col3')

最後に、(内部で) それらを元に戻したい:

df_joined = df_one.join(df_two, df_one['col1'] == df_two['col1'], 'inner')

キーインcol1はユニークです。n結果の DataFrame には行があるはずですが、行がありn*nます。

ディスクから直接ロードするdf_oneと、それは起こりません。df_two私は Spark 1.3.0 を使用していますが、これは現在の 1.4.0 スナップショットでも発生します。

なぜそれが起こるのか誰か説明できますか?

4

2 に答える 2

4

これを正しく読んでいる場合、df_two には col2 がありません

    df_one = df.select('col1', 'col2')
    df_two = df.select('col1', 'col3')

だからあなたがするとき:

    df_one.join(df_two, df_one['col1'] == df_two['col2'], 'inner')

それは失敗するはずです。言いたかったら

    df_one.join(df_two, df_one['col1'] == df_two['col1'], 'inner')

ただし、同じデータ フレームから読み込んでいることによる影響はありません。私はあなたがすることをお勧めします:

    df_one.show()
    df_two.show()

選択したデータが期待どおりであることを確認するため。

于 2016-07-15T19:57:50.013 に答える
1

Spark 1.3 の大規模なデータセットでもこの問題が発生しています。残念ながら、私が作成した小さな不自然な例では、「結合」は正しく機能します。おそらく、結合に先立つステップからいくつかの根本的なバグがあるように感じます

結合の実行 (注: DateTime は単なる文字列です):

> join = df1.join(df2, df1.DateTime == df2.DateTime, "inner")
> join.count()

250000L

これは明らかに完全な 500*500 デカルト結合を返しています。

私にとってうまくいくのは、SQLに切り替えることです:

  > sqlc.registerDataFrameAsTable(df1, "df1")
  > sqlc.registerDataFrameAsTable(df2, "df2")
  > join = sqlc.sql("select * from df1, df2 where df1.DateTime = df2.DateTime")
  > join.count()
  471L

その値は正しく見えます。

これを見て、この違いをよりよく理解できるようになるまで、私は個人的に pyspark の DataFrame.join() を使用しません。

于 2015-09-25T16:24:55.160 に答える