2

Spark が私の関数型インターフェース SerializablePredicate のシリアル化に失敗する理由を理解するのを手伝っていただけませんか?

例外を引き起こすコード

/**
 * Entry point outside of Spark that builds a predicate and hands it to the
 * Spark filtering job.
 */
public class NonSparkService {

    /**
     * Builds an accept-everything predicate and runs the Spark filtering job with it.
     */
    public void filter() {
        // A lambda is used instead of an anonymous inner class on purpose:
        // an anonymous class created in an instance method is an inner class
        // tied to the enclosing NonSparkService instance, and this class is
        // not Serializable, so Java serialization of the predicate can fail
        // with NotSerializableException. This lambda does not touch `this`
        // and therefore captures nothing.
        SerializablePredicate<MyClass> filter = value -> true;
        new SparkFilteringJob().run(filter);
    }
}

public class SparkFilteringJob implements Serializable {

    private static final Logger logger = LogManager.getLogger(SparkFilteringJob.class);

    private static final long serialVersionUID = 43L;

    public void run(SerializablePredicate filter) {

        sparkConf configuration =
            new SparkConf()
                .setAppName(name)
                .setMaster(host)
                .setJars(jars)
                .set("spark.shuffle.service.enabled", "true")
                .set("spark.dynamicAllocation.enabled", "true")
                .set("spark.dynamicAllocation.maxExecutors", "10") 
                .set("spark.executor.cores", "7")
                .set("spark.executor.memory", "7GB") 
                .set("spark.scheduler.mode", "FAIR")
                .set("spark.kryoserializer.buffer.max", "1024m")
                .set("spark.rpc.askTimeout", "500s")
                .set("spark.rpc.lookupTimeout", "500s")
                .set("spark.rpc.numRetries", "10")
                .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
               .set("spark.kryo.registrator", "my_package_path.MyKryoRegistrator")

        return SparkSession
                .builder()
                .config(configuration)
                .getOrCreate()
                .read()
                .parquet(pathStr)
                .as(Encoders.kryo(MyClass.class)) // MyClass implements Serializable

                //Problem - this filter cause java.io.NotSerializableException, but I can serialize SerializablePredicate (see code below)
                .filter(new FilterFunction<MyClass>() {
                    @Override
                    public boolean call(MyClass value) throws Exception {
                        return filter.test(value);
                    }
                })
                .collectAsList();
      }
}

MyKryoRegistrator.java

/**
 * Kryo registrator that registers {@code SerializablePredicate} with the
 * Kryo instance it is given.
 */
public class MyKryoRegistrator implements KryoRegistrator {

    /**
     * Registers {@code SerializablePredicate.class} on the supplied Kryo instance.
     *
     * @param kryoInstance the Kryo instance to register the class with
     */
    @Override
    public void registerClasses(final Kryo kryoInstance) {
        final Class<?> predicateType = SerializablePredicate.class;
        kryoInstance.register(predicateType);
    }
}

SerializablePredicate

/**
 * A {@link Predicate} that is also {@link Serializable}, with combinators
 * that keep the composed predicate typed as {@code SerializablePredicate}.
 *
 * @param <T> the type of the input to the predicate
 */
@FunctionalInterface
public interface SerializablePredicate<T> extends Predicate<T>, Serializable {

    /**
     * Returns a predicate that is the short-circuiting logical AND of this
     * predicate and {@code other}. The returned lambda captures both this
     * predicate and {@code other}, so it can be serialized only if both can.
     *
     * @param other predicate evaluated only when this predicate returns {@code true}
     * @return the composed predicate
     * @throws NullPointerException if {@code other} is {@code null}
     */
    default SerializablePredicate<T> and(SerializablePredicate<T> other) {
        // Fail at composition time, as Predicate.and does, not at first evaluation.
        java.util.Objects.requireNonNull(other, "other");
        return value -> test(value) && other.test(value);
    }

    /**
     * Returns a predicate that is the short-circuiting logical OR of this
     * predicate and {@code other}. The returned lambda captures both this
     * predicate and {@code other}, so it can be serialized only if both can.
     *
     * @param other predicate evaluated only when this predicate returns {@code false}
     * @return the composed predicate
     * @throws NullPointerException if {@code other} is {@code null}
     */
    default SerializablePredicate<T> or(SerializablePredicate<T> other) {
        // Fail at composition time, as Predicate.or does, not at first evaluation.
        java.util.Objects.requireNonNull(other, "other");
        return value -> test(value) || other.test(value);
    }

    /**
     * {@inheritDoc} The result is typed as {@code SerializablePredicate} and
     * captures this predicate.
     */
    @Override
    default SerializablePredicate<T> negate() {
        return value -> !test(value);
    }

}
4

0 に答える 0