1

PMML モデルをインポートして、Spark を使用してスコアを計算したいと考えています。Spark を使用しない場合はすべて正常に動作しますが、マッパーでメソッドを使用することはできません。

問題は、シリアル化できないと思われる org.jpmml.evaluator.Evaluator からの評価オブジェクトが必要なことです。そこで、次のクラスを使用して、Serialiazable にしようとしました。

package util;

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

import org.jpmml.evaluator.Evaluator;

public class SerializableEvaluator implements Serializable {

    private static final long serialVersionUID = 6631604036553063657L;
    private Evaluator evaluator;

    public SerializableEvaluator(Evaluator evaluator) {
        this.evaluator = evaluator;
    }

    public Evaluator getEvaluator() {
        return evaluator;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.writeObject(evaluator);
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        Evaluator eval = (Evaluator) in.readObject();
    }
}

また、すべてのクラスをシリアライズ可能にしました。

ここに私のコードのサンプルがあります:

        logger.info("Print 5 first rows----------------------------");
        strTitanicRDD
                .take(5)
                .forEach(row -> logger.info(row));
        logger.info("Print 5 first Titatnic Obs---------------------");
        strTitanicRDD
                .map(row -> new TitanicObservation(row))
                .take(5)
                .forEach(titanic -> logger.info(titanic.toString()));
        logger.info("Print 5 first Scored Titatnic Obs---------------");

        try{strTitanicRDD
            .map(row -> new TitanicObservation(row))
            .map(
                new Function<TitanicObservation,String>(){

                    private static final long serialVersionUID = -2968122030659306400L;

                    @Override
                    public String call(TitanicObservation titanic) throws Exception {
                        String res = PmmlUtil.computeScoreTitanic(evaluator, titanic);
                        return res;
                    }

                })
        .take(5)
        .forEach(row -> logger.info(row));

しかし、私のコードが私の問題を解決するのに役立つとは思いません。これは非常に明確です(ログを参照してください:)

org.apache.spark.SparkException: org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) でタスクをシリアル化できません org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) ) org.apache.spark.SparkContext.clean(SparkContext.scala:1623) で org.apache.spark.rdd.RDD.map(RDD.scala:286) で org.apache.spark.api.java.JavaRDDLike$ org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:46) の class.map(JavaRDDLike.scala:89)、score.acv.AppWithSpark.main(AppWithSpark.java:117) の sun.reflect .NativeMethodAccessorImpl.invoke0(ネイティブ メソッド) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) java.lang.reflect.Method.invoke(Method.java:497) で org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:577) でorg.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:174) で org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:197) で org.apache.spark.deploy. SparkSubmit$.main(SparkSubmit.scala:112) at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)197) org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112) で org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) で197) org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:112) で org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) で

原因: java.io.NotSerializableException: org.xml.sax.helpers.LocatorImpl シリアル化スタック:

    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:38)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:80)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 15 more
4

1 に答える 1

2

インターフェイスの背後には、サブクラスorg.jpmml.evaluator.Evaluatorのインスタンスがあります。org.jpmml.evaluator.ModelEvaluatorクラスModelEvaluatorとそのすべてのサブクラスは、設計上シリアライズ可能です。この問題は、最初にメソッドにorg.dmg.pmml.PMML提供したオブジェクト インスタンスに関係しています。ModelEvaluatorFactory#newModelManager(PMML)

簡単に言うと、すべての PMML クラス モデル オブジェクトに SAX ロケータ情報を付加できます。これは、問題のある XML コンテンツを見つけるための開発およびテスト段階で役立ちます。ただし、本番段階では、この情報を保持する必要はありません。JAXB ランタイムを適切に構成するかPMMLObject#setLocator(Locatable)null引数を使用して呼び出して既存の SAX ロケーター インスタンスを単純にクリアすることにより、SAX ロケーター情報を無効にすることができます。後者の機能は、org.jpmml.model.visitors.LocatorNullifierVisitor クラスによって形式化されています。

完全な例については、公式のJPMML-Spark プロジェクトorg.jpmml.spark.EvaluatorUtilのユーティリティ クラス (特に 73 ~ 75 行目あたり)を参照してください。そもそもJPMML-Sparkを使ってみませんか?

于 2016-02-17T20:14:12.787 に答える