4

生成される「レコード」の数が決定される Thread を所有するジェネレーター クラスがあり、その数のレコードを生成します (別のスレッドによる取得のために BlockingQueue に配置されます)。

生成されるレコードの数を他のスレッドに知らせたいと思います(とりわけ、賢明な進捗報告のため)。

Future はまさに私が求めているインターフェイスを提供してくれるようですが、私は Java が初めてで、それを実装する慣用的な方法がわかりません。

私のバックグラウンドは C++/Win32 なので、通常は win32 の「イベント」を使用します (によって作成されCreateEvent(0, true, false, 0)、シグナルSetEventWaitForSingleObject待機の実装のために)。Javaには があることに気付きましたCountDownLatchが、これはどういうわけか私が求めているものよりも重く感じられ(本当にブール値が必要なときに int を使用するのと似ています)、この目的には直感的ではないようです(とにかく)。

これが、CountDownLatch と Future を使用した私のコードです。ここで実際のコードを少し要約しました (無関係な実装の詳細を削除し、すべてのエラー処理を無視しています)。

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.CountDownLatch;
    import java.util.concurrent.Future;
    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    public abstract class Generator {

        private CountDownLatch numRecordsSignal = new CountDownLatch(1);

        private int numRecords;

        private BlockingQueue<Record> queue = new LinkedBlockingQueue<Record>();

        public Generator() {
            new Thread(new Runnable() {

                @Override
                public void run() {
                    numRecords = calculateNumRecords();
                    numRecordsSignal.countDown();

                    for (Record r : generateRecords()) {
                        try {
                            queue.put(r);
                        } catch (InterruptedException e) {
                            // [ ... snip ... ]
                        }
                    }                
                }
            }).start();
        }

        public Future<Integer> numRecords() {
            return new Future<Integer>() {
                // Ignore cancel for now (It wouldn't make sense to cancel 
                // just this part of the Generator's work, anyway).
                public boolean cancel(boolean mayInterruptIfRunning) { 
                    return false; 
                }

                public Integer get() throws InterruptedException {
                    numRecordsSignal.await();
                    return numRecords;
                }

                public Integer get(long timeout, TimeUnit unit) 
                        throws InterruptedException {
                    numRecordsSignal.await(timeout, unit);
                    return numRecords;
                }

                public boolean isCancelled() {
                    return false;
                }

                public boolean isDone() {
                    // Since we can't cancel, just check the state of the 
                    // signal
                    return numRecordsSignal.getCount() == 0;
                }
            };
        }

        public Record nextRecord() throws InterruptedException {
            return queue.take();
        }

        /** --- Boring stuff below this line --- */
        public interface Record { }

        protected abstract int calculateNumRecords();

        protected abstract Iterable<Record> generateRecords();                        
    }

今、私の実際の質問のために:

  1. CountDownLatchシングルショットシグナリングよりも優れたメカニズムはありますか?
  2. 呼び出し元が結果を待機またはポーリングできるようにしたいのですが、操作をキャンセルできるようにする必要はありません。Future はこのようなものを公開する正しい方法ですか?
  3. このようなものの中に、特に「非 Java」に見えるものはありますか? 私は完全に間違った道を進んでいますか?

編集:

明確にするために、呼び出し元が次のことができることを期待しています。

    Generator gen = new Generator();
    Integer numRecords = gen.numRecords().get();  // This call might block waiting for the result
    numRecords = gen.numRecords().get(); // This call will never block, as the result is already available.

これは、私が実装しようとしている初期化が遅い値です。「初期化」条件が満たされると、ラッチされます。値が既知になると、値は再評価されません。

4

5 に答える 5

3

サイドコメント

コンストラクターでスレッドを開始しないでください。スレッドの開始時に Generator オブジェクトが完全に作成されていないことが考えられます。たとえば、カウントダウン ラッチが null になる可能性があります。コンストラクターでスレッドを作成できますが、別のメソッドで開始する必要があります。呼び出しコードは次のようになります。

Generator g = new Generator();
g.start();

あなたの質問

あなたは未来を自分で再実装していますが、これは私の意見では必要でも望ましくもありません。クラスを再設計し、Generator実装Callable<Integer>してエグゼキューターを介して実行します。これにより、いくつかのことが提供されます。

  • Generator からスレッド ロジックを削除します。これにより、コール スタックの上位レベルでスレッドをより効率的に管理できます。
  • 整数は呼び出しコードの future を介して返され、JDK に依存して実装を処理します。
  • 最初にキューにデータを入力してから整数を返しても問題ないと思いました
  • future.get()何度でも呼び出すことができます。最初に呼び出されたときにのみブロックされます。
public static void main(String[] args) {
    ExecutorService executor = Executors.newFixedThreadPool(1);
    Future<Integer> future = executor.submit(new GeneratorImpl()); //a concrete implementation of Generator
    int numRecords = 0;
    try {
        numRecords = future.get(); //you can use a get with timeout here
    } catch (ExecutionException e) {
        //an exception happened in Generator#call()
    } catch (InterruptedException e) {
        //handle it
    }

    //don't forget to call executor.shutdown() when you don't need it any longer
}

public abstract class Generator implements Callable<Integer> {

    private BlockingQueue<Record> queue = new LinkedBlockingQueue<Record>();

    @Override
    public Integer call() {
        int numRecords = calculateNumRecords();
        for (Record r : generateRecords()) {
            try {
                queue.put(r);
            } catch (InterruptedException e) {
                // [ ... snip ... ]
            }
        }
        return numRecords;
    }

    public Record nextRecord() throws InterruptedException {
        return queue.take();
    }

    /**
     * --- Boring stuff below this line ---
     */
    public interface Record {
    }

    protected abstract int calculateNumRecords();

    protected abstract Iterable<Record> generateRecords();
}

編集

numRecods をできるだけ早く返す必要がある場合は、別のスレッドでキューにデータを入力できます。

    public Integer call() {
        int numRecords = calculateNumRecords();
        new Thread(new Runnable() {
            @Override
            public void run() {
                for (Record r : generateRecords()) {
                    try {
                        queue.put(r);
                    } catch (InterruptedException e) {
                        // [ ... snip ... ]
                    }
                }
            }
        }).start(); //returns immediately
        return numRecords;
    }
于 2012-08-02T09:50:40.883 に答える
1

Java スレッドの「WaitOnSingleEvent()」および「SetEvent()」に相当する標準 Java は、「wait()」、「notify()」、および「notifyAll()」です。

于 2012-08-02T06:25:41.207 に答える
1

私自身のシグナルメカニズムの実装を検討し、同じことを行っている他の人が残したパンくずリストをたどった後、「BooleanLatch」のコードスニペットを含むAbstractQueuedSynchronizerの javadoc に出会いました。これは私のニーズを完全に満たしています。

class BooleanLatch {
    private static class Sync extends AbstractQueuedSynchronizer {
        boolean isSignalled() { return getState() != 0; }

        protected int tryAcquireShared(int ignore) {
            return isSignalled()? 1 : -1;
        }

        protected boolean tryReleaseShared(int ignore) {
            setState(1);
           return true;
        }
    }

    private final Sync sync = new Sync();
    public boolean isSignalled() { return sync.isSignalled(); }
    public void signal()         { sync.releaseShared(1); }
    public void await() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }
 }

もう少し検索すると、多くのフレームワークに BooleanLatch が含まれていることがわかりました (Apache Qpid はその 1 つです)。一部の実装 (アトラシアンのものなど) は自動リセットされるため、私のニーズには不適切です。

于 2012-08-05T03:30:11.657 に答える
0

このシナリオでのワン ショット シグナリングでは、「シグナル」を記憶するため、セマフォの方が優れています。条件オブジェクト [wait() は条件付き] はシグナルを記憶しません。

 Semaphore numRecordsUpdated = new Semaphore(0);

ジェネレーターで

  numRecordsUpdated.release();

消費者で

  numRecordsUpdated.acquire();
于 2012-08-02T07:39:40.820 に答える
0

問題を正しく理解していれば、標準のオブザーバー通知パターンがここで役立ちます。

于 2012-08-02T06:44:39.653 に答える