HDFS の圧縮された SequenceFile に毎秒 10 個のレコードを書き込み、5 分ごとに sync() を実行して、5 分より古いすべてのレコードを処理できるようにする小さなプログラムがあります。
私のコードはかなりの数の行なので、重要なビットのみを抽出しました。
// initialize
Configuration hdfsConfig = new Configuration();
CompressionCodecFactory codecFactory = new CompressionCodecFactory(hdfsConfig);
CompressionCodec compressionCodec = codecFactory.getCodecByName("default");
SequenceFile.Writer writer = SequenceFile.createWriter(
hdfsConfig,
SequenceFile.Writer.file(path),
SequenceFile.Writer.keyClass(LongWritable.class),
SequenceFile.Writer.valueClass(Text.class),
SequenceFile.Writer.compression(SequenceFile.CompressionType.BLOCK;, compressionCodec)
);
// ...
// append
LongWritable key = new LongWritable((new Date).getTime());
Text val = new Text("Some value");
writer.append(key, val);
// ...
// then every 5 minutes...
logger.info("about to sync...");
writer.hsync();
logger.info("synced!");
ログだけを見ると、同期操作は期待どおりに機能しているように見えますが、HDFS 上のファイルは小さいままです。しばらくすると、いくつかのヘッダーといくつかのイベントが追加される可能性がありますが、hsync() の頻度に近いものさえあります。ファイルが閉じられると、すべてが一度にフラッシュされます。
予想される同期のたびに、ファイルの内容を手動でチェックしてデータが存在するかどうかを確認しようとしましたが、ここでもファイルが空に見えます: hdfs dfs -text filename
writer.hsync() が機能しない既知の理由はありますか? もしそうなら、これに対する回避策はありますか?
この問題のさらなるテスト ケース:
import java.util.HashMap;
import java.util.Map;
import java.util.Date;
import java.util.Calendar;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.text.DateFormat;
import java.text.ParseException;
import java.util.Calendar;
import java.util.Date;
import java.util.Locale;
public class WriteTest {
private static final Logger LOG = LoggerFactory.getLogger(WriteTest.class);
public static void main(String[] args) throws Exception {
SequenceFile.CompressionType compressionType = SequenceFile.CompressionType.RECORD;
CompressionCodec compressionCodec;
String compressionCodecStr = "default";
CompressionCodecFactory codecFactory;
Configuration hdfsConfig = new Configuration();
codecFactory = new CompressionCodecFactory(hdfsConfig);
compressionCodec = codecFactory.getCodecByName(compressionCodecStr);
String hdfsURL = "hdfs://10.0.0.1/writetest/";
Date date = new Date();
Path path = new Path(
hdfsURL,
"testfile" + date.getTime()
);
SequenceFile.Writer writer = SequenceFile.createWriter(
hdfsConfig,
SequenceFile.Writer.keyClass(LongWritable.class),
SequenceFile.Writer.valueClass(Text.class),
SequenceFile.Writer.compression(compressionType, compressionCodec),
SequenceFile.Writer.file(path)
);
for(int i=0;i<10000000;i++) {
Text value = new Text("New value!");
LongWritable key = new LongWritable(date.getTime());
writer.append(key, value);
writer.hsync();
Thread.sleep(1000);
}
writer.close();
}
}
その結果、sequencefile ヘッダーの書き込みの開始時に 1 つの fsync があり、その後は fsync がなくなります。ファイルが閉じられると、コンテンツがディスクに書き込まれます。