0

共有 LinkedBlocking Queue を使用している Java でアプリケーションを開発しており、それを読み書きするために複数のスレッドを作成しています。以下のようにコードを作成しましたが、目的の結果を得ることができません。

結果として、両方のスレッド (読み取りと書き込み) によって書き込まれている共有ファイルを使用しています。

私のコードで何が間違っているか教えてください:

メッセージリーダー.java

package com.aohandling.messagereader;

import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import com.aohandling.messagequeue.MessageQueue;

public class MessageReader implements Runnable
{
    public static BufferedWriter out;

    public static void init()
    {
    file = new File("AOHandle.txt");
    try
    {
        out = new BufferedWriter(new FileWriter(file, true));
        System.out.println("Init ");
    }
    catch (IOException e)
    {
        e.printStackTrace();
    }
    }

    static File file = null;

    public void run()
    {
    while (true)
    {
        try
        {
        SimpleDateFormat ft = new SimpleDateFormat("E yyyy.MM.dd 'at' hh:mm:ss a zzz");
        String s = MessageQueue.getMessageQueue().poll();
        if (s != null)
        {
            out.write("queue - " + MessageQueue.getMessageQueue().poll() + "---"  + ft.format(new Date()) + "\n");
        }

        }
        catch (IOException e)
        {
        e.printStackTrace();
        }
    }
    }
}

MessageWriter.java

package com.aohandling.writer;

import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;

import com.aohandling.messagequeue.MessageQueue;
import com.aohandling.messagereader.MessageReader;

public class MessageWriter implements Runnable
{

    int n;
    private int messageSequence;

    public MessageWriter(int messageSequence)
    {
    this.messageSequence = messageSequence;
    }

    public void run()
    {

    try
    {
        SimpleDateFormat ft = new SimpleDateFormat("E yyyy.MM.dd 'at' hh:mm:ss a zzz");
        MessageReader.out.append("Writing----AO - " + this.messageSequence + "-----" + ft.format(new Date()) + "\n");
        MessageQueue.getMessageQueue().put("AO " + this.messageSequence);
    }
    catch (IOException | InterruptedException e)
    {
        e.printStackTrace();
    }
    }
}

MessageQueue.java

package com.aohandling.messagequeue;

import java.util.concurrent.LinkedBlockingQueue;

public class MessageQueue {

    private static LinkedBlockingQueue<String> messageQueue = new LinkedBlockingQueue<String>();

    public static LinkedBlockingQueue<String> getMessageQueue() {
        return MessageQueue.messageQueue;
    }

    public static void setMessageQueue(LinkedBlockingQueue<String> messageQueue) {
        MessageQueue.messageQueue = messageQueue;
    }
}

TestAOHandlingRead.java

package com.aohandling.main;

import com.aohandling.messagereader.MessageReader;
import com.aohandling.writer.MessageWriter;

public class TestAOHandlingRead
{

    /**
     * @param args
     */
    public static void main(String[] args)
    {
    MessageReader.init();
    for (int i = 0; i <= 200; i++)
    {
        Thread readThread = new Thread(new MessageReader());
        readThread.start();
    }
    write();

    }
    public static void write()
    {
    for (int i = 0; i <= 20; i++)
    {
        if (i % 2 == 0)
        {
        try
        {
            Thread.sleep(500);
        }
        catch (InterruptedException e)
        {
            e.printStackTrace();
        }
        }

        Thread writeThread = new Thread(new MessageWriter(i));
        writeThread.start();

    }
    }
}

TestAOHandlingWrite.java

package com.aohandling.main;

import java.util.concurrent.atomic.AtomicInteger;

import com.aohandling.writer.MessageWriter;

public class TestAOHandlingWrite {

    int count = 0;

    public int getCount()
    {
        return count;
    }

    /**
     * @param args
     */
    public static void main(String[] args) {

//      MessageWriter.init();
        for (int i=0; i<= 20; i++) {
         if (i%2 ==0) {
             try {
                Thread.sleep(500);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
         }

         Thread writeThread = new Thread(new MessageWriter(i));
            writeThread.start();

        }


    }

}
4

1 に答える 1

0

FileChannel は複数の同時スレッドで安全に使用できるため、FileChannel を使用することをお勧めします。さらに、コードをリファクタリングしました。クラス ローダーによる MessageReader クラスの最初のロード時に、ファイルへのチャネルが 1 回作成されます。

public class MessageReader implements Runnable {
    private static FileChannel channel;

    static {
    try {
        System.out.println("Init ");
        FileOutputStream fileOutputStream = new FileOutputStream(
                "AOHandle.txt", true);

        FileChannel channel = fileOutputStream.getChannel();
        System.out.println("Init ");

    } catch (IOException e) {
        e.printStackTrace();
    }
    }

        public void run() {
    while (true) {
        FileLock fileLock = null;
        try {
            SimpleDateFormat ft = new SimpleDateFormat(
                    "E yyyy.MM.dd 'at' hh:mm:ss a zzz");
            String s = MessageQueue.getMessageQueue().poll();
            if (s != null) {
                String message = "queue - "
                        + MessageQueue.getMessageQueue().poll() + "---"
                        + ft.format(new Date()) + "\n";
                fileLock = channel.lock();
                channel.write(ByteBuffer.wrap(message.getBytes()));
            }

        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            try {
                if (fileLock != null) {
                    fileLock.release();
                }
            } catch (IOException e) {

                e.printStackTrace();
            }
        }
    }
}
}

ベスト プラクティスは、ファイルへのチャネルを 1 か所で開き、スレッド間でチャネルを共有することです。これは、コードでは誰もファイルを閉じないためです。

于 2013-10-17T07:19:23.603 に答える