0

異なるスレッドからの新しいデータを待機するJavaのクラスを実現したいと思います。彼がそれを取得すると、このクラスはそれを処理し、再び新しいデータを待機します。同期、待機、notifyAllコマンドのみを使用してこれを実現したいと思います。私はいくつかの変種を試しました:

1)コマンドlockObject.wait()によって待機する1つのスレッドを使用します。ただし、すべてのアクティブなスレッドが作業を終了すると、このスレッドは永久に待機します。もちろん、メソッドstopProcess()を作成することはできますが、別のプログラマーが呼び出すのを忘れてしまう可能性があるため、安全ではありません。

2)1つのデーモンスレッドを使用すると、すべてのアクティブなスレッドが作業を終了すると私のデーモンスレッドが停止するため、機能しませんが、処理する必要のあるデータを取得できます。

3)新しいデータが来るとき-データを処理する新しいスレッドを作成します。スレッドが生きている間(彼は与えられたデータを処理します)、彼は新しいデータを受け取ります。データが来ておらず、すべての古いデータが処理されたら、スレッドは動作を終了します。このバリアントのマイナスは、データが一定期間を通過するとき(スレッドが古いデータを処理して終了する時間があるとき)に、新しいスレッドが作成されることです。パフォーマンスやメモリに悪いと思います。私は正しいですか?

stopProcess()メソッドを使用せずに、1つまたは2つのスレッド(デーモンとアクティブスレッドを組み合わせて使用​​している可能性があります)を使用して問題を解決することは可能ですか?

ここにいくつかのコード

キューをブロックすることに気づきました

public class BlockingQueue<T> {
    private Queue<T> queue = new LinkedList<T>();

    public void add(T el){
        synchronized (queue){
            queue.add(el);
        }
    }

    public T getFirst(){
        synchronized (queue){
            return queue.poll();
        }
    }

    public int getSize(){
        synchronized (queue){
            return queue.size();
        }
    }
}

データクラス

public class Data {
    //some data

    public void process(){
        //process this data
    }
} 

コードの最初のバリアント

public class ProcessData {

    private BlockingQueue<Data> queue = new BlockingQueue<Data>();
    private boolean run = false;
    private Thread processThread;
    private Object lock = new Object();

    public synchronized void addData(Data data) throws Exception {
        if (run){
            if (data != null){
                queue.add(data);
                wakeUpToProcess();
            }
        }else{
            throw new Exception("");
        }
    }

    public synchronized void start() {
        if (!run){
            run = true;

            processThread = new Thread(new Runnable() {
                public void run() {

                    while (run || queue.getSize()!=0){

                        while(queue.getSize() == 0 && run){
                            //if stopProcess was not called
                            //and no active threads
                            //it will not die
                            waitForNewData();
                        }

                        Data cur;
                        while(queue.getSize() > 0){
                            cur = queue.getFirst();
                            cur.process();
                        }

                    }
                }
            });
            processThread.start();

        }
    }

    public synchronized void stopProcess() {
        if (run){
            run = false;
            wakeUpToProcess();
        }
    }

    private void waitForNewData(){
        try{
            synchronized (lock){
                lock.wait();
            }
        }catch (InterruptedException ex){
            ex.printStackTrace();
        }
    }

    private void wakeUpToProcess(){
        synchronized (lock){
            lock.notifyAll();
        }
    }
}

2番目のバリアントでは、processThreadをデーモンとして作成します。しかし、アクティブなスレッドが停止すると、processThreadは機能しますが、キューにいくつかのデータがあり、処理する必要があります。

3番目のバリアント

public class ProcessData {

    private BlockingQueue<Data> queue = new BlockingQueue<Data>();
    private boolean run = false;
    private Thread processThread = null;

    public synchronized void addData(Data data) throws Exception {
        if (run){
            if (data != null){
                queue.add(data);
                wakeExecutor();
            }
        }else{
            throw new Exception("ProcessData is stopped!");
        }
    }

    public synchronized void start() {
        if (!run){
            run = true;
        }
    }

    public synchronized void stopProcess() {
        if (run){
            run = false;
        }
    }

    public boolean isRunning(){
        return this.run;
    }

    protected void wakeExecutor(){
        if (processThread ==null || !processThread.isAlive()){
            processThread = new Thread(new Runnable() {
                @Override
                public void run() {
                    Data cur;
                    while(queue.getSize() > 0){
                        cur = queue.getFirst();
                        cur.process();
                    }
                }
            });
            processThread.start();
        }
    }
}

重要なのは、データがスレッドからの順序で処理される必要があるということです。

4

2 に答える 2

1

ここで車輪を真剣に再発明しています。java.util.concurrent必要なものはすべて、パッケージ内の JDK で利用できます。

BlockingQueueを介してプロデューサーとコンシューマーのパターンを実装し、プロデューサーの呼び出しとコンシューマー スレッドの呼び出しを使用して、何かが利用可能になるまでブロックします。offer()take()

それでおしまい。あなたが書いたすべてのクラスは必要ありませんし、書くべきではありません。これらの並行クラスは、すべてのロックと同期を行い、それも正しく行います (これは過小評価されるべきではありません)。

于 2012-11-04T11:24:53.027 に答える
0

それ以降の使用が許可されていない場合はjava.util.concurrent、 などに基づいて独自のタスク キューを実装する必要がありますLinkedList。ブロッキング動作をキューにカプセル化します。たとえば、(疑似コード)

synchronized Data nextTask() {
  while(the linked list is empty) {
    wait()
  }
  remove and return head of the queue
}

synchronized void addTask(Data d) {
  add d to the queue
  notifyAll()
}

次に、このようなことを継続的に行う消費者スレッドを作成できます

while(true) {
  taskQueue.nextTask().process()
}

プロデューサースレッドはtaskQueue.addTask、各タスクをキューに追加するために呼び出します。最後に正常なシャットダウンが必要な場合は、消費者スレッドに終了するように指示する「センチネル値」が必要になるかThread.interrupt()、適切なタイミングで呼び出す方法を見つける必要があります。

于 2012-11-04T21:06:40.900 に答える