4

Scala 2.11 で Spark 2.3.0。ここAggregatorのドキュメントに従ってカスタムを実装しています。アグリゲーターには、入力、バッファー、および出力の 3 つのタイプが必要です。

私のアグリゲーターは、ウィンドウ内の以前のすべての行に作用する必要があるため、次のように宣言しました。

case class Foo(...)

object MyAggregator extends Aggregator[Foo, ListBuffer[Foo], Boolean] {
    // other override methods
    override def bufferEncoder: Encoder[ListBuffer[Mod]] = ???
}

オーバーライド メソッドの 1 つは、バッファ タイプ (この場合はListBuffer. 適切なエンコーダーも、これをエンコードする他の方法も見つからないため、org.apache.spark.sql.Encodersここで何を返すかわかりません。

タイプの単一のプロパティを持つ新しいケースクラスを作成し、ListBuffer[Foo]それをバッファクラスとして使用してから使用することをEncoders.product考えましたが、それが必要かどうか、または何か他に欠けているものがあるかどうかはわかりません。ヒントをありがとう。

4

2 に答える 2