2

バッファーを共有するプロデューサーとコンシューマーがあります。スレッドを実行、一時停止、実行を継続、停止できるようにしたい。

私が試したのは、状態を示す Enum フラグを保持することです。新しいアイテムを作成するたびに、if-else で状態を確認します。状態が実行中の場合は実行を続け、待機中の場合はスレッドを待機させます。これは、プロデューサーとコンシューマーが常に機能している (つまり、プロデューサーはアイテムをバッファーに入れることができ、コンシューマーは常にバッファー内のアイテムを見つけることができる) という事実を考慮するとうまく機能します。ただし、バッファがいっぱいまたは空のためにスレッドの 1 つが互いに待機しているという問題が発生すると、プログラムのロジック全体が台無しになり、完全に解決できなくなります。私はこれに4日間取り組んできましたが、まだ希望はありません。誰かがこれを手伝ってくれたら本当にありがたいです。ありがとう!

スレッドの実行、一時停止、実行の継続、および停止には、GUI を使用しています。スレッドをpause()で待機させます...次に、スレッドを再度実行したい場合は、スレッドをwakeup()して通知します。die() と同じです。

編集: これまでのロジックの問題は、ボタンをクリックして続行すると、スレッドの内部状態が「実行中」のはずなのに「待機中」のままになることです。そのため、GUI がブロックされます。

プロデューサーのコードは次のとおりです。

public class GProducer2 implements Runnable {


    private volatile ThreadState state;
    private volatile ThreadState innerState;
    private BlockingQueue<Integer> buffer;

    private static Queue<Integer> stream = new LinkedList<Integer>();
    static {
        for ( int i = 0; i <= 1000; i++ ) {
            stream.add(i);
        }
    }

    public GProducer2( BlockingQueue<Integer> buffer ) {
        this.buffer = buffer;
        state = ThreadState.RUNNING;
//      innerState = ThreadState.RUNNING;
    }

    @Override
    public void run() {
        /*
         * The first while loop is to keep getting items from the stream
         */
        while( state != ThreadState.DYING ) {
            if ( !stream.isEmpty() ) {
                int item = stream.poll();
                /*
                 * The second while loop is to not lose items if the 
                 * thread has to wait, so it process the item when the thread
                 * is running again.
                 */
                while( state != ThreadState.DYING ) {

                    if ( state == ThreadState.RUNNING ) {
                        //Check to see if buffer has free space
                        boolean freeBuffer = false;
                        synchronized (buffer) {
                            freeBuffer = buffer.offer(item);
                        }

                        while ( (!freeBuffer) && (state == ThreadState.RUNNING)) {
                            //if it doesn't, then wait...
                            synchronized (this) {
                                try {
                                    innerState = ThreadState.WAITING;
                                    wait(100);
                                } catch (InterruptedException e) {
                                    //e.printStackTrace();
                                }
                            }
                            //check to see if the buffer has free space now
                            synchronized (buffer) {
                                freeBuffer = buffer.offer(item);
                            }
                        }

                        if ( (freeBuffer) && (state == ThreadState.RUNNING) ) {
                            synchronized (this) {
                                innerState = ThreadState.RUNNING;
                            }
                            //... continue with the stuff if you need
                            //...
                            //System.out.println(item);
                            //..then break
                            break;
                        }

                    }
                    else if ( state == ThreadState.WAITING ) {
                        synchronized (this) {
                            try {
                                innerState = ThreadState.WAITING;
                                wait();
                            } catch (InterruptedException e) {
                                //e.printStackTrace();
                                //innerState = ThreadState.RUNNING;
                            }
                        }
                    }
                }
            }//when the stream is done.
            else if ( state == ThreadState.WAITING ) {
                synchronized (this) {
                    try {
                        innerState = ThreadState.WAITING;
                        wait();
                    } catch (InterruptedException e) {
                        //e.printStackTrace();
                        if ( state == ThreadState.WAITING )
                            innerState = ThreadState.RUNNING;
                        else
                            innerState = ThreadState.DYING;
                    }
                }
            }
        }

        synchronized (this) {
            innerState = ThreadState.DYING;
        }
    }

    public void pause() {
        synchronized (this) {
            state = ThreadState.WAITING;
        }
    }

    public void die() {
        synchronized (this) {
            state = ThreadState.DYING;
        }
    }

    public void wakeup() {
        synchronized (this) {
            state = ThreadState.RUNNING;
        }
    }

    public ThreadState getState() {
        return state;
    }


    public ThreadState getInnerState() {
        return innerState;
    }

    public boolean isSynched() {
        synchronized (this) {
            if ( state == innerState )
                return true;
            else 
                return false;
        }
    }

}

これが私のコンシューマのコードです:

public class GConsumer implements Runnable {

    private volatile ThreadState state;
    private volatile ThreadState innerState;
    private BlockingQueue<Integer> buffer;
    private List<Integer> holder;

    public GConsumer( BlockingQueue<Integer> buffer ) {
        this.buffer = buffer;
        state = ThreadState.RUNNING;
        holder = new LinkedList<Integer>();
    }

    @Override
    public void run() {
        /*
         * The first while loop is to keep getting items from the buffer
         */
        while( state != ThreadState.DYING ) {
            if ( state == ThreadState.RUNNING ) {
                //if the buffer has items then process them
                boolean emptyBuffer = true;
                synchronized (buffer) {
                    emptyBuffer = buffer.isEmpty();
                }

                if ( !emptyBuffer ) {
                    //Start doing your stuff
                    innerState = ThreadState.RUNNING;
                    int item;
                    synchronized (buffer) {
                        item = buffer.poll();
                    }

                    holder.add(item);

                }
                //otherwise the thread waits for the buffer to get items
                else {
                    synchronized (this) {
                        try {
                            innerState = ThreadState.WAITING;
                            wait(100);
                        } catch (InterruptedException e) {
                            //e.printStackTrace();
                        }
                    }
                }

            }
            else if ( state == ThreadState.WAITING ) {
                synchronized (this) {
                    try {
                        innerState = ThreadState.WAITING;
                        wait();
                    } catch (InterruptedException e) {
                        //e.printStackTrace();
                    }
                }
            }
        }

        synchronized (this) {
            innerState = ThreadState.DYING;
        }

    }

    public void pause() {
        synchronized (this) {
            state = ThreadState.WAITING;
        }
    }

    public void die() {
        synchronized (this) {
            state = ThreadState.DYING;
        }
    }

    public void wakeup() {
        synchronized (this) {
            state = ThreadState.RUNNING;
        }
    }

    public ThreadState getState() {
        return state;
    }

    public synchronized List<Integer> getHolder() {
        return holder;
    }

    public ThreadState getInnerState() {
        return innerState;
    }

    public boolean isSynched() {
        synchronized (this) {
            if ( state == innerState )
                return true;
            else 
                return false;
        }
    }
}

私のGUIのコードは次のとおりです。

public class GController implements ActionListener, ItemListener {

    ExecutorService executor = Executors.newCachedThreadPool();
    private final BlockingQueue<Integer> buffer = new LinkedBlockingQueue<Integer>(10);
    private volatile AppState appState = AppState.CLEAN_START;

    private GProducer2 producer;
    private GConsumer consumer;

    //GUI stuff
    static JToggleButton startBtn;
    static JButton stopBtn;
    static JButton showBtn;

    public static void main(String[] args) {
        SwingUtilities.invokeLater(new Runnable() {
            @Override
            public void run() {
                createAndShowGUI();
            }
        });
    }

    public GController() {
//      producer = new GProducer2(buffer);
//      consumer = new GConsumer(buffer);
    }

    private static void createAndShowGUI() {
        GController gController = new GController();
        JFrame frame = new JFrame("GUI Concurrency");
        frame.setPreferredSize( new Dimension(400, 200));
        frame.setLayout( new FlowLayout() );
        frame.setDefaultCloseOperation( JFrame.EXIT_ON_CLOSE );

        startBtn = new JToggleButton("Start");
        startBtn.addItemListener(gController);

        stopBtn = new JButton("Cancel");
        stopBtn.setEnabled(false);
        stopBtn.setActionCommand("Cancel");
        stopBtn.addActionListener(gController);

        showBtn = new JButton("Show");
        showBtn.setActionCommand("Show");
        showBtn.addActionListener(gController);

        frame.getContentPane().add(startBtn);
        frame.getContentPane().add(stopBtn);
        frame.getContentPane().add(showBtn);

        frame.pack();
        frame.setVisible(true);
    }


    @Override
    public void actionPerformed(ActionEvent e) {
        String command = e.getActionCommand();
        System.out.println(command + " is clicked");

        if ( command.equals("Cancel") ) {
            startBtn.setText("Start");
            appState = AppState.CLEAN_START;

            producer.die();
            consumer.die();

            synchronized (producer) {
                producer.notify();
            }
            System.out.println( "P:" + producer.getState()  );
            System.out.println( "P inner:" + producer.getInnerState()  );

            synchronized (consumer) {
                consumer.notify();
            }
            System.out.println( "C:" + consumer.getState()  );
            System.out.println( "C inner:" + consumer.getInnerState()  );

            //Block here until they are both dead;

            consumer.getHolder().clear();
            executor.shutdown();
        }
        else if ( command.equals("Show") ) {
            for ( int i : consumer.getHolder() ) {
                System.out.println("[" + i + "]");
            }
            System.out.println();
        }

    }

    @Override
    public void itemStateChanged(ItemEvent e) {
        if ( e.getStateChange() == ItemEvent.SELECTED ) {

            if ( appState == AppState.CLEAN_START) {
                System.out.println("Start");
                startBtn.setText("Pause");
                appState = AppState.RUNNING;
                executor = Executors.newCachedThreadPool();
                producer = new GProducer2(buffer);
                consumer = new GConsumer(buffer);
                executor.execute( producer );
                executor.execute( consumer );
                executor.shutdown();
                stopBtn.setEnabled(false);
            }
            //Now continue execution
            else if ( appState == AppState.PAUSED ) {
                System.out.println("Continue");
                appState = AppState.RUNNING;
                producer.wakeup();
                synchronized (producer) {
                    producer.notify();
                }

                System.out.println( "P:" + producer.getState()  );
                System.out.println( "P inner:" + producer.getInnerState()  );

                consumer.wakeup();
                synchronized (consumer) {
                    consumer.notify();
                }

                System.out.println( "C:" + consumer.getState()  );
                System.out.println( "C inner:" + consumer.getInnerState()  );

                //block the app here until they are really running
//              while( !producer.isSynched() ) {
//              }

                while( !producer.isSynched() | !consumer.isSynched() ) {
                    System.out.println( "P:" + producer.getState()  );
                    System.out.println( "P inner:" + producer.getInnerState()  );

                    System.out.println( "C:" + consumer.getState()  );
                    System.out.println( "C inner:" + consumer.getInnerState()  );
                }

                startBtn.setText("Pause");
                stopBtn.setEnabled(false);
            }
        }
        else {
            System.out.println("Pause");
            startBtn.setText("Continue");
            appState = AppState.PAUSED;

            System.out.println("Before:");

            System.out.println( "P:" + producer.getState()  );
            System.out.println( "P inner:" + producer.getInnerState()  );
            System.out.println( "C:" + consumer.getState()  );
            System.out.println( "C inner:" + consumer.getInnerState()  );

            producer.pause();
            consumer.pause();

            //Block the app here until they are really waiting
            System.out.println("After:");
            System.out.println( "P:" + producer.getState()  );
            System.out.println( "P inner:" + producer.getInnerState()  );
            System.out.println( "C:" + consumer.getState()  );
            System.out.println( "C inner:" + consumer.getInnerState()  );
            while( !producer.isSynched() | !consumer.isSynched() ) {
            }
            stopBtn.setEnabled(true);
        }
    }

}
4

1 に答える 1

0

ここにはコードが多すぎますが、同期プリミティブが明らかに正しく使用されていないことがわかります (ウェイクアップ後に条件をチェックしない、フィールドを宣言しない、フィールドvolatileを保護するsynchronizedなど)。

同期プリミティブは、適切に使用するのが難しいことで有名です。私があなたなら、現在のアプローチを修正しようとするよりも、より高いレベルの構造を使用するでしょう。

何を達成しようとしているのかはわかりませんが、処理を「実行」、「一時停止」、「停止」したいと考えていると思います。「実行」を実現するには、限定されたキューを持つシンプルで標準的なプロデューサー コンシューマー コンストラクトを使用します (このを参照してください)。「一時停止」を実現するには、タスクが共有を参照し、タスク内で次のようにSemaphore呼び出します。acquireSemaphore

final Semaphore signal = new Semaphore(1);
final ExecutionService service = // create one with a bounded queue
                                 // and a CallerRuns policy

void submit(final Callable<T> work){
     Callable<T> wrapped = new Callable<T>(){
         public T call() throws Exception(){
             signal.acquire();
             return work.call();
         }
     };
     service.submit(wrapped);
}

*: コードは説明用であり、コンパイルを意図したものではありません。しかし、あなたはアイデアを得るでしょう

drainPermits次に、 で/releaseを呼び出すことにより、処理を停止/再開できますsignal。「停止」するには、単に呼び出しshutdownますExecutionService

于 2013-10-05T14:47:06.093 に答える