共有 LinkedBlocking Queue を使用している Java でアプリケーションを開発しており、それを読み書きするために複数のスレッドを作成しています。以下のようにコードを作成しましたが、目的の結果を得ることができません。
結果として、両方のスレッド (読み取りと書き込み) によって書き込まれている共有ファイルを使用しています。
私のコードで何が間違っているか教えてください:
メッセージリーダー.java
package com.aohandling.messagereader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import com.aohandling.messagequeue.MessageQueue;
public class MessageReader implements Runnable
{
public static BufferedWriter out;
public static void init()
{
file = new File("AOHandle.txt");
try
{
out = new BufferedWriter(new FileWriter(file, true));
System.out.println("Init ");
}
catch (IOException e)
{
e.printStackTrace();
}
}
static File file = null;
public void run()
{
while (true)
{
try
{
SimpleDateFormat ft = new SimpleDateFormat("E yyyy.MM.dd 'at' hh:mm:ss a zzz");
String s = MessageQueue.getMessageQueue().poll();
if (s != null)
{
out.write("queue - " + MessageQueue.getMessageQueue().poll() + "---" + ft.format(new Date()) + "\n");
}
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
}
MessageWriter.java
package com.aohandling.writer;
import java.io.IOException;
import java.text.SimpleDateFormat;
import java.util.Date;
import com.aohandling.messagequeue.MessageQueue;
import com.aohandling.messagereader.MessageReader;
public class MessageWriter implements Runnable
{
int n;
private int messageSequence;
public MessageWriter(int messageSequence)
{
this.messageSequence = messageSequence;
}
public void run()
{
try
{
SimpleDateFormat ft = new SimpleDateFormat("E yyyy.MM.dd 'at' hh:mm:ss a zzz");
MessageReader.out.append("Writing----AO - " + this.messageSequence + "-----" + ft.format(new Date()) + "\n");
MessageQueue.getMessageQueue().put("AO " + this.messageSequence);
}
catch (IOException | InterruptedException e)
{
e.printStackTrace();
}
}
}
MessageQueue.java
package com.aohandling.messagequeue;
import java.util.concurrent.LinkedBlockingQueue;
public class MessageQueue {
private static LinkedBlockingQueue<String> messageQueue = new LinkedBlockingQueue<String>();
public static LinkedBlockingQueue<String> getMessageQueue() {
return MessageQueue.messageQueue;
}
public static void setMessageQueue(LinkedBlockingQueue<String> messageQueue) {
MessageQueue.messageQueue = messageQueue;
}
}
TestAOHandlingRead.java
package com.aohandling.main;
import com.aohandling.messagereader.MessageReader;
import com.aohandling.writer.MessageWriter;
public class TestAOHandlingRead
{
/**
* @param args
*/
public static void main(String[] args)
{
MessageReader.init();
for (int i = 0; i <= 200; i++)
{
Thread readThread = new Thread(new MessageReader());
readThread.start();
}
write();
}
public static void write()
{
for (int i = 0; i <= 20; i++)
{
if (i % 2 == 0)
{
try
{
Thread.sleep(500);
}
catch (InterruptedException e)
{
e.printStackTrace();
}
}
Thread writeThread = new Thread(new MessageWriter(i));
writeThread.start();
}
}
}
TestAOHandlingWrite.java
package com.aohandling.main;
import java.util.concurrent.atomic.AtomicInteger;
import com.aohandling.writer.MessageWriter;
public class TestAOHandlingWrite {
int count = 0;
public int getCount()
{
return count;
}
/**
* @param args
*/
public static void main(String[] args) {
// MessageWriter.init();
for (int i=0; i<= 20; i++) {
if (i%2 ==0) {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
Thread writeThread = new Thread(new MessageWriter(i));
writeThread.start();
}
}
}