4

Cassandra からデータを取得するスタンドアロンの Spark プログラムを作成しています。例に従い、newAPIHadoopRDD() および ColumnFamilyInputFormat クラスを介して RDD を作成しました。RDD が作成されますが、RDD の .groupByKey() メソッドを呼び出すと、NotSerializableException が発生します。

public static void main(String[] args) {
    SparkConf sparkConf = new SparkConf();
    sparkConf.setMaster("local").setAppName("Test");
    JavaSparkContext ctx = new JavaSparkContext(sparkConf);

    Job job = new Job();
    Configuration jobConf = job.getConfiguration();
    job.setInputFormatClass(ColumnFamilyInputFormat.class);

    ConfigHelper.setInputInitialAddress(jobConf, host);
    ConfigHelper.setInputRpcPort(jobConf, port);
    ConfigHelper.setOutputInitialAddress(jobConf, host);
    ConfigHelper.setOutputRpcPort(jobConf, port);
    ConfigHelper.setInputColumnFamily(jobConf, keySpace, columnFamily, true);
    ConfigHelper.setInputPartitioner(jobConf,"Murmur3Partitioner");
    ConfigHelper.setOutputPartitioner(jobConf,"Murmur3Partitioner");

    SlicePredicate predicate = new SlicePredicate();
    SliceRange sliceRange = new SliceRange();
    sliceRange.setFinish(new byte[0]);
    sliceRange.setStart(new byte[0]);
    predicate.setSlice_range(sliceRange);
    ConfigHelper.setInputSlicePredicate(jobConf, predicate);

    JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> rdd =
    spark.newAPIHadoopRDD(jobConf,
    ColumnFamilyInputFormat.class.asSubclass(org.apache.hadoop.mapreduce.InputFormat.class),
    ByteBuffer.class, SortedMap.class);

    JavaPairRDD<ByteBuffer, Iterable<SortedMap<ByteBuffer, IColumn>>> groupRdd = rdd.groupByKey();
    System.out.println(groupRdd.count());
}

例外:

java.io.NotSerializableException: java.nio.HeapByteBuffer の java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1164) の java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1518) の java.io.ObjectOutputStream.writeSerialData( ObjectOutputStream.java:1483) で java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1400) で java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1158) で java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:330) で) org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) で org.apache.spark.storage.DiskBlockObjectWriter.write(BlockObjectWriter.scala:179) で org.apache.spark.scheduler.ShuffleMapTask$ org.apache.spark.scheduler の $anonfun$runTask$1.apply(ShuffleMapTask.scala:161)。ShuffleMapTask$$anonfun$runTask$1.apply(ShuffleMapTask.scala:158) で scala.collection.Iterator$class.foreach(Ite​​rator.scala:727) org.apache.spark.InterruptibleIterator.foreach(InterruptibleIterator.scala:28) でorg.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158) で org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) で org.apache.spark.scheduler.Task.run (Task.scala:51) org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) で java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895) で java. java.lang.Thread.run(Thread.java:662) の util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918)InterruptibleIterator.foreach(InterruptibleIterator.scala:28) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) org .apache.spark.scheduler.Task.run(Task.scala:51) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask (ThreadPoolExecutor.java:895) で java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918) で java.lang.Thread.run(Thread.java:662) でInterruptibleIterator.foreach(InterruptibleIterator.scala:28) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:158) org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99) org .apache.spark.scheduler.Task.run(Task.scala:51) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:187) at java.util.concurrent.ThreadPoolExecutor$Worker.runTask (ThreadPoolExecutor.java:895) で java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:918) で java.lang.Thread.run(Thread.java:662) でjava.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895) の executor.Executor$TaskRunner.run(Executor.scala:187) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java: 918) java.lang.Thread.run(Thread.java:662) でjava.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:895) の executor.Executor$TaskRunner.run(Executor.scala:187) java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java: 918) java.lang.Thread.run(Thread.java:662) で

私がやろうとしているのは、すべての行キー列を単一のエントリにマージすることです。次のように reduceByKey() メソッドを使用しようとすると、同じ例外が発生します。

JavaPairRDD<ByteBuffer, SortedMap<ByteBuffer, IColumn>> reducedRdd = rdd.reduceByKey(
    new Function2<SortedMap<ByteBuffer, IColumn>, SortedMap<ByteBuffer, IColumn>, sortedMap<ByteBuffer, IColumn>>() {
        public SortedMap<ByteBuffer, IColumn> call(SortedMap<ByteBuffer, IColumn> arg0,
            SortedMap<ByteBuffer, IColumn> arg1) throws Exception {
            SortedMap<ByteBuffer, IColumn> sortedMap = new TreeMap<ByteBuffer, IColumn>(arg0.comparator());
            sortedMap.putAll(arg0);
            sortedMap.putAll(arg1);
            return sortedMap;
        }
    }
);

私は使っている:

  • spark-1.0.0-bin-hadoop1
  • カサンドラ 1.2.12
  • Java 1.6

誰が問題が何であるか知っていますか?シリアル化に失敗するのは何ですか?

ありがとう、
シャイ

4

1 に答える 1

4

あなたの問題は、おそらく ByteBuffers をシリアライズしようとしたことが原因です。これらはシリアライズ可能ではなく、RDD を生成する前にバイト配列に変換する必要があります。

こちらから入手できる Spark 用の公式 DataStax Cassandra ドライバーを試す必要があります。

于 2014-07-02T13:39:29.163 に答える