0

私はデータブリックとpysparkの初心者です。現在、3列を含むpysparkデータフレームがあります:

  • 日にち
  • 通貨

金額列を EUR に換算し、その日の為替レートで計算したいと考えています。そのために、為替レート API を使用して、日付と通貨をパラメーターとして取得し、為替レートを調べています。

最初に、為替レートを見つけるために API 呼び出しを行う関数を定義しました。

ここに私のコードがあります:

def API(val1,currency,date):
  r = requests.get('https://api.exchangeratesapi.io/'+date,params={'symbols':currency})
  df = spark.read.json(sc.parallelize([r.json()]))
  df_value = df.select(F.col("rates."+currency))
  value = df_value.collect()[0][2]
  val = val1*(1/value)

  return float(val) 

次に、データフレーム内でこの関数を呼び出す UDF を定義しました。

API_Convert = F.udf(lambda x,y,z : API(x,y,z) if (y!= 'EUR') else x, FloatType())

この部分を実行しようとすると、絶対に理解できない酸洗エラーが発生します...

df = df.withColumn('amount',API_Convert('amount','currency','date'))
PicklingError: Could not serialize object: Exception: It appears that you are attempting to reference SparkContext from a broadcast variable, action, or transformation. SparkContext can only be used on the driver, not in code that it run on workers. For more information, see SPARK-5063.

この問題を解決するのを手伝ってくれませんか?

どうもありがとう

4

0 に答える 0