ファイルに保存したくない独自のPig Storeクラスを作成しています。これをサードパーティのデータストアに送信する予定です(API呼び出しを除く)。
注: Cloudera の VirtualBox イメージで実行しています。
Java クラス (以下にリスト) を作成し、以下の id.pig スクリプトで使用している mystore.jar を作成しました。
store B INTO 'mylocation' USING MyStore('mynewlocation')
このスクリプトを pig で実行すると、以下のエラーが表示されます: ERROR 6000: Output location validation failed for: 'file://home/cloudera/test/id.out More info to follow: Output directory not set.
or.apache.pig.impl.plan.VisitorException: ERROR 6000:
at or.apache.pig.newplan.logical.rules.InputOutputFileValidator$InputOutputFileValidator.visit(InputOutputFileValidator.java:95)
助けてください!
-------------------- MyStore.java ----------------------
public class MyStore extends StoreFunc {
protected RecordWriter writer = null;
private String location = null;
public MyStore () {
location= null;
}
public MyStore (String location) {
this.location= location;
}
@Override
public OutputFormat getOutputFormat() throws IOException {
return new MyStoreOutputFormat(location);
}
@Override
public void prepareToWrite(RecordWriter writer) throws IOException {
this.writer = writer;
}
@Override
public void putNext(Tuple tuple) throws IOException {
//write tuple to location
try {
writer.write(null, tuple.toString());
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void setStoreLocation(String location, Job job) throws IOException {
if(location!= null)
this.location= location;
}
}
-------------------- MyStoreOutputFormat.java ----------------------
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.pig.data.Tuple;
public class MyStoreOutputFormat extends
TextOutputFormat<WritableComparable, Tuple> {
private String location = null;
public MyStoreOutputFormat(String location) {
this.location = location;
}
@Override
public RecordWriter<WritableComparable, Tuple> getRecordWriter(
TaskAttemptContext job) throws IOException, InterruptedException {
Configuration conf = job.getConfiguration();
String extension = location;
Path file = getDefaultWorkFile(job, extension);
FileSystem fs = file.getFileSystem(conf);
FSDataOutputStream fileOut = fs.create(file, false);
return new MyStoreRecordWriter(fileOut);
}
protected static class MyStoreRecordWriter extends
RecordWriter<WritableComparable, Tuple> {
DataOutputStream out = null;
public MyStoreRecordWriter(DataOutputStream out) {
this.out = out;
}
@Override
public void close(TaskAttemptContext taskContext) throws IOException,
InterruptedException {
// close the location
}
@Override
public void write(WritableComparable key, Tuple value)
throws IOException, InterruptedException {
// write the data to location
if (out != null) {
out.writeChars(value.toString()); // will be calling API later. let me first dump to the location!
}
}
}
}
ここで何か不足していますか?