2

サイズが約 5 GB の大きなファイルを転送するタスクがあるため、読み取りと書き込みの操作になります。

一度に 5k を読み取るシングル スレッド バージョンを作成し、すぐに 5K を別の場所に書き込みます。

私の目標は、実際にマルチスレッド バージョンを作成することです。頭に浮かんだ自然なデザイン パターンは、プロデューサー (リーダー) コンシューマー (ライター) パターンでした。

以下は、マルチスレッド バージョンに基づいた最初のシングル スレッド バージョンです。


   import java.net.*;
import java.io.*;
import java.nio.*;
import java.util.Arrays;

public class ReadWrite {

    URI readFrom = null;
    URI writeTo = null;
    //streams
    FileInputStream fis = null;
    FileOutputStream fos = null;
    // good buffer size in Java is generally between 2k to 8k
    byte[] byteBuffer = new byte[5 * 1024];
    //just for testing 
    private int readSoFar = 0;
    //const
    ReadWrite(URI readFrom, URI writeTo) {
        this.readFrom = readFrom;
        this.writeTo = writeTo;

    }

    public URI getReadFrom() {
        return readFrom;
    }

    public void setReadFrom(URI readFrom) {
        this.readFrom = readFrom;
    }

    public URI getWriteTo() {
        return writeTo;
    }

    public void setWriteTo(URI writeTo) {
        this.writeTo = writeTo;
    }

    public void process() throws FileNotFoundException {

        // by chunks therefore buffer
        File fileToRead = new File(readFrom);
        File fileToWrite = new File(writeTo);
        try {
            // if read & write destinations exist
            if (fileToRead.exists()) {
                fis = new FileInputStream(fileToRead);
                // instantiate OutputStream

                fos = new FileOutputStream(fileToWrite);
                // read a chunk, then write , update read position, until there
                // is no more to read
                try {
                    int writeCounter = 0;
                    // read
                    while ((fis.read(byteBuffer, 0, byteBuffer.length)) != -1) {

                        try {
                            //just for testing & seeing output/progress
                            readSoFar= readSoFar + byteBuffer.length;
                            System.out.println("readSoFar:" + readSoFar);
                            // write
                            fos.write(byteBuffer);
                            // clear previous data
                            Arrays.fill(byteBuffer, (byte) 0);
                            System.out.println("writeCounter: " + writeCounter);
                            writeCounter++;
                        } catch (IOException exc) {
                            exc.printStackTrace();

                        }
                    }
                } catch (IOException exc) {
                    exc.printStackTrace();
                }

            } else {
                throw new FileNotFoundException();
            }
        } finally {
            if (fis != null) {
                try {
                    fis.close();
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            if (fos != null) {
                try {
                    fos.flush();
                    fos.close();

                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
        }
    }// end class ReadWrite

}

メイン クラス FileCopy (シングル スレッド):

public class FileCopy {

    public static void main(String[] args) {

     try {
        try {
            //wls1033_dev.zip
            new ReadWrite( new URI("file:/C:/Users/anaim/Pictures/wls1033_dev.zip"),new URI("file:/C:/Users/anaim/Pictures/result/wls1033_dev.zip")).process();
        } catch (URISyntaxException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    } catch (FileNotFoundException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }

    }

}//end main class

出力は次のようになります。

. . . readSoFar:423198720 writeCounter: 82655 readSoFar:423203840 writeCounter: 82656 readSoFar:423208960 writeCounter: 82657

明らかに、宛先ファイルは元のファイルと同一である必要はありません。


以下はマルチスレッド バージョンです。これはシングル スレッド コードに基づいていますが、同期、待機、通知によるスレッド ロジックとロックが実装されています。マルチスレッドは実際に機能しており、デッドロックや活性の問題はありませんが、読み取り/書き込み操作は終了しません。

問題は、readState == -1 の場合、読み取り/書き込みが終了するよりも、変数 " readState" とその getReadState()メソッドがメイン クラスで呼び出されたときに正しく動作していないように見えることです。

マルチスレッドの読み取りと書き込み (クラス ReadProducerWriteConsumer) は、「boolean empty」変数/フラグの値に基づいて、同期されたセクションで待機する readProcess と writeProcess を提供します。

import java.net.*;
import java.io.*;
import java.nio.*;
import java.util.Arrays;

public class ReadProducerWriteConsumer {

    URI readFrom = null;
    URI writeTo = null;
    //
    FileInputStream fis = null;
    FileOutputStream fos = null;
    // good buffer size in Java is generally between 2k to 8k
    byte[] byteBuffer = new byte[1024];
    //
    File fileToRead = null;
    File fileToWrite = null;
    //
    private int readSoFar = 0;
    int writeCounter = 0;
    //
    volatile private int readState = 0;
    // Consumer & Producer state , hence has anything been read in order to be
    // written
    boolean empty = true;

    ReadProducerWriteConsumer(URI readFrom, URI writeTo) {
        this.readFrom = readFrom;
        this.writeTo = writeTo;
        //
        fileToRead = new File(readFrom);
        fileToWrite = new File(writeTo);
    }

    public long getReadState() {
        return this.readState;
    }

    public void setReadState(int readState) {
        this.readState = readState;
    }

    public URI getReadFrom() {
        return readFrom;
    }

    public void setReadFrom(URI readFrom) {
        this.readFrom = readFrom;
    }

    public URI getWriteTo() {
        return writeTo;
    }

    public void setWriteTo(URI writeTo) {
        this.writeTo = writeTo;
    }

    public synchronized void readProcess() throws FileNotFoundException {

        // while false, while data is being written , wait
        while (empty == false) {
            try {
                wait();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }

        // by chunks therefore buffer
        File fileToRead = new File(readFrom);

        try {
            // if read & write destinations exist
            if (fileToRead.exists()) {
                fis = new FileInputStream(fileToRead);
                // read a chunk
                try {
                    // while readSoFar!=-1
                    while (((this.readState = fis.read(byteBuffer, 0,
                            byteBuffer.length)) != -1) && empty != false) {

                        // just for testing & seeing output/progress
                        readSoFar = readSoFar + byteBuffer.length;
                        System.out.println("readSoFar:" + readSoFar);

                        // read a chunck , now that buffer is full set emoty to
                        // false
                        empty = false;

                    }
                } catch (IOException exc) {
                    exc.printStackTrace();
                }

            } else {
                throw new FileNotFoundException();
            }
        } finally {
            if (fis != null) {
                try {
                    fis.close();
                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            // new data has been read , notify all threads waiting to consume
            // data
            notifyAll();
        }
    }

    public synchronized void writeProcess() throws FileNotFoundException {
        // while true, therefore there is nothing to write, wait
        while (empty == true) {
            try {
                wait();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        // by chunks therefore buffer
        File fileToWrite = new File(writeTo);

        try {
            // instantiate OutputStream
            fos = new FileOutputStream(fileToWrite);
            // then write , update read position

            // write
            try {
                fos.write(byteBuffer);
                // clear previous data
                Arrays.fill(byteBuffer, (byte) 0);
                System.out.println("writeCounter: " + writeCounter);
                writeCounter++;
            } catch (IOException exc) {
                exc.printStackTrace();

            }

        } finally {

            if (fos != null) {
                try {
                    fos.flush();
                    fos.close();

                } catch (IOException e) {
                    // TODO Auto-generated catch block
                    e.printStackTrace();
                }
            }
            // new data has been written , notify all threads waiting to
            // read/produce more data
            empty = true;
            notifyAll();
        }
    }

}//end class ReadProducerWriteConsumer

readRunnable クラスはランナブルを実装し、ReadProducerWriteConsumer.readProcess() を呼び出します

import java.io.FileNotFoundException;

public class readRunnable implements Runnable {
    ReadProducerWriteConsumer ReaderProducerWriterConsumer = null;

    public readRunnable(ReadProducerWriteConsumer readerProducerWriterConsumer) {
        super();
        ReaderProducerWriterConsumer = readerProducerWriterConsumer;
    }

    @Override
    public void run() {
        // TODO Auto-generated method stub
        try {
            ReaderProducerWriterConsumer.readProcess();
        } catch (FileNotFoundException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
    // call ReaderProducerWriterConsumer read method
}//end readRunnable class

writeRunnable クラスはランナブルを実装し、ReadProducerWriteConsumer.writeProcess() を呼び出します

import java.io.FileNotFoundException;


public class writeRunnable implements Runnable
    {
        ReadProducerWriteConsumer ReaderProducerWriterConsumer = null; 

        public writeRunnable (ReadProducerWriteConsumer readerProducerWriterConsumer) {
            super();
            ReaderProducerWriterConsumer = readerProducerWriterConsumer;
        }

        @Override
        public void run() {
            // TODO Auto-generated method stub
            try {
                ReaderProducerWriterConsumer.writeProcess();
            } catch (FileNotFoundException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        //ReaderProducerWriterConsumer write method
    }//end writeRunnable

スレッドをインスタンス化し、読み取りがなくなるまで読み取り/書き込みスレッドを作成するメイン クラス。この状態は ReadProducerWriteConsumer.getReadState() でチェックされます。

import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.URL;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.util.Arrays;

public class FileCopy {

    public static void main(String[] args) {

        ReadProducerWriteConsumer ReaderProducerWriterConsumer = null;
        try {
            ReaderProducerWriterConsumer = new ReadProducerWriteConsumer(
                    new URI("file:/C:/Users/anaim/Pictures/pic.png"), new URI(
                            "file:/C:/Users/anaim/Pictures/result/pic.png"));
        } catch (URISyntaxException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        Thread readThread = null;
        Thread writeThread = null;

        while (ReaderProducerWriterConsumer.getReadState() != -1) {

            readThread = new Thread(new readRunnable(
                    ReaderProducerWriterConsumer));

            writeThread = new Thread(new writeRunnable(
                    ReaderProducerWriterConsumer));

            readThread.start();
            writeThread.start();

            try {
                readThread.join();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            try {
                writeThread.join();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }



    }

}// end main class

したがって、マルチスレッド バージョンの問題は、読み取り/書き込み操作が停止しないため、書き込み操作によって破損したファイルが生成されることです。

賢い解決策を教えてください。ありがとう。

4

1 に答える 1

0

エラーはループしない書き込みプロセスにあります。一度だけ書き込みを行ってから終了します。その後、読み取りプロセスはバッファを補充し、再び空になるまで永遠に待ちます。

それに加えて、あなたのデザインは面白いですが、少し変わっています。

それにはいくつかの問題があります:

  • FileInputStream.read 関数は、要求した正確なバイト数を返すとは限りません。戻り値は、読み取られたバイト数を示します。その値を書き込みプロセスに使用する必要があります。

  • ループを使用してemptyブール値をロックとしてチェックしています。セマフォを使用する方が簡単です ( java.util.concurrentを参照)。

  • または、BlockingQueue を使用して読み取りと書き込みを同期することもできます。

  • 循環バッファーと呼ばれるデータ構造もあり、読み取りデータをバッファー内に「連続的に」保存できますが、Java に存在するかどうかはわかりません。基本的に、バッファ内に 2 つのポインタを保持します。1 つはバッファ内の残りの空き領域の先頭を指し、もう 1 つは使用済み領域の先頭を指します。バッファに書き込むと、空き領域の位置からデータの格納を開始します。バッファーの最後に到達すると、最初に戻ります (したがって、「循環」の考え方)。使用済みスペース ポインターを渡さないように注意する必要がありますが、読み取りを正しく行っている場合 (つまり、残っている空きスペース以上を要求しない場合) は、この問題を回避できるはずです。バッファから読み取るときは、使用済みスペース ポインタから開始し、バッファで利用可能なバイト数を読み取ります。おそらく、後の段階でそのアプローチを試す必要があります。

于 2012-10-28T20:41:40.550 に答える