5

ジョブNullPointerExceptionを起動するときに取得しています。のメソッドMapReduceによってスローされています。カスタム、、および値クラスを使用しています。SerializationFactorygetSerializer()InputSplitInputFormatRecordReaderMapReduce

InputFormatクラスによって分割が作成された後、 RecordReader. 私が知る限り、それは「ステージング エリアのクリーンアップ」メッセージの直後に発生しています。

スタック トレースで示された場所で Hadoop ソースを確認すると、nullポインターgetSerialization()を受け取るとエラーが発生しているように見えます。Class<T>JobClientwriteNewSplits()はそのメソッドを次のように呼び出します。

Serializer<T> serializer = factory.getSerializer((Class<T>) split.getClass());

したがって、getClass()カスタムInputSplitオブジェクトでが呼び出されると、nullポインターが返されると思いますが、それはただ不可解です。何か案は?

エラーからの完全なスタック トレースは次のとおりです。

12/06/24 14:26:49 INFO mapred.JobClient: ステージング領域のクリーンアップ hdfs://localhost:54310/tmp/hadoop-s3cur3/mapred/staging/s3cur3/.staging/job_201206240915_0035
スレッド「メイン」での例外 java.lang.NullPointerException
    org.apache.hadoop.io.serializer.SerializationFactory.getSerializer(SerializationFactory.java:73) で
    org.apache.hadoop.mapreduce.split.JobSplitWriter.writeNewSplits (JobSplitWriter.java:123) で
    org.apache.hadoop.mapreduce.split.JobSplitWriter.createSplitFiles (JobSplitWriter.java:74) で
    org.apache.hadoop.mapred.JobClient.writeNewSplits (JobClient.java:968) で
    org.apache.hadoop.mapred.JobClient.writeSplits (JobClient.java:979) で
    org.apache.hadoop.mapred.JobClient.access$600(JobClient.java:174) で
    org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:897) で
    org.apache.hadoop.mapred.JobClient$2.run(JobClient.java:850) で
    java.security.AccessController.doPrivileged(ネイティブメソッド)で
    javax.security.auth.Subject.doAs (Subject.java:396) で
    org.apache.hadoop.security.UserGroupInformation.doAs (UserGroupInformation.java:1121) で
    org.apache.hadoop.mapred.JobClient.submitJobInternal (JobClient.java:850) で
    org.apache.hadoop.mapreduce.Job.submit (Job.java:500) で
    org.apache.hadoop.mapreduce.Job.waitForCompletion (Job.java:530) で
    edu.cs.illinois.cogcomp.hadoopinterface.infrastructure.CuratorJob.start (CuratorJob.java:94) で
    edu.cs.illinois.cogcomp.hadoopinterface.HadoopInterface.main (HadoopInterface.java:58) で
    sun.reflect.NativeMethodAccessorImpl.invoke0(ネイティブメソッド)
    sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) で
    sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) で
    java.lang.reflect.Method.invoke(Method.java:597) で
    org.apache.hadoop.util.RunJar.main(RunJar.java:156) で

ありがとう!

編集: カスタム InputSplit の私のコードは次のとおりです。

import . . .

/**
 * A document directory within the input directory. 
 * Returned by DirectoryInputFormat.getSplits()
 * and passed to DirectoryInputFormat.createRecordReader().
 *
 * Represents the data to be processed by an individual Map process.
 */
public class DirectorySplit extends InputSplit {
    /**
     * Constructs a DirectorySplit object
     * @param docDirectoryInHDFS The location (in HDFS) of this
     *            document's directory, complete with all annotations.
     * @param fs The filesystem associated with this job
     */
    public  DirectorySplit( Path docDirectoryInHDFS, FileSystem fs )
            throws IOException {
        this.inputPath = docDirectoryInHDFS;
        hash = FileSystemHandler.getFileNameFromPath(inputPath);
        this.fs = fs;
    }

    /**
     * Get the size of the split so that the input splits can be sorted by size.
     * Here, we calculate the size to be the number of bytes in the original
     * document (i.e., ignoring all annotations).
     *
     * @return The number of characters in the original document
     */
    @Override
    public long getLength() throws IOException, InterruptedException {
        Path origTxt = new Path( inputPath, "original.txt" );
        HadoopInterface.logger.log( msg );
        return FileSystemHandler.getFileSizeInBytes( origTxt, fs);
    }

    /**
     * Get the list of nodes where the data for this split would be local.
     * This list includes all nodes that contain any of the required data---it's
     * up to Hadoop to decide which one to use.
     *
     * @return An array of the nodes for whom the split is local
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    public String[] getLocations() throws IOException, InterruptedException {
        FileStatus status = fs.getFileStatus(inputPath);

        BlockLocation[] blockLocs = fs.getFileBlockLocations( status, 0,
                                                              status.getLen() );

        HashSet<String> allBlockHosts = new HashSet<String>();
        for( BlockLocation blockLoc : blockLocs ) {
            allBlockHosts.addAll( Arrays.asList( blockLoc.getHosts() ) );
        }

        return (String[])allBlockHosts.toArray();
    }

    /**
     * @return The hash of the document that this split handles
     */
    public String toString() {
        return hash;
    }

    private Path inputPath;
    private String hash;
    private FileSystem fs;
}
4

1 に答える 1

5

InputSplit は Writable を拡張しません。入力分割が Writable を実装することを明示的に宣言する必要があります。

于 2012-06-25T14:32:26.313 に答える