0

ファイルに保存したくない独自のPig Storeクラスを作成しています。これをサードパーティのデータストアに送信する予定です(API呼び出しを除く)。

注: Cloudera の VirtualBox イメージで実行しています。

Java クラス (以下にリスト) を作成し、以下の id.pig スクリプトで使用している mystore.jar を作成しました。

store B INTO 'mylocation' USING MyStore('mynewlocation')

このスクリプトを pig で実行すると、以下のエラーが表示されます: ERROR 6000: Output location validation failed for: 'file://home/cloudera/test/id.out More info to follow: Output directory not set.

or.apache.pig.impl.plan.VisitorException: ERROR 6000:
at or.apache.pig.newplan.logical.rules.InputOutputFileValidator$InputOutputFileValidator.visit(InputOutputFileValidator.java:95)

助けてください!

-------------------- MyStore.java ----------------------

public class MyStore extends StoreFunc {
    protected RecordWriter writer = null;
    private String location = null;


    public MyStore () {
        location= null;
    }

    public MyStore (String location) {
        this.location= location;
    }

    @Override
    public OutputFormat getOutputFormat() throws IOException {
        return new MyStoreOutputFormat(location);
    }

    @Override
    public void prepareToWrite(RecordWriter writer) throws IOException {
        this.writer = writer;
    }

    @Override
    public void putNext(Tuple tuple) throws IOException {
        //write tuple to location

        try {
            writer.write(null, tuple.toString());
        } catch (InterruptedException e) {          
            e.printStackTrace();
        }
    }

    @Override
    public void setStoreLocation(String location, Job job) throws IOException {
        if(location!= null)
            this.location= location;
    }

}

-------------------- MyStoreOutputFormat.java ----------------------

import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.pig.data.Tuple;

public class MyStoreOutputFormat extends
        TextOutputFormat<WritableComparable, Tuple> {
    private String location = null;

    public MyStoreOutputFormat(String location) {

        this.location = location;
    }

    @Override
    public RecordWriter<WritableComparable, Tuple> getRecordWriter(
            TaskAttemptContext job) throws IOException, InterruptedException {

        Configuration conf = job.getConfiguration();

        String extension = location;
        Path file = getDefaultWorkFile(job, extension);     
        FileSystem fs = file.getFileSystem(conf);

        FSDataOutputStream fileOut = fs.create(file, false);

        return new MyStoreRecordWriter(fileOut);
    }

    protected static class MyStoreRecordWriter extends
            RecordWriter<WritableComparable, Tuple> {

        DataOutputStream out = null;

        public MyStoreRecordWriter(DataOutputStream out) {
            this.out = out;
        }

        @Override
        public void close(TaskAttemptContext taskContext) throws IOException,
                InterruptedException {
            // close the location
        }

        @Override
        public void write(WritableComparable key, Tuple value)
                throws IOException, InterruptedException {

            // write the data to location
            if (out != null) {
                out.writeChars(value.toString()); // will be calling API later. let me first dump to the location!
            }
        }

    }
}

ここで何か不足していますか?

4

1 に答える 1

1

まず、インスタンス変数ではなく、ジョブ構成を使用して場所の値を保存する必要があると思います

setStoreLocation メソッドのローカル変数 'location' への割り当ては、ジョブの計画時に呼び出されますが、getOutputFormat 呼び出しは実行フェーズまで行われない場合があり、その時点までに location 変数が設定されなくなる可能性があります (クラスの新しいインスタンス作成されている可能性があります)。

のソースを見るとPigStorage.setStoreLocation、場所が Job 構成 (2 行目) に保存されていることがわかります。

@Override
public void setStoreLocation(String location, Job job) throws IOException {
    job.getConfiguration().set("mapred.textoutputformat.separator", "");
    FileOutputFormat.setOutputPath(job, new Path(location));

    if( "true".equals( job.getConfiguration().get( "output.compression.enabled" ) ) ) {
        FileOutputFormat.setCompressOutput( job, true );
        String codec = job.getConfiguration().get( "output.compression.codec" );
        try {
            FileOutputFormat.setOutputCompressorClass( job,  (Class<? extends CompressionCodec>) Class.forName( codec ) );
        } catch (ClassNotFoundException e) {
            throw new RuntimeException("Class not found: " + codec );
        }
    } else {
        // This makes it so that storing to a directory ending with ".gz" or ".bz2" works.
        setCompression(new Path(location), job);
    }
}

したがって、場所をジョブ変数に保存する必要があると思います:

@Override
public void setStoreLocation(String location, Job job) throws IOException {
    if(location!= null)
        job.getConfiguration().set("mylocation", location);
}

createRecordReader メソッドで抽出できるカスタム出力形式は次のとおりです。

@Override
public RecordWriter<WritableComparable, Tuple> getRecordWriter(
        TaskAttemptContext job) throws IOException, InterruptedException {

    Configuration conf = job.getConfiguration();

    String extension = conf.get("mylocation");
    Path file = getDefaultWorkFile(job, extension);     
    FileSystem fs = file.getFileSystem(conf);

    FSDataOutputStream fileOut = fs.create(file, false);

    return new MyStoreRecordWriter(fileOut);
}

最後に (おそらく、表示されているエラーの実際の原因)、出力形式は TextOutputFormat を拡張getDefaultWorkFileし、レコード ライターでメソッドを使用します。このメソッドは、HDFS でファイルを出力する場所を知る必要があります。 setStoreLocation メソッドで呼び出さFileOutputFormat.setOutputPath(job, new Path(location));れません (以前に貼り付けた PigStorage.setStoreLocation メソッドを参照してください)。したがって、エラーは、デフォルトの作業ファイルを作成する場所がわからないためです。

于 2013-02-06T12:11:53.900 に答える