Scala 2.11 で Spark 2.3.0。ここAggregator
のドキュメントに従ってカスタムを実装しています。アグリゲーターには、入力、バッファー、および出力の 3 つのタイプが必要です。
私のアグリゲーターは、ウィンドウ内の以前のすべての行に作用する必要があるため、次のように宣言しました。
case class Foo(...)
object MyAggregator extends Aggregator[Foo, ListBuffer[Foo], Boolean] {
// other override methods
override def bufferEncoder: Encoder[ListBuffer[Mod]] = ???
}
オーバーライド メソッドの 1 つは、バッファ タイプ (この場合はListBuffer
. 適切なエンコーダーも、これをエンコードする他の方法も見つからないため、org.apache.spark.sql.Encoders
ここで何を返すかわかりません。
タイプの単一のプロパティを持つ新しいケースクラスを作成し、ListBuffer[Foo]
それをバッファクラスとして使用してから使用することをEncoders.product
考えましたが、それが必要かどうか、または何か他に欠けているものがあるかどうかはわかりません。ヒントをありがとう。