1

オープン ストリート マップ データの Hadoop ベースのインジェスターに取り組み始めたところです。いくつかの形式がありますが、私はプロトコル バッファ ベースの形式をターゲットにしています (注意 - 純粋な pb ではありません)。

カスタムレコードリーダー/入力形式で可変長エンコーディングを処理するのではなく、ファイルをシーケンスファイルに事前に分割する方が効率的であるように見えますが、サニティチェックが必要です。

形式については、 PBF 形式の説明で詳しく説明されていますが 、基本的には [BlobHeader,Blob] ブロックのコレクションです。

Blobヘッダーがあります

message BlobHeader {
   required string type = 1;
   optional bytes indexdata = 2;
   required int32 datasize = 3;
 }

次に、Blob (サイズはヘッダーの datasize パラメーターで定義されます)

 message Blob {
   optional bytes raw = 1; // No compression
   optional int32 raw_size = 2; // Only set when compressed, to the uncompressed size
   optional bytes zlib_data = 3;
   // optional bytes lzma_data = 4; // PROPOSED.
   // optional bytes OBSOLETE_bzip2_data = 5; // Deprecated.
 }

明らかにブロブに取り掛かると、より多くの構造があります-しかし、私はそれをマッパーで処理します-私がやりたいことは、最初はマッパーごとに1つのブロブを持つことです(後でマッパーごとにブロブの倍数になる可能性があります)。

他の入力形式/レコードリーダーの一部は、「十分に大きい」分割サイズを使用し、区切り記号まで後方/前方にシークしますが、ブロブ/ヘッダーのオフセットを知らせる区切り記号がないため、インデックスがないためです。それらのいずれかを指しています-最初にファイルをストリーミングしないと、分割ポイントを取得する方法がわかりません。

これで、実際にディスクからファイル全体を読み取る必要がなくなります。ヘッダーの読み取りから始め、その情報を使用して BLOB をシークし、それを最初の分割ポイントとして設定してから繰り返すことができます。しかし、それは私が思いつくことができるシーケンスファイルへの事前分割に代わる唯一の方法です.

これを処理するより良い方法はありますか?そうでない場合は、2つの提案について考えてください。

4

1 に答える 1

4

さて、私は getSplits メソッドでバイナリ ファイルを解析しました。データの 99% 以上をスキップしているので、かなり高速です (planet-osm 22GB ワールド ファイルの場合は約 20 秒)。他の誰かがつまずいた場合の getSplits メソッドは次のとおりです。

@Override
public List<InputSplit> getSplits(JobContext context){
    List<InputSplit> splits = new ArrayList<InputSplit>();
    FileSystem fs = null;
    Path file = OSMPBFInputFormat.getInputPaths(context)[0]; 
    FSDataInputStream in = null;
    try {
        fs = FileSystem.get(context.getConfiguration());
        in = fs.open(file);
        long pos = 0;
        while (in.available() > 0){
            int len = in.readInt(); 
            byte[] blobHeader = new byte[len]; 
            in.read(blobHeader);
            BlobHeader h = BlobHeader.parseFrom(blobHeader);
            FileSplit split = new FileSplit(file, pos,len + h.getDatasize(), new String[] {});
            splits.add(split);
            pos += 4;
            pos += len;
            pos += h.getDatasize();
            in.skip(h.getDatasize());
        }
    } catch (IOException e) {
        sLogger.error(e.getLocalizedMessage());
    } finally {
        if (in != null) {try {in.close();}catch(Exception e){}};
        if (fs != null) {try {fs.close();}catch(Exception e){}};
    }
    return splits;
}

これまでのところ問題なく動作していますが、まだ出力を正しく処理していません。pbfをhdfsにコピーし、単一のマッパーでシーケンスに変換してから取り込むよりも明らかに高速です(コピー時間が支配的です)。また、外部プログラムを hdfs のシーケンス ファイルにコピーしてから、hdfs に対してマッパーを実行するよりも ~20% 高速です (後者はスクリプト化されています)。だからここに苦情はありません。

これにより、すべてのブロックに対してマッパーが生成されることに注意してください。これは、惑星ワールド ファイルの最大 23k マッパーです。私は実際には、分割ごとに複数のブロックをまとめています。分割がコレクションに追加される前に、x 回ループするだけです。

BlobHeader については、上記の OSM wiki リンクから protobuf .proto ファイルをコンパイルしました。必要に応じて、OSM バイナリ クラスから事前に生成されたものをプルすることもできます。maven フラグメントは次のとおりです。

<dependency>
    <groupId>org.openstreetmap.osmosis</groupId>
    <artifactId>osmosis-osm-binary</artifactId>
    <version>0.43-RELEASE</version>
</dependency>
于 2013-11-20T01:41:11.923 に答える