1

以下のスニペットでは、(Kafka から受け取った) 温度の DStream を pandas Dataframe に変換しようとしています。

def main_process(time, dStream):
print("========= %s =========" % str(time))

try:
    # Get the singleton instance of SparkSession
    spark = getSparkSessionInstance(dStream.context.getConf())

    # Convert RDD[String] to RDD[Row] to DataFrame
    rowRdd = dStream.map(lambda t: Row(Temperatures=t))

    df = spark.createDataFrame(rowRdd)

    df.show()

    print("The mean is: %m" % df.mean())

そのままでは、平均が計算されることはありません。これは、「df」が pandas データフレーム (?) ではないためだと思います。

df = spark.createDataFrame(df.toPandas())関連ドキュメントに従って使用してみましたが、コンパイラは「toPandas()」を認識せず、変換は行われません。

私は正しい道を進んでいますか? もしそうなら、どのように変換を適用すればよいですか?

それとも、私のアプローチが間違っているので、DStream を別の方法で処理する必要がありますか?

前もって感謝します!

4

0 に答える 0