現在、Hadoop 0.20.2 と古い API を使用しています。やりたいこと マップサイドジョイン。1 つはエッジ、もう 1 つはノードを持つ 2 つのファイルで構成されるグラフ データセットがあります。エッジは「ターゲット ソース ラベル」の形式で、ノードは「ノード ID '-' ラベル 0」で、最初の値でソートされます。
小さなおもちゃの例 (6 つのノードと 8 つのエッジ) ではすべてが正常に機能していますが、スケールアップすると、入力ファイルの最初の部分のみが結合されます。すべてが正しい形式でソートされている必要があるため、この理由はわかりません。何か不足していますか?
「デバッグ」のために、マップでインデントされた TupleWritable の値を使用してキーを出力しました。http://pastebin.com/rcNx2r5cで確認できます。
ノード ファイルは 9416 バイト、エッジ ファイルは 15797 バイトで、結果にすべて出力されます。しかし、キー 98 の後、結合は停止します。次に、最初にエッジを出力し、次にノードを出力しますが、キー 99 を持つノードとエッジの両方が存在します。
CompositeInputFormat のジョブ設定:
conf.setInputFormat(CompositeInputFormat.class);
Path[] input = new Path[] { inputNodes, inputEdges};
conf.set("key.value.separator.in.input.line", "\t");
conf.set("mapred.join.expr", CompositeInputFormat.compose(
"outer", KeyValueTextInputFormat.class,
input )
);
どんな助けでも大歓迎ですので、事前に感謝します!
編集:私は問題を解決しました。興味のある方へ; 問題は KeyValueTextInputFormat でした。LongWritable キーと Text 値を持つべきだった Text として、キーと値の両方があります。これは問題ないと思いましたが、これで失敗するようです。そこで、KeyValueTextInputFormat をベースに独自の入力フォーマットを作成しました。
public class KeyValueLongInputFormat extends FileInputFormat<LongWritable, Text> implements JobConfigurable {
private CompressionCodecFactory compressionCodecs = null;
@Override
public void configure(JobConf conf) {
compressionCodecs = new CompressionCodecFactory(conf);
}
protected boolean isSplitable(FileSystem fs, Path file) {
return compressionCodecs.getCodec(file) == null;
}
@Override
public RecordReader<LongWritable, Text> getRecordReader(InputSplit genericSplit, JobConf job, Reporter reporter)
throws IOException {
reporter.setStatus(genericSplit.toString());
return new KeyValueLongLineRecordReader(job, (FileSplit) genericSplit);
}
}
と
public class KeyValueLongLineRecordReader implements RecordReader<LongWritable, Text> {
private final LineRecordReader lineRecordReader;
private byte separator = (byte) '\t';
private LongWritable dummyKey;
private Text innerValue;
public Class getKeyClass() {
return LongWritable.class;
}
public LongWritable createKey() {
return new LongWritable();
}
public Text createValue() {
return new Text();
}
public KeyValueLongLineRecordReader(Configuration job, FileSplit split) throws IOException {
lineRecordReader = new LineRecordReader(job, split);
dummyKey = lineRecordReader.createKey();
innerValue = lineRecordReader.createValue();
String sepStr = job.get("key.value.separator.in.input.line", "\t");
this.separator = (byte) sepStr.charAt(0);
}
public static int findSeparator(byte[] utf, int start, int length, byte sep) {
for (int i = start; i < (start + length); i++) {
if (utf[i] == sep) {
return i;
}
}
return -1;
}
/** Read key/value pair in a line. */
public synchronized boolean next(LongWritable key, Text value) throws IOException {
LongWritable tKey = key;
Text tValue = value;
byte[] line = null;
int lineLen = -1;
if (lineRecordReader.next(dummyKey, innerValue)) {
line = innerValue.getBytes();
lineLen = innerValue.getLength();
} else {
return false;
}
if (line == null)
return false;
int pos = findSeparator(line, 0, lineLen, this.separator);
if (pos == -1) {
tKey.set(Long.valueOf(new String(line, 0, lineLen)));
tValue.set("");
} else {
int keyLen = pos;
byte[] keyBytes = new byte[keyLen];
System.arraycopy(line, 0, keyBytes, 0, keyLen);
int valLen = lineLen - keyLen - 1;
byte[] valBytes = new byte[valLen];
System.arraycopy(line, pos + 1, valBytes, 0, valLen);
tKey.set(Long.valueOf(new String(keyBytes)));
tValue.set(valBytes);
}
return true;
}
public float getProgress() {
return lineRecordReader.getProgress();
}
public synchronized long getPos() throws IOException {
return lineRecordReader.getPos();
}
public synchronized void close() throws IOException {
lineRecordReader.close();
}
}
私の問題を修正しました。