私は何千ものセンサーを持っています。データを分割する必要があります (つまり、1 日あたりのセンサーごと)。次に、データ ポイントの各リストを R アルゴリズムに送信します)。Spark を使用すると、単純化されたサンプルは次のようになります。
//Spark
val rddData = List(
("1:3", List(1,1,456,1,1,2,480,0,1,3,425,0)),
("1:4", List(1,4,437,1,1,5,490,0)),
("1:6", List(1,6,500,0,1,7,515,1,1,8,517,0,1,9,522,0,1,10,525,0)),
("1:11", List(1,11,610,1))
)
case class DataPoint(
key: String,
value: List[Int]) // 4 value pattern, sensorID:seq#, seq#, value, state
寄木細工のファイルに変換して保存します。寄木細工を SparkR にロードします。問題ありません。スキーマには次のように書かれています。
#SparkR
df <- read.df(sqlContext, filespec, "parquet")
schema(df)
StructType
|-name = "key", type = "StringType", nullable = TRUE
|-name = "value", type = "ArrayType(IntegerType,true)", nullable = TRUE
そのため、SparkR には、必要なすべてのデータ (df$value) が各レコードに含まれるデータフレームがあります。その配列をRが消費できるものに抽出し、結果の配列を保持する新しい列で元のデータフレーム(df)を変更したいと考えています。論理的には results = function(df$value) のようなものです。次に、結果 (すべての行) を SparkR データフレームに戻して出力する必要があります。
SparkR データフレームから配列を抽出し、結果を変更するにはどうすればよいですか?