バッファーを共有するプロデューサーとコンシューマーがあります。スレッドを実行、一時停止、実行を継続、停止できるようにしたい。
私が試したのは、状態を示す Enum フラグを保持することです。新しいアイテムを作成するたびに、if-else で状態を確認します。状態が実行中の場合は実行を続け、待機中の場合はスレッドを待機させます。これは、プロデューサーとコンシューマーが常に機能している (つまり、プロデューサーはアイテムをバッファーに入れることができ、コンシューマーは常にバッファー内のアイテムを見つけることができる) という事実を考慮するとうまく機能します。ただし、バッファがいっぱいまたは空のためにスレッドの 1 つが互いに待機しているという問題が発生すると、プログラムのロジック全体が台無しになり、完全に解決できなくなります。私はこれに4日間取り組んできましたが、まだ希望はありません。誰かがこれを手伝ってくれたら本当にありがたいです。ありがとう!
スレッドの実行、一時停止、実行の継続、および停止には、GUI を使用しています。スレッドをpause()で待機させます...次に、スレッドを再度実行したい場合は、スレッドをwakeup()して通知します。die() と同じです。
編集: これまでのロジックの問題は、ボタンをクリックして続行すると、スレッドの内部状態が「実行中」のはずなのに「待機中」のままになることです。そのため、GUI がブロックされます。
プロデューサーのコードは次のとおりです。
public class GProducer2 implements Runnable {
private volatile ThreadState state;
private volatile ThreadState innerState;
private BlockingQueue<Integer> buffer;
private static Queue<Integer> stream = new LinkedList<Integer>();
static {
for ( int i = 0; i <= 1000; i++ ) {
stream.add(i);
}
}
public GProducer2( BlockingQueue<Integer> buffer ) {
this.buffer = buffer;
state = ThreadState.RUNNING;
// innerState = ThreadState.RUNNING;
}
@Override
public void run() {
/*
* The first while loop is to keep getting items from the stream
*/
while( state != ThreadState.DYING ) {
if ( !stream.isEmpty() ) {
int item = stream.poll();
/*
* The second while loop is to not lose items if the
* thread has to wait, so it process the item when the thread
* is running again.
*/
while( state != ThreadState.DYING ) {
if ( state == ThreadState.RUNNING ) {
//Check to see if buffer has free space
boolean freeBuffer = false;
synchronized (buffer) {
freeBuffer = buffer.offer(item);
}
while ( (!freeBuffer) && (state == ThreadState.RUNNING)) {
//if it doesn't, then wait...
synchronized (this) {
try {
innerState = ThreadState.WAITING;
wait(100);
} catch (InterruptedException e) {
//e.printStackTrace();
}
}
//check to see if the buffer has free space now
synchronized (buffer) {
freeBuffer = buffer.offer(item);
}
}
if ( (freeBuffer) && (state == ThreadState.RUNNING) ) {
synchronized (this) {
innerState = ThreadState.RUNNING;
}
//... continue with the stuff if you need
//...
//System.out.println(item);
//..then break
break;
}
}
else if ( state == ThreadState.WAITING ) {
synchronized (this) {
try {
innerState = ThreadState.WAITING;
wait();
} catch (InterruptedException e) {
//e.printStackTrace();
//innerState = ThreadState.RUNNING;
}
}
}
}
}//when the stream is done.
else if ( state == ThreadState.WAITING ) {
synchronized (this) {
try {
innerState = ThreadState.WAITING;
wait();
} catch (InterruptedException e) {
//e.printStackTrace();
if ( state == ThreadState.WAITING )
innerState = ThreadState.RUNNING;
else
innerState = ThreadState.DYING;
}
}
}
}
synchronized (this) {
innerState = ThreadState.DYING;
}
}
public void pause() {
synchronized (this) {
state = ThreadState.WAITING;
}
}
public void die() {
synchronized (this) {
state = ThreadState.DYING;
}
}
public void wakeup() {
synchronized (this) {
state = ThreadState.RUNNING;
}
}
public ThreadState getState() {
return state;
}
public ThreadState getInnerState() {
return innerState;
}
public boolean isSynched() {
synchronized (this) {
if ( state == innerState )
return true;
else
return false;
}
}
}
これが私のコンシューマのコードです:
public class GConsumer implements Runnable {
private volatile ThreadState state;
private volatile ThreadState innerState;
private BlockingQueue<Integer> buffer;
private List<Integer> holder;
public GConsumer( BlockingQueue<Integer> buffer ) {
this.buffer = buffer;
state = ThreadState.RUNNING;
holder = new LinkedList<Integer>();
}
@Override
public void run() {
/*
* The first while loop is to keep getting items from the buffer
*/
while( state != ThreadState.DYING ) {
if ( state == ThreadState.RUNNING ) {
//if the buffer has items then process them
boolean emptyBuffer = true;
synchronized (buffer) {
emptyBuffer = buffer.isEmpty();
}
if ( !emptyBuffer ) {
//Start doing your stuff
innerState = ThreadState.RUNNING;
int item;
synchronized (buffer) {
item = buffer.poll();
}
holder.add(item);
}
//otherwise the thread waits for the buffer to get items
else {
synchronized (this) {
try {
innerState = ThreadState.WAITING;
wait(100);
} catch (InterruptedException e) {
//e.printStackTrace();
}
}
}
}
else if ( state == ThreadState.WAITING ) {
synchronized (this) {
try {
innerState = ThreadState.WAITING;
wait();
} catch (InterruptedException e) {
//e.printStackTrace();
}
}
}
}
synchronized (this) {
innerState = ThreadState.DYING;
}
}
public void pause() {
synchronized (this) {
state = ThreadState.WAITING;
}
}
public void die() {
synchronized (this) {
state = ThreadState.DYING;
}
}
public void wakeup() {
synchronized (this) {
state = ThreadState.RUNNING;
}
}
public ThreadState getState() {
return state;
}
public synchronized List<Integer> getHolder() {
return holder;
}
public ThreadState getInnerState() {
return innerState;
}
public boolean isSynched() {
synchronized (this) {
if ( state == innerState )
return true;
else
return false;
}
}
}
私のGUIのコードは次のとおりです。
public class GController implements ActionListener, ItemListener {
ExecutorService executor = Executors.newCachedThreadPool();
private final BlockingQueue<Integer> buffer = new LinkedBlockingQueue<Integer>(10);
private volatile AppState appState = AppState.CLEAN_START;
private GProducer2 producer;
private GConsumer consumer;
//GUI stuff
static JToggleButton startBtn;
static JButton stopBtn;
static JButton showBtn;
public static void main(String[] args) {
SwingUtilities.invokeLater(new Runnable() {
@Override
public void run() {
createAndShowGUI();
}
});
}
public GController() {
// producer = new GProducer2(buffer);
// consumer = new GConsumer(buffer);
}
private static void createAndShowGUI() {
GController gController = new GController();
JFrame frame = new JFrame("GUI Concurrency");
frame.setPreferredSize( new Dimension(400, 200));
frame.setLayout( new FlowLayout() );
frame.setDefaultCloseOperation( JFrame.EXIT_ON_CLOSE );
startBtn = new JToggleButton("Start");
startBtn.addItemListener(gController);
stopBtn = new JButton("Cancel");
stopBtn.setEnabled(false);
stopBtn.setActionCommand("Cancel");
stopBtn.addActionListener(gController);
showBtn = new JButton("Show");
showBtn.setActionCommand("Show");
showBtn.addActionListener(gController);
frame.getContentPane().add(startBtn);
frame.getContentPane().add(stopBtn);
frame.getContentPane().add(showBtn);
frame.pack();
frame.setVisible(true);
}
@Override
public void actionPerformed(ActionEvent e) {
String command = e.getActionCommand();
System.out.println(command + " is clicked");
if ( command.equals("Cancel") ) {
startBtn.setText("Start");
appState = AppState.CLEAN_START;
producer.die();
consumer.die();
synchronized (producer) {
producer.notify();
}
System.out.println( "P:" + producer.getState() );
System.out.println( "P inner:" + producer.getInnerState() );
synchronized (consumer) {
consumer.notify();
}
System.out.println( "C:" + consumer.getState() );
System.out.println( "C inner:" + consumer.getInnerState() );
//Block here until they are both dead;
consumer.getHolder().clear();
executor.shutdown();
}
else if ( command.equals("Show") ) {
for ( int i : consumer.getHolder() ) {
System.out.println("[" + i + "]");
}
System.out.println();
}
}
@Override
public void itemStateChanged(ItemEvent e) {
if ( e.getStateChange() == ItemEvent.SELECTED ) {
if ( appState == AppState.CLEAN_START) {
System.out.println("Start");
startBtn.setText("Pause");
appState = AppState.RUNNING;
executor = Executors.newCachedThreadPool();
producer = new GProducer2(buffer);
consumer = new GConsumer(buffer);
executor.execute( producer );
executor.execute( consumer );
executor.shutdown();
stopBtn.setEnabled(false);
}
//Now continue execution
else if ( appState == AppState.PAUSED ) {
System.out.println("Continue");
appState = AppState.RUNNING;
producer.wakeup();
synchronized (producer) {
producer.notify();
}
System.out.println( "P:" + producer.getState() );
System.out.println( "P inner:" + producer.getInnerState() );
consumer.wakeup();
synchronized (consumer) {
consumer.notify();
}
System.out.println( "C:" + consumer.getState() );
System.out.println( "C inner:" + consumer.getInnerState() );
//block the app here until they are really running
// while( !producer.isSynched() ) {
// }
while( !producer.isSynched() | !consumer.isSynched() ) {
System.out.println( "P:" + producer.getState() );
System.out.println( "P inner:" + producer.getInnerState() );
System.out.println( "C:" + consumer.getState() );
System.out.println( "C inner:" + consumer.getInnerState() );
}
startBtn.setText("Pause");
stopBtn.setEnabled(false);
}
}
else {
System.out.println("Pause");
startBtn.setText("Continue");
appState = AppState.PAUSED;
System.out.println("Before:");
System.out.println( "P:" + producer.getState() );
System.out.println( "P inner:" + producer.getInnerState() );
System.out.println( "C:" + consumer.getState() );
System.out.println( "C inner:" + consumer.getInnerState() );
producer.pause();
consumer.pause();
//Block the app here until they are really waiting
System.out.println("After:");
System.out.println( "P:" + producer.getState() );
System.out.println( "P inner:" + producer.getInnerState() );
System.out.println( "C:" + consumer.getState() );
System.out.println( "C inner:" + consumer.getInnerState() );
while( !producer.isSynched() | !consumer.isSynched() ) {
}
stopBtn.setEnabled(true);
}
}
}