11

カスタムステージでパイプラインを作成して保存しようとしていました。を使用して、columnを myに追加する必要があります。したがって、または同様のアクションを?に変換できるかどうか疑問に思っていました。DataFrameUDFUDFTransformer

私のカスタムは次のようになります。カスタムとしてUDFを使用してそれを行う方法を学びたいと思います。UDFTransformer

def getFeatures(n: String) = {
    val NUMBER_FEATURES = 4  
    val name = n.split(" +")(0).toLowerCase
    ((1 to NUMBER_FEATURES)
         .filter(size => size <= name.length)
         .map(size => name.substring(name.length - size)))
} 

val tokenizeUDF = sqlContext.udf.register("tokenize", (name: String) => getFeatures(name))
4

3 に答える 3

15

これは完全な機能を備えたソリューションではありませんが、次のようなものから始めることができます:

import org.apache.spark.ml.{UnaryTransformer}
import org.apache.spark.ml.util.Identifiable
import org.apache.spark.sql.types.{ArrayType, DataType, StringType}

class NGramTokenizer(override val uid: String)
  extends UnaryTransformer[String, Seq[String], NGramTokenizer]  {

  def this() = this(Identifiable.randomUID("ngramtokenizer"))

  override protected def createTransformFunc: String => Seq[String] = {
    getFeatures _
  }

  override protected def validateInputType(inputType: DataType): Unit = {
    require(inputType == StringType)
  }

  override protected def outputDataType: DataType = {
    new ArrayType(StringType, true)
  }
}

クイックチェック:

val df = Seq((1L, "abcdef"), (2L, "foobar")).toDF("k", "v")
val transformer = new NGramTokenizer().setInputCol("v").setOutputCol("vs")

transformer.transform(df).show
// +---+------+------------------+
// |  k|     v|                vs|
// +---+------+------------------+
// |  1|abcdef|[f, ef, def, cdef]|
// |  2|foobar|[r, ar, bar, obar]|
// +---+------+------------------+

次のように一般化することもできます。

import org.apache.spark.sql.catalyst.ScalaReflection.schemaFor
import scala.reflect.runtime.universe._

class UnaryUDFTransformer[T : TypeTag, U : TypeTag](
  override val uid: String,
  f: T => U
) extends UnaryTransformer[T, U, UnaryUDFTransformer[T, U]]  {

  override protected def createTransformFunc: T => U = f

  override protected def validateInputType(inputType: DataType): Unit = 
    require(inputType == schemaFor[T].dataType)

  override protected def outputDataType: DataType = schemaFor[U].dataType
}

val transformer = new UnaryUDFTransformer("featurize", getFeatures)
  .setInputCol("v")
  .setOutputCol("vs")

ラップされた関数ではなく UDF を使用する場合は、Transformer直接拡張してメソッドをオーバーライドする必要がありますtransform。残念ながら、有用なクラスの大部分はプライベートであるため、かなり扱いにくい場合があります。

または、UDF を登録することもできます。

spark.udf.register("getFeatures", getFeatures _)

と使用SQLTransformer

import org.apache.spark.ml.feature.SQLTransformer

val transformer = new SQLTransformer()
  .setStatement("SELECT *, getFeatures(v) AS vs FROM __THIS__")

transformer.transform(df).show
// +---+------+------------------+
// |  k|     v|                vs|
// +---+------+------------------+
// |  1|abcdef|[f, ef, def, cdef]|
// |  2|foobar|[r, ar, bar, obar]|
// +---+------+------------------+
于 2016-02-03T17:20:31.473 に答える