ですから、これは私が壁にぶつかったような課題です。この時点で、コードを何度もやり直して改良した後は、正直なところコードに何の問題も見られなくなったので、もっと目を向けたいと思っています。
gzip-d呼び出しで正しく解凍できるJavaでマルチスレッドコンプレッサーを作成する必要がありました。GZIPOutputStream呼び出しは使用できません。代わりに、ヘッダーとトレーラーを手動で生成し、デフレーターを使用してデータを圧縮しました。標準入力から読み取り、標準出力に書き込みます。
基本的に、私はスレッドプールを維持するためにとExecutorを使用しました。入力が入ってくるときにそれを読み取り、設定されたサイズのバッファーに書き込みます。バッファがいっぱいになったら、そのデータブロックをスレッドに渡します(タスクをキューに入れます)。各スレッドには独自のデフレーターがあり、そのデータを圧縮するために必要な入力とその他の情報が渡されます。また、各ブロックの最後の32Kbを、次のブロックの辞書として使用します。
ヘッダーとトレーラーが正しいことを確認しました。GZIPOutputStreamを使用してファイルを圧縮し、hexdumpを使用してバイトを取得して、出力と比較できるようにしました。さまざまなサイズのファイルを確認しましたが、ヘッダーとトレーラーは同じであるため、おそらく問題は圧縮データにあります。私が得るエラーは次のとおりです:無効な圧縮データ--crcエラー
比較的小さい入力を渡すと(バッファーがいっぱいになることはないのでスレッドが1つしかないため、キューにはタスクが1つしかない)、出力が正しいことを確認しました。圧縮されたデータに対してgzip-dを呼び出して、まったく同じ入力を取得できます。
言い換えると、問題は、複数のスレッドが稼働しているのに十分なデータがある場合にあります。大きなファイルの出力に16進ダンプを使用し、GZIPOutputStreamの16進ダンプと比較したところ、非常に似ています(まったく同じではありませんが、小さなファイルの場合でも、圧縮データの16進ダンプもわずかに異なりました。その場合、gzip -dは引き続き機能します)。これは、ヘッダーとトレーラーが正しいことを私が知っている方法でもあります。
着信コードダンプ
import java.lang.Runtime;
import java.lang.String;
import java.lang.Integer;
import java.util.Arrays;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.nio.ByteBuffer;
import java.io.*;
import java.util.zip.*;
/*Warning: Do not compress files larger than 2GB please. Since this is just
an assignment and not meant to replace an actual parallel compressor, I cut corners
by casting longs to ints, since it's easier to convert to 4 bytes*/
public class Main {
private static final int BLOCK_SIZE = 128*1024;
private static final int DICT_SIZE = 32*1024;
private static byte[] header = {(byte)0x1f, (byte)0x8b, (byte)0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00};
public static void main(String[] args){
class workerThread implements Callable<byte[]> {
private boolean lastBlock;
private boolean dictAvailable;
private byte[] input;
private byte[] dictionary;
private int lastSize;
private byte[] output = new byte[BLOCK_SIZE];
private int compressedLength;
private ByteArrayOutputStream bOut = new ByteArrayOutputStream();
Deflater compress = new Deflater (Deflater.DEFAULT_COMPRESSION, true);
workerThread(boolean last, byte[] blockIn, byte[] dict, boolean useDictionary, int lastBSize){
this.lastBlock = last;
this.input = blockIn;
this.dictionary = dict;
this.dictAvailable = useDictionary;
this.lastSize = lastBSize;
}
public byte[] call() {
//System.out.println("running thread ");
if (lastBlock) {
// System.out.println("Last block!");
compress.setInput(input,0,lastSize);
if(dictAvailable) {
compress.setDictionary(dictionary);
}
compress.finish();
compressedLength = compress.deflate(output,0,BLOCK_SIZE,Deflater.SYNC_FLUSH);
}
else {
//System.out.println("Not last block!");
compress.setInput(input,0,BLOCK_SIZE);
if(dictAvailable) {
compress.setDictionary(dictionary);
}
compressedLength = compress.deflate(output,0,BLOCK_SIZE,Deflater.SYNC_FLUSH);
}
byte[] finalOut = Arrays.copyOfRange(output,0,compressedLength);
return finalOut;
}
}
getProcessors p = new getProcessors();
boolean useDict = true;
int numProcs = p.getNumProcs();
boolean customProcs = false;
boolean foundProcs = false;
boolean foundDict = false;
/*Checking if arguments are correct*/
........
/*Correct arguments, proceeding*/
BufferedInputStream inBytes = new BufferedInputStream(System.in);
byte[] buff = new byte[BLOCK_SIZE];
byte[] dict = new byte[DICT_SIZE];
int bytesRead = 0;
int offset = 0;
int uncompressedLength = 0;
int lastBlockSize = 0;
boolean isLastBlock = false;
boolean firstBlockDone = false;
/*Using an executor with a fixed thread pool size in order to manage threads
as well as obtain future results to maintain synchronization*/
ExecutorService exec = Executors.newFixedThreadPool(numProcs);
CRC32 checksum = new CRC32();
checksum.reset();
List<Future<byte[]>> results = new ArrayList<Future<byte[]>>();
//byte[] temp;
System.out.write(header,0,header.length);
try{
bytesRead = inBytes.read(buff,0, BLOCK_SIZE);
while (bytesRead != -1) {
uncompressedLength += bytesRead;
checksum.update(buff,offset,bytesRead);
offset += bytesRead;
if (offset == BLOCK_SIZE) {
offset = 0;
if(!firstBlockDone){
firstBlockDone = true;
results.add(exec.submit(new workerThread(isLastBlock,buff,dict,false,lastBlockSize)));
}
else {
results.add(exec.submit(new workerThread(isLastBlock,buff,dict,useDict,lastBlockSize)));
}
if (useDict) {
System.arraycopy(buff, BLOCK_SIZE-DICT_SIZE, dict, 0, DICT_SIZE);
}
}
/*Implementation warning! Because of the way bytes are read in, this program will fail if
the file being zipped is exactly a multiple of 128*1024*/
if((bytesRead=inBytes.read(buff,offset,BLOCK_SIZE-offset)) == -1) {
isLastBlock = true;
lastBlockSize = offset;
results.add(exec.submit(new workerThread(isLastBlock,buff,dict,useDict,lastBlockSize)));
}
}
try {
for(Future<byte[]> result: results) {
//System.out.println("Got result!");
System.out.write(result.get(),0,result.get().length);
//temp = result.get();
}
}
catch (InterruptedException ex) {
ex.printStackTrace();
System.err.println("Interrupted thread!");
}
catch (ExecutionException ex) {
ex.printStackTrace();
System.err.println("Interrupted thread!");
}
finally{
exec.shutdownNow();
}
/*Converting CRC sum and total length to bytes for trailer*/
byte[] trailer = new byte[8];
getTrailer trail = new getTrailer(checksum.getValue(),uncompressedLength);
trail.writeTrailer(trailer,0);
System.out.write(trailer);
}
catch (IOException ioe) {
ioe.printStackTrace();
System.out.println("IO error.");
System.exit(-1);
}
catch (Throwable e) {
System.out.println("Unexpected exception or error.");
System.exit(-1);
}
}
}
ああ、おっと、フォーマットはコードブロックフォーマットによって少し捨てられました。
ご覧のとおり、バフがいっぱいになるまで入力から読み続けます。これはファイルではないため、最初のread呼び出しで配列を埋めるのに十分なバイトが読み取られない可能性があります(何も台無しにしたくないnullがたくさん残っています)。いっぱいになったら、スレッドがタスクを実行するようにエグゼキューターに渡します。Runnableの代わりにCallableを実装して、出力をバイト配列として返すことができ、将来のインターフェイスが必要になるためです。exec.get()メソッドを使用すると、スレッドの同期を維持できます。私はそれを任意のケースでテストしました(確認のために1から100までの数字を印刷し、実際に順番に印刷します)。
このプログラムがBLOCK_SIZEの倍数のファイルで動作しないという欠陥がありますが、それは私が今取り組んでいる問題でもありません。このプログラムは、入力が1つのスレッドしか実行できないほど小さい場合に機能します。
最後のブロックを除く各ブロックについて、SYNC_FLUSHオプションを使用してdeflateを呼び出します。これは、バイト境界で終了できるようにするためです。私が通常圧縮し、finishを呼び出す最後のブロック。
本当に長い投稿でごめんなさい。私は間違いを見つけることができないように見えるので、私は自分の意見以外にもっと多くの意見が必要です。誰かがコンパイルして実行して自分の目で確かめたいと思うほど素晴らしい人のために、ここに私が持っていた他のクラスがあります(プロセスの数を取得してトレーラーを生成するためだけです。これらは両方とも正常に機能します)。
import java.io.*;
public class getTrailer {
private long crc;
private int total;
public getTrailer (long crcVal, int totalIn) {
this.crc = crcVal;
this.total = totalIn;
}
public void writeTrailer(byte[] buf, int offset) throws IOException {
writeInt((int)crc, buf, offset); // CRC-32 of uncompr. data
writeInt(total, buf, offset + 4); // Number of uncompr. bytes
}
/* Writes integer in Intel byte order to a byte array, starting at a
* given offset
*/
public void writeInt(int i, byte[] buf, int offset) throws IOException {
writeShort(i & 0xffff, buf, offset);
writeShort((i >> 16) & 0xffff, buf, offset + 2);
}
/*
* Writes short integer in Intel byte order to a byte array, starting
* at a given offset
*/
public void writeShort(int s, byte[] buf, int offset) throws IOException {
buf[offset] = (byte)(s & 0xff);
buf[offset + 1] = (byte)((s >> 8) & 0xff);
}
}
トレーラー関数は、文字通りJAvaのドキュメントからコピーして貼り付けられています
public class getProcessors {
private Runtime runner = Runtime.getRuntime();
private int nProcs = runner.availableProcessors();
int getNumProcs() {
return nProcs;
}
}
これがどれだけ長いかはわかりますが、他の人の意見が本当に必要です。問題を引き起こしていると思われるものを見つけた場合は、教えてください。私は私のためにプログラムを書くために誰かを必要としません(私はほとんどそこにいると思います)が、私はただ...何も悪いことを見ることができません。