1

以下のコード では、URL で env 変数を使用してredis-py接続をインスタンス化しようとしています。問題は、foreach または foreachPartitionを使用すると、env 変数が #save_on_redis メソッドで認識されないことです。

外部で redis 接続を作成しようとしましたが、"pickle.PicklingError: Can't pickle 'lock' object"が表示されました。これは、spark がこれら 2 つのメソッドをすべてのノードで同時に実行しようとするためです。

質問: foreach または foreachPartition に引数として渡されるメソッドで環境変数を使用するにはどうすればよいですか?

import os
from pyspark.sql import SparkSession
import redis

spark = (SparkSession
        .builder
        .getOrCreate())

print "---------"
print os.getenv("REDIS_REPORTS_URL")
print "---------"

def save_on_redis(row):
    redis_ = redis.StrictRedis(host=os.getenv("REDIS_REPORTS_URL"), port=6379, db=0)
    print os.getenv("REDIS_REPORTS_URL")
    print redis_
    redis_.set("#teste#", "fagner")


df  = spark.createDataFrame([(0,1), (0,1), (0,2)], ["id", "score"])
df.foreach(save_on_redis)
4

1 に答える 1