2

map を呼び出すかmapPartition、関数が PySpark から行を受け取る場合、ローカルの PySpark または Pandas DataFrame を作成する自然な方法は何ですか? 行を結合してスキーマを保持するものはありますか?

現在、私は次のようなことをしています:

def combine(partition):
    rows = [x for x in partition]
    dfpart = pd.DataFrame(rows,columns=rows[0].keys())
    pandafunc(dfpart)

mydf.mapPartition(combine)
4

4 に答える 4

2

スパーク >= 2.3.0

Spark 2.3.0 以降、PandasSeriesまたはDataFrameパーティションまたはグループを使用できます。例を参照してください。

スパーク < 2.3.0

ローカル PySpark を作成する自然な方法は何ですか?

そのようなことはない。Spark 分散データ構造をネストできないか、アクションまたは変換をネストできない別のパースペクティブを好む。

またはパンダデータフレーム

比較的簡単ですが、少なくともいくつかのことを覚えておく必要があります。

  • Pandas と Spark DataFrames は、リモートでさえ同等ではありません。これらは異なる構造であり、異なるプロパティを持ち、一般に、それらを別のものに置き換えることはできません。
  • パーティションは空にすることができます。
  • 辞書を渡しているようです。ベースの Python 辞書は順序付けされていないことに注意してください (collections.OrderedDict例とは異なります)。そのため、列を渡すと期待どおりに機能しない場合があります。
import pandas as pd

rdd = sc.parallelize([
    {"x": 1, "y": -1}, 
    {"x": -3, "y": 0},
    {"x": -0, "y": 4}
])

def combine(iter):
    rows = list(iter)
    return [pd.DataFrame(rows)] if rows else []

rdd.mapPartitions(combine).first()
##    x  y
## 0  1 -1
于 2015-12-23T23:11:01.190 に答える
1

toPandas()、_

pandasdf = mydf.toPandas()
于 2015-12-23T16:24:42.207 に答える
0

Spark SQL データフレームを作成するには、ハイブ コンテキストが必要です。

hc = HiveContext(sparkContext)

HiveContext を使用すると、inferSchema 関数を介して SQL データフレームを作成できます。

sparkSQLdataframe = hc.inferSchema(rows)  
于 2015-12-23T15:40:43.907 に答える