私は、リカレントニュートラルネットワークを使用して、特定の映画レビューが肯定的か否定的かを予測する感情分析プログラムを持っています。そのプログラムには Deeplearning4j 深層学習ライブラリを使用しています。次に、そのプログラムを Apache Spark パイプラインに追加する必要があります。
それを行うと、MovieReviewClassifier
拡張するクラスがありorg.apache.spark.ml.classification.ProbabilisticClassifier
、そのクラスのインスタンスをパイプラインに追加する必要があります。モデルを構築するために必要な機能は、setFeaturesCol(String s)
メソッドを使用してプログラムに入力されます。追加する機能はString
、センチメント分析に使用される一連の文字列であるため、フォーマットされています。ただし、機能は の形式である必要がありますorg.apache.spark.mllib.linalg.VectorUDT
。文字列を Vector UDT に変換する方法はありますか?
以下にパイプライン実装用のコードを添付しました。
public class RNNPipeline {
final static String RESPONSE_VARIABLE = "s";
final static String INDEXED_RESPONSE_VARIABLE = "indexedClass";
final static String FEATURES = "features";
final static String PREDICTION = "prediction";
final static String PREDICTION_LABEL = "predictionLabel";
public static void main(String[] args) {
SparkConf sparkConf = new SparkConf();
sparkConf.setAppName("test-client").setMaster("local[2]");
sparkConf.set("spark.driver.allowMultipleContexts", "true");
JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);
SQLContext sqlContext = new SQLContext(javaSparkContext);
// ======================== Import data ====================================
DataFrame dataFrame = sqlContext.read().format("com.databricks.spark.csv")
.option("inferSchema", "true")
.option("header", "true")
.load("/home/RNN3/WordVec/training.csv");
// Split in to train/test data
double [] dataSplitWeights = {0.7,0.3};
DataFrame[] data = dataFrame.randomSplit(dataSplitWeights);
// ======================== Preprocess ===========================
// Encode labels
StringIndexerModel labelIndexer = new StringIndexer().setInputCol(RESPONSE_VARIABLE)
.setOutputCol(INDEXED_RESPONSE_VARIABLE)
.fit(data[0]);
// Convert indexed labels back to original labels (decode labels).
IndexToString labelConverter = new IndexToString().setInputCol(PREDICTION)
.setOutputCol(PREDICTION_LABEL)
.setLabels(labelIndexer.labels());
// ======================== Train ========================
MovieReviewClassifier mrClassifier = new MovieReviewClassifier().setLabelCol(INDEXED_RESPONSE_VARIABLE).setFeaturesCol("Review");
// Fit the pipeline for training..setLabelCol.setLabelCol.setLabelCol.setLabelCol
Pipeline pipeline = new Pipeline().setStages(new PipelineStage[] { labelIndexer, mrClassifier, labelConverter});
PipelineModel pipelineModel = pipeline.fit(data[0]);
}
}
Review は、ポジティブまたはネガティブとして予測される文字列を含む機能列です。
コードを実行すると、次のエラーが発生します。
Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: Column Review must be of type org.apache.spark.mllib.linalg.VectorUDT@f71b0bce but was actually StringType.
at scala.Predef$.require(Predef.scala:233)
at org.apache.spark.ml.util.SchemaUtils$.checkColumnType(SchemaUtils.scala:42)
at org.apache.spark.ml.PredictorParams$class.validateAndTransformSchema(Predictor.scala:50)
at org.apache.spark.ml.Predictor.validateAndTransformSchema(Predictor.scala:71)
at org.apache.spark.ml.Predictor.transformSchema(Predictor.scala:116)
at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:167)
at org.apache.spark.ml.Pipeline$$anonfun$transformSchema$4.apply(Pipeline.scala:167)
at scala.collection.IndexedSeqOptimized$class.foldl(IndexedSeqOptimized.scala:51)
at scala.collection.IndexedSeqOptimized$class.foldLeft(IndexedSeqOptimized.scala:60)
at scala.collection.mutable.ArrayOps$ofRef.foldLeft(ArrayOps.scala:108)
at org.apache.spark.ml.Pipeline.transformSchema(Pipeline.scala:167)
at org.apache.spark.ml.PipelineStage.transformSchema(Pipeline.scala:62)
at org.apache.spark.ml.Pipeline.fit(Pipeline.scala:121)
at RNNPipeline.main(RNNPipeline.java:82)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)