2

私はそれを間違っていると思います。共有キューから一部のデータをクランチすると思われるスレッドを作成しています。私の問題は、プログラムが遅く、メモリを大量に消費することです。キューが期待どおりに共有されていない可能性があると思われます。コードにキューのサイズを表示する行を追加し、2 つのスレッドを起動すると、まったく異なる数値の 2 つの出力が得られ、それぞれが増加しているように見えるため、これが疑われます (同じ数値である可能性があると思いましたが、 100 から 2 などにジャンプしていたのかもしれませんが、見た後では 105 と 5 が表示され、異なる速度で進んでいます.4 つのスレッドがある場合は、4 つの異なる数値が表示されます)。

関連部分の抜粋です。プログラムの先頭にあるキューに必要なデータを含む静的クラスを作成します

static class queue_class {
        int number;
        int[] data;
        Context(int number,  int[] data) {
            this.number = number;
            this.data = data;
        }
    }

次に、いくつかのジョブを callable に送信した後、キューを作成します。

static class process_threaded implements Callable<Void> {
    // queue with contexts to process
    private Queue<queue_class> queue;

    process_threaded(queue_class request) {
        queue = new ArrayDeque<queue_class>();
        queue.add(request);
    }

    public Void call() {
        while(!queue.isEmpty()) {
            System.out.println("in contexts queue with a size of " + queue.size());
            Context current = contexts.poll();
            //get work and process it, if it work great then the solution goes elsewhere
            //otherwise, depending on the data, its either discarded or parts of it is added back to queue
            queue.add(new queue_class(k, data_list)); 

ご覧のとおり、データには 3 つのオプションがあります。データが良好な場合は送信され、データが完全にひどい場合は破棄されるか、キューに戻されます。送り返されたときにキューが動いていると思いますが、各スレッドが共有キューではなく独自のキューで動作しているためと思われます。

この推測は正しいですか、私はこれを間違っていますか?

4

2 に答える 2

2

のコンストラクターでキューを作成しているので、各スレッドが(おそらく)独自のキューで動作しているという評価は正しいですCallable。(実際には、-を持っているのは非常に奇妙Callable<Void>です-それは単なる?ではありませんRunnableか?)

そこには他にも問題があります。たとえば、スレッドセーフではないキューを操作しているという事実や、コードが記述されたとおりにコンパイルされないという事実です。

ただし、重要な質問は、そもそもキューを明示的に作成する必要があるのか​​ということです。ExecutorServicesを送信する対象を持っていないのはなぜですかCallable(またはRunnablesその切り替えを行うことにした場合):エグゼキュータへの参照をsに渡すと、実行するタスクのエグゼキュータのキューにCallable新しいsを追加できます。Callable車輪の再発明をする必要はありません。

例えば:

static class process_threaded implements Runnable {
    // Reference to an executor
    private final ExecutorService exec;
    // Reference to the job counter
    private final AtomicInteger jobCounter;
    // Request to process
    private queue_class request;

    process_threaded( ExecutorService exec, AtomicInteger counter, queue_class request) {
        this.exec = exec;
        this.jobCounter = counter;
        this.jobCounter.incrementAndGet(); // Assuming that you will always
                                           // submit the process_threaded to
                                           // the executor if you create it.
        this.request = request;
    }

    public run() {
        //get work and process **request**, if it work great then the solution goes elsewhere
        //otherwise, depending on the data, its either discarded or parts of are added back to the executor
        exec.submit( new process_threaded( exec, new queue_class(k, data_list) ) );

        // Can do some more work

        // Always run before returning: counter update and notify the launcher
        synchronized(jobCounter){
            jobCounter.decrementAndGet();
            jobCounter.notifyAll();
        }
    }
}

編集:

エグゼキュータをいつシャットダウンするかという問題を解決するには、ジョブカウンタを用意し、それが0に達したときにシャットダウンするのが最も簡単な解決策だと思います。スレッドセーフのためにAtomicIntegerは、おそらくanが最良の選択です。変更を組み込むために、上記のコードをいくつか追加しました。次に、起動コードは次のようになります。

void theLauncher() {

    AtomicInteger jobCounter = new AtomicInteger( 0 );

    ExecutorService exec = Executors.newFixedThreadPool( Runtime.getRuntime().availableProcesses());

    exec.submit( new process_threaded( exec, jobCounter, someProcessRequest ) );
    // Can submit some other things here of course...

    // Wait for jobs to complete:
    for(;;jobCounter.get() > 0){
        synchronized( jobCounter ){ // (I'm not sure if you have to have the synchronized block, but I think this is safer.
            if( jobCounter.get() > 0 )
                jobCounter.wait();
        }
    }

    // Now you can shutdown:
    exec.shutdown();
}
于 2012-04-14T04:31:06.607 に答える
2

車輪を再発明しないでください!ConcurrentLinkedQueueを使用するのはどうですか? javadoc から:

リンクされたノードに基づく無制限のスレッド セーフ キュー。このキューは要素を FIFO (先入れ先出し) で順序付けます。キューの先頭は、最も長い時間キューにあった要素です。キューの末尾は、最も短い時間キューにあった要素です。新しい要素はキューの末尾に挿入され、キューの取得操作はキューの先頭にある要素を取得します。ConcurrentLinkedQueue は、多くのスレッドが共通のコレクションへのアクセスを共有する場合に適切な選択です。

于 2012-04-14T04:45:29.133 に答える