2

HDFS の圧縮された SequenceFile に毎秒 10 個のレコードを書き込み、5 分ごとに sync() を実行して、5 分より古いすべてのレコードを処理できるようにする小さなプログラムがあります。

私のコードはかなりの数の行なので、重要なビットのみを抽出しました。

// initialize

Configuration hdfsConfig = new Configuration();

CompressionCodecFactory codecFactory = new CompressionCodecFactory(hdfsConfig);
CompressionCodec compressionCodec = codecFactory.getCodecByName("default");

SequenceFile.Writer writer = SequenceFile.createWriter(
    hdfsConfig,
    SequenceFile.Writer.file(path),
    SequenceFile.Writer.keyClass(LongWritable.class),
    SequenceFile.Writer.valueClass(Text.class),
    SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK;, compressionCodec)
);

// ...


// append

LongWritable key = new LongWritable((new Date).getTime());
Text val = new Text("Some value");
writer.append(key, val);

// ...

// then every 5 minutes...

logger.info("about to sync...");
writer.hsync();
logger.info("synced!");

ログだけを見ると、同期操作は期待どおりに機能しているように見えますが、HDFS 上のファイルは小さいままです。しばらくすると、いくつかのヘッダーといくつかのイベントが追加される可能性がありますが、hsync() の頻度に近いものさえあります。ファイルが閉じられると、すべてが一度にフラッシュされます。

予想される同期のたびに、ファイルの内容を手動でチェックしてデータが存在するかどうかを確認しようとしましたが、ここでもファイルが空に見えます: hdfs dfs -text filename

writer.hsync() が機能しない既知の理由はありますか? もしそうなら、これに対する回避策はありますか?

この問題のさらなるテスト ケース:

import java.util.HashMap;
import java.util.Map;
import java.util.Date;
import java.util.Calendar;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

import java.io.IOException;

import java.text.SimpleDateFormat;
import java.text.DateFormat;
import java.text.ParseException;
import java.util.Calendar;
import java.util.Date;
import java.util.Locale;

public class WriteTest {
    private static final Logger LOG = LoggerFactory.getLogger(WriteTest.class);

    public static void main(String[] args) throws Exception {

        SequenceFile.CompressionType compressionType = SequenceFile.CompressionType.RECORD;
        CompressionCodec compressionCodec;
        String compressionCodecStr = "default";
        CompressionCodecFactory codecFactory;
        Configuration hdfsConfig = new Configuration();

        codecFactory = new CompressionCodecFactory(hdfsConfig);
        compressionCodec = codecFactory.getCodecByName(compressionCodecStr);

        String hdfsURL = "hdfs://10.0.0.1/writetest/";

        Date date = new Date();

        Path path = new Path(
            hdfsURL,
            "testfile" + date.getTime()
        );

        SequenceFile.Writer writer = SequenceFile.createWriter(
            hdfsConfig,
            SequenceFile.Writer.keyClass(LongWritable.class),
            SequenceFile.Writer.valueClass(Text.class),
            SequenceFile.Writer.compression(compressionType, compressionCodec),
            SequenceFile.Writer.file(path)
        );

        for(int i=0;i<10000000;i++) {

            Text value = new Text("New value!");
            LongWritable key = new LongWritable(date.getTime());

            writer.append(key, value);
            writer.hsync();

            Thread.sleep(1000);
        }

        writer.close();
    }
}

その結果、sequencefile ヘッダーの書き込みの開始時に 1 つの fsync があり、その後は fsync がなくなります。ファイルが閉じられると、コンテンツがディスクに書き込まれます。

4

1 に答える 1

0

ここには複数の問題があります。

  1. ブロック圧縮

シーケンス ファイルでブロック圧縮を使用すると、多数のエントリがメモリにバッファリングされ、制限に達するか、手動で呼び出されると、ブロック圧縮形式で書き込まれます。sync

hsyncライターを呼び出すと、hsyncその基礎となるが呼び出されますFSDataOutputStream。ただし、圧縮バッファにあるデータはメモリに書き込まれません。したがって、そのデータをデータノードに確実に取得するには、sync最初にを呼び出してから を呼び出す必要がありますhsync

これを行うと、Datanode に送信されるブロック圧縮部分には、通常よりも少ないエントリが含まれることになることに注意してください。これは圧縮品質に悪影響を及ぼし、ディスクの使用量が増える可能性があります。(それが内部的にhsync呼び出さない理由だと思います。)sync

  1. Namenode に報告されるファイルサイズ

呼び出しfsyncはデータノードにデータを送信しますが、新しいファイルサイズをネームノードに報告しません。これに関する技術的な議論は、ここここにあります。どうやら毎回長さを更新するのはパフォーマンスが悪いようです。hsyncNamenode 情報を更新できるの特別なバージョンがありますが、 では公開されませんSequenceFile.Writer

    * @param syncFlags
    *          Indicate the semantic of the sync. Currently used to specify
    *          whether or not to update the block length in NameNode.
    */
    public void hsync(EnumSet<SyncFlag> syncFlags) throws IOException {
        flushOrSync(true, syncFlags);
    }

一方では、サイズの問題は、一部のツールが変更されていないファイル サイズを報告したとしても、データが安全に Datanode に到達し、Datanode で InputStream を開いたときに読み取ることができることを意味します。一方、SequenceFile.Reader には、圧縮タイプRecordNone. これらの圧縮タイプでは、Reader は長さ情報を使用して、どこまで読み取るかを決定します。この長さ情報は によって更新されないhsyncため、データが実際に利用可能であっても、誤って読み取りを停止します。Block圧縮された読み取りは明らかに長さ情報を使用しないため、このバグの影響を受けません。

于 2016-06-06T18:10:41.900 に答える