Hadoop Thanks 内で複数のシーケンス ファイルを 1 つのシーケンス ファイルにマージする方法。
4 に答える
複数のファイルを1つのファイルにマージする場合は、次の2つの答えがあります。
母国語
getmerge
使用法:hadoop fs -getmerge <src> <localdst>
ソースディレクトリと宛先ファイルを入力として受け取り、src内のファイルを宛先ローカルファイルに連結します。オプションで、addnlを設定して、各ファイルの最後に改行文字を追加できるようにすることができます。
Java API
org.apache.hadoop.fs.FileUtil.copyMerge(FileSystem srcFS, Path srcDir, FileSystem dstFS, Path dstFile, boolean deleteSource, Configuration conf, String addString);
ディレクトリ内のすべてのファイルを1つの出力ファイルにコピーします(マージ)
hdfsにコピー
put
使用法:hadoop dfs -put <localsrc> ... <dst>
単一のsrcまたは複数のsrcをローカルファイルシステムから宛先ファイルシステムにコピーします。また、stdinからの入力を読み取り、宛先ファイルシステムに書き込みます。
copyFromLocal
使用法:hadoop dfs -copyFromLocal <localsrc> URI
ソースがローカルファイル参照に制限されていることを除いて、putコマンドと同様です。
フォークリフトを検討したことがありますか?SequenceFile のマージを含む、特定の SequenceFile 雑用を処理するために作成しました。
あなたの場合、次を実行できます:
forqlift seq2seq --file new_combined_file.seq \
original_file1.seq original_file2.seq original_file3.seq ...
確かに、forqlift のseq2seq
ツールは「実験的」とマークされています ... しかし、私の (確かに限られた) 内部テストではうまく機能しました。
大量のシーケンス ファイルを扱っている場合は、 をMapper
マッパーおよびReducer
リデューサーとして使用する MapReduce ジョブを作成することをお勧めします。I/O 形式の場合は、 と を使用SequenceFileInputFormat
しSequenceFileOutputFormat
ます。レデューサーの数を 1 に設定します。これらはすべて、ドライバー/メイン コードの Configuration および Job オブジェクトで設定したものです。出力フォーマットの設定方法、入力フォーマットの設定方法、マッパーの設定方法、リデューサーの設定方法を参照してください。
Mapper
andのデフォルトの動作はReducer
、データに対して何もしないことに注意してください。単にデータを通過させます。そのため、ここでは map 関数や reduce 関数を記述しません。
これにより、シーケンス ファイルが読み込まれ、マッパー内のデータは何も処理されず、すべてのレコードがレデューサーにシャッフルされ、すべてが 1 つのファイルに出力されます。これには、出力シーケンス ファイル内のキーを並べ替えるという副作用があります。
シーケンス ファイルとしてではなく、バイナリ ファイルとしてマージするため、シーケンス ファイルにhadoop getmergeを使用することはできません (したがって、マージされたファイルに多くのヘッダーが含まれます..)。
したがって、@Donald-miner が提案したように、単一のレデューサーを使用して小さな Hadoop ジョブを作成するか、 and を使用してスタンドアロンのマージを作成することができSequenceFile.Reader
ますSeuquenceFile.Writer
。
私は2番目のオプションを取りました。これが私のコードです:
package ru.mail.go.webbase.markov.hadoop.utils;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;
public class SequenceFilesUtils {
private static final Configuration conf = HBaseConfiguration.create();
public static <K, V> void merge(Path fromDirectory, Path toFile, Class<K> keyClass, Class<V> valueClass) throws IOException {
FileSystem fs = FileSystem.get(conf);
if (!fs.isDirectory(fromDirectory)) {
throw new IllegalArgumentException("'" + fromDirectory.toString() + "' is not a directory");
}
SequenceFile.Writer writer = SequenceFile.createWriter(
conf,
SequenceFile.Writer.file(toFile),
SequenceFile.Writer.keyClass(keyClass),
SequenceFile.Writer.valueClass(valueClass)
);
for (FileStatus status : fs.listStatus(fromDirectory)) {
if (status.isDirectory()) {
System.out.println("Skip directory '" + status.getPath().getName() + "'");
continue;
}
Path file = status.getPath();
if (file.getName().startsWith("_")) {
System.out.println("Skip \"_\"-file '" + file.getName() + "'"); //There are files such "_SUCCESS"-named in jobs' ouput folders
continue;
}
System.out.println("Merging '" + file.getName() + "'");
SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(file));
Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
while (reader.next(key, value)) {
writer.append(key, value);
}
reader.close();
}
writer.close();
}
}
これが私のテストです:
public class SequenceFilesUtilsTest {
private static final String OUT_PATH = "./UNIVERSE/SequenceFilesUtilsTest/";
@Before
public void initEnviroment() throws IOException {
TestUtils.createDirectory(OUT_PATH);
TestUtils.createDirectory(OUT_PATH + "/in");
}
@Test
public void test() throws Exception {
Configuration conf = HBaseConfiguration.create();
Path inPath1 = new Path("file://" + new File(OUT_PATH).getAbsolutePath() + "/in/in1.seq");
System.out.println("Saving first part to '" + inPath1 + "'");
SequenceFile.Writer writer1 = SequenceFile.createWriter(
conf,
SequenceFile.Writer.file(inPath1),
SequenceFile.Writer.keyClass(LongWritable.class),
SequenceFile.Writer.valueClass(Text.class)
);
writer1.append(new LongWritable(101), new Text("FIRST1"));
writer1.append(new LongWritable(102), new Text("FIRST2"));
writer1.append(new LongWritable(103), new Text("FIRST3"));
writer1.append(new LongWritable(104), new Text("FIRST4"));
writer1.close();
Path inPath2 = new Path("file://" + new File(OUT_PATH).getAbsolutePath() + "/in/in2.seq");
System.out.println("Saving second part to '" + inPath2 + "'");
SequenceFile.Writer writer2 = SequenceFile.createWriter(
conf,
SequenceFile.Writer.file(inPath2),
SequenceFile.Writer.keyClass(LongWritable.class),
SequenceFile.Writer.valueClass(Text.class)
);
writer2.append(new LongWritable(201), new Text("SND1"));
writer2.append(new LongWritable(202), new Text("SND2"));
writer2.append(new LongWritable(203), new Text("SND3"));
writer2.close();
SequenceFilesUtils.merge(
new Path("file://" + new File(OUT_PATH).getAbsolutePath() + "/in"),
new Path("file://" + new File(OUT_PATH).getAbsolutePath() + "/merged.seq"),
LongWritable.class,
Text.class);
Path mergedPath = new Path("file://" + new File(OUT_PATH).getAbsolutePath() + "/merged.seq");
SequenceFile.Reader reader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(mergedPath));
LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
Text value = (Text) ReflectionUtils.newInstance(reader.getValueClass(), conf);
reader.next(key, value);
Assert.assertEquals(101, key.get());
Assert.assertEquals("FIRST1", value.toString());
reader.next(key, value);
Assert.assertEquals(102, key.get());
Assert.assertEquals("FIRST2", value.toString());
reader.next(key, value);
Assert.assertEquals(103, key.get());
Assert.assertEquals("FIRST3", value.toString());
reader.next(key, value);
Assert.assertEquals(104, key.get());
Assert.assertEquals("FIRST4", value.toString());
reader.next(key, value);
Assert.assertEquals(201, key.get());
Assert.assertEquals("SND1", value.toString());
reader.next(key, value);
Assert.assertEquals(202, key.get());
Assert.assertEquals("SND2", value.toString());
reader.next(key, value);
Assert.assertEquals(203, key.get());
Assert.assertEquals("SND3", value.toString());
reader.close();
}
}