11

私はDenseVector RDDこのようなものを持っています

>>> frequencyDenseVectors.collect()
[DenseVector([1.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 1.0, 1.0, 1.0, 0.0, 1.0]), DenseVector([1.0, 1.0, 1.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]), DenseVector([1.0, 1.0, 0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0]), DenseVector([0.0, 1.0, 1.0, 0.0, 0.0, 1.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0, 0.0])]

これを に変換したいDataframe。私はこのようにしてみました

>>> spark.createDataFrame(frequencyDenseVectors, ['rawfeatures']).collect()

このようなエラーが発生します

Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 520, in createDataFrame
    rdd, schema = self._createFromRDD(data.map(prepare), schema, samplingRatio)
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 360, in _createFromRDD
    struct = self._inferSchema(rdd, samplingRatio)
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/session.py", line 340, in _inferSchema
    schema = _infer_schema(first)
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/types.py", line 991, in _infer_schema
    fields = [StructField(k, _infer_type(v), True) for k, v in items]
  File "/opt/BIG-DATA/spark-2.0.0-bin-hadoop2.7/python/pyspark/sql/types.py", line 968, in _infer_type
    raise TypeError("not supported type: %s" % type(obj))
TypeError: not supported type: <type 'numpy.ndarray'>

古いソリューション

frequencyVectors.map(lambda vector: DenseVector(vector.toArray()))

編集 1 - 再現可能なコード

from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext, Row
from pyspark.sql.functions import split

from pyspark.ml.feature import CountVectorizer
from pyspark.mllib.clustering import LDA, LDAModel
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.mllib.linalg import SparseVector, DenseVector

sqlContext = SQLContext(sparkContext=spark.sparkContext, sparkSession=spark)
sc.setLogLevel('ERROR')

sentenceData = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (0, "I wish Java could use case classes"),
    (1, "Logistic regression models are neat")
], ["label", "sentence"])
sentenceData = sentenceData.withColumn("sentence", split("sentence", "\s+"))
sentenceData.show()

vectorizer = CountVectorizer(inputCol="sentence", outputCol="rawfeatures").fit(sentenceData)
countVectors = vectorizer.transform(sentenceData).select("label", "rawfeatures")

idf = IDF(inputCol="rawfeatures", outputCol="features")
idfModel = idf.fit(countVectors)
tfidf = idfModel.transform(countVectors).select("label", "features")
frequencyDenseVectors = tfidf.rdd.map(lambda vector: [vector[0],DenseVector(vector[1].toArray())])
frequencyDenseVectors.map(lambda x: (x, )).toDF(["rawfeatures"])
4

2 に答える 2

14

RDD[Vector]直接変換することはできません。RDDとして解釈できるオブジェクトの にマップする必要があります。structsたとえば、次のようになりますRDD[Tuple[Vector]]

frequencyDenseVectors.map(lambda x: (x, )).toDF(["rawfeatures"])

そうしないと、Spark はオブジェクトを変換し__dict__、サポートされていない NumPy 配列をフィールドとして使用しようとします。

from pyspark.ml.linalg import DenseVector  
from pyspark.sql.types import _infer_schema

v = DenseVector([1, 2, 3])
_infer_schema(v)
TypeError                                 Traceback (most recent call last)
... 
TypeError: not supported type: <class 'numpy.ndarray'>

対。

_infer_schema((v, ))
StructType(List(StructField(_1,VectorUDT,true)))

:

  • Spark 2.0 では、正しいローカル タイプを使用する必要があります。

    • pyspark.ml.linalgDataFrameベースのpyspark.mlAPIを使用する場合。
    • pyspark.mllib.linalgRDDベースのpyspark.mllibAPIを使用する場合。

    これら 2 つの名前空間は互換性がなくなり、明示的な変換が必要になります (たとえば、 org.apache.spark.mllib.linalg.VectorUDT から ml.linalg.VectorUDT に変換する方法)。

  • 編集で提供されたコードは、元の質問のものと同等ではありません。同じセマンティクスを持っていないことに注意してtupleください。listベクトルをペアにマップして使用tupleし、直接変換する場合DataFrame:

    tfidf.rdd.map(
        lambda row: (row[0], DenseVector(row[1].toArray()))
    ).toDF()
    

    (製品タイプ)を使用tupleすると、ネストされた構造でも機能しますが、これがあなたが望むものであるとは思えません:

    (tfidf.rdd
        .map(lambda row: (row[0], DenseVector(row[1].toArray())))
        .map(lambda x: (x, ))
        .toDF())
    

    list最上位レベル以外の場所rowは として解釈されますArrayType

  • 変換に UDF を使用する方がはるかにクリーンです ( Spark Python: Standard scaler error "Do not support ... SparseVector" )。

于 2016-12-26T11:50:40.390 に答える