62

私は使用する必要があります

(rdd.)partitionBy(npartitions, custom_partitioner)

DataFrame では使用できないメソッド。すべての DataFrame メソッドは、DataFrame の結果のみを参照します。では、DataFrame データから RDD を作成する方法は?

注: これは 1.2.0 からの (1.3.0 での) 変更です。

@dpangmaoからの回答からの更新: メソッドは .rdd です。(a) 公開されているか、(b) パフォーマンスにどのような影響があるかを理解したいと思っていました。

(a) は「はい」であり、(b) - ここでは、パフォーマンスに重大な影響があることがわかります。 mapPartitionsを呼び出して、新しい RDD を作成する必要があります。

dataframe.py で(ファイル名も変更されていることに注意してください (以前は sql.py でした):

@property
def rdd(self):
    """
    Return the content of the :class:`DataFrame` as an :class:`RDD`
    of :class:`Row` s.
    """
    if not hasattr(self, '_lazy_rdd'):
        jrdd = self._jdf.javaToPython()
        rdd = RDD(jrdd, self.sql_ctx._sc, BatchedSerializer(PickleSerializer()))
        schema = self.schema

        def applySchema(it):
            cls = _create_cls(schema)
            return itertools.imap(cls, it)

        self._lazy_rdd = rdd.mapPartitions(applySchema)

    return self._lazy_rdd
4

3 に答える 3

102

.rdd次のような方法を使用します。

rdd = df.rdd
于 2015-03-18T17:36:16.587 に答える
83

@dapangmaoの答えは機能しますが、通常のspark RDDを提供せず、Rowオブジェクトを返します。通常のRDD形式が必要な場合。

これを試して:

rdd = df.rdd.map(tuple)

また

rdd = df.rdd.map(list)
于 2016-05-17T21:13:31.803 に答える