91

新しい spark DataFrame API を見ると、データフレーム列を変更できるかどうかは不明です。

xデータフレームの行列の値を変更するにはどうすればyよいですか?

これpandasは次のようになります。

df.ix[x,y] = new_value

編集:以下の内容を統合すると、既存のデータフレームは不変であるため変更できませんが、必要な変更を加えた新しいデータフレームを返すことができます。

次のような条件に基づいて列の値を置き換えるだけの場合np.where:

from pyspark.sql import functions as F

update_func = (F.when(F.col('update_col') == replace_val, new_value)
                .otherwise(F.col('update_col')))
df = df.withColumn('new_column_name', update_func)

列に対して何らかの操作を実行し、データフレームに追加される新しい列を作成する場合:

import pyspark.sql.functions as F
import pyspark.sql.types as T

def my_func(col):
    do stuff to column here
    return transformed_value

# if we assume that my_func returns a string
my_udf = F.UserDefinedFunction(my_func, T.StringType())

df = df.withColumn('new_column_name', my_udf('update_col'))

新しい列に古い列と同じ名前を付けたい場合は、追加の手順を追加できます。

df = df.drop('update_col').withColumnRenamed('new_column_name', 'update_col')
4

5 に答える 5

77

列自体を変更することはできませんが、列を操作して、その変更を反映した新しい DataFrame を返すことはできます。そのためには、最初UserDefinedFunctionに適用する操作の実装を作成し、次にその関数を対象の列のみに選択的に適用します。Python の場合:

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType

name = 'target_column'
udf = UserDefinedFunction(lambda x: 'new_value', StringType())
new_df = old_df.select(*[udf(column).alias(name) if column == name else column for column in old_df.columns])

new_dfと同じスキーマを持つようになりました(それも型であるとold_df仮定します) が、列のすべての値は になります。old_df.target_columnStringTypetarget_columnnew_value

于 2015-03-25T13:35:02.113 に答える
14

DataFramesRDD に基づいています。RDD は不変の構造であり、オンサイトで要素を更新することはできません。値を変更するには、SQL に似た DSL またはmap.

強く推奨されるスライド デッキ:大規模データ サイエンスのための Spark での DataFrames の紹介

于 2015-03-17T21:51:45.187 に答える
12

maasgが言うように、古いDataFrameにマップを適用した結果から新しい DataFrame を作成できます。2 つの行を持つ特定の DataFrame の例df:

val newDf = sqlContext.createDataFrame(df.map(row => 
  Row(row.getInt(0) + SOMETHING, applySomeDef(row.getAs[Double]("y")), df.schema)

列の型が変更された場合は、代わりに正しいスキーマを指定する必要があることに注意してくださいdf.schemaorg.apache.spark.sql.Row利用可能なメソッドの API を確認してください: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Row.html

[更新] または、Scala で UDF を使用する:

import org.apache.spark.sql.functions._

val toLong = udf[Long, String] (_.toLong)

val modifiedDf = df.withColumn("modifiedColumnName", toLong(df("columnName"))).drop("columnName")

列名を同じままにする必要がある場合は、名前を元に戻すことができます。

modifiedDf.withColumnRenamed("modifiedColumnName", "columnName")
于 2015-11-08T21:19:36.520 に答える
5

pyspark.sql.functionsからcol をインポートし、 5 番目の列を文字列 (文字列 a、文字列 b、文字列 c) に基づいて整数 (0,1,2) に更新して、新しい DataFrame に入れます。

from pyspark.sql.functions import col, when 

data_frame_temp = data_frame.withColumn("col_5",when(col("col_5") == "string a", 0).when(col("col_5") == "string b", 1).otherwise(2))
于 2020-05-26T15:59:15.750 に答える