2

これを試すと:

cfg = SparkConf().setAppName('MyApp')
spark = SparkSession.builder.config(conf=cfg).getOrCreate()

lines = spark.readStream.load(format='socket', host='localhost', port=9999,
                              schema=StructType(StructField('value', StringType, True)))
words = lines.groupBy('value').count()
query = words.writeStream.format('console').outputMode("complete").start()

query.awaitTermination()

次に、エラーが発生します:

AssertionError: dataType は DataType である必要があります

そして、./pyspark/sql/types.py の 403 行目でソース コードを検索します。

assert isinstance(dataType, DataType), "dataType should be DataType"

ただし、DataTypeではなくAtomicTypeに基づくStringType

class StringType(AtomicType):
    """String data type.
    """

    __metaclass__ = DataTypeSingleton

それで、間違いはありますか?

4

1 に答える 1

3

Python ではDataTypesシングルトンとして使用されません。作成するときStructFieldは、インスタンスを使用する必要があります。StructType次のシーケンスも必要StructFieldです。

StructType([StructField('value', StringType(), True)])

それにもかかわらず、これはここではまったく無意味です。のスキーマは固定されており、スキーマ引数TextSocketSource で変更できません。

于 2017-01-06T08:57:15.113 に答える