1

それで、私は非常に柔軟なこの Phaser を手に入れましたが、何かが足りないようです。
私は CyclicBarrier の使用に成功しましたが、今はもっと柔軟なものが必要です。コードは次のとおりです


private static final CountDownLatch synchronizer = new CountDownLatch(1);
private static AtomicBoolean HAS_TIMED_OUT = new AtomicBoolean(false);


コード:

try {
    logger.INFO("CONNECTED - Peer ID properties: " + SYS_NEWLINE + peerSocket + SYS_NEWLINE + pID, true);

    final int peerKQueries = sp.getInteger(peerSocket);
    peerObjects = new String[peerKQueries];
    peerValues = new BigDecimal[peerKQueries];
    for ( int i = 0; i < peerObjects.length; i++ )
       peerObjects[i] = sp.getString(peerSocket);
    for ( int i = 0; i < peerValues.length; i++ )
       peerValues[i] = sp.getBigDecimal(peerSocket);

    final int phase1a = htPhaser1a.arrive();
    if ( phase1a < 0 ) {
        logger.ERROR("Rejecting Super Peer thread " + THREAD_ID + " because it arrived lately for Phase 1a!", true);
        sp.close(peerSocket);
        throw new IllegalThreadStateException();
    } else {
        logger.INFO(pID + " -> Arrived in HT phase 1a. Total arrivals: "+htPhaser1a.getArrivedParties(), true);
        logger.INFO("Super Peer thread " + THREAD_ID + " will advance to HT Phase 1b/2 (phase number is "+phase1a+").", true);
        // The last peer should also unblock the barrier.
        if ( htPhaser1a.getArrivedParties() == TOTAL_PEERS.get() ) {
          htPhaser1a.arrive();
          synchronizer.countDown();
        }
            htPhaser1a.awaitAdvanceInterruptibly(phase1a, 30, TimeUnit.SECONDS);
    }

} catch (IOException e) {
    logger.ERROR("Super Peer thread " + THREAD_ID + " encountered an I/O error.", true);
    sp.close(peerSocket);
    throw new IllegalThreadStateException();
} catch (TimeoutException e) {
    logger.INFO("Super Peer thread " + THREAD_ID + " timed out but will advance to HT Phase 1b/2.", true);
    if ( HAS_TIMED_OUT.compareAndSet(false, true) ) {
        logger.INFO("Parties NOT arrived in the timeout: "+(htPhaser1a.getUnarrivedParties()-1), true);
        resetCriticalData(htPhaser1a.getArrivedParties());
        htPhaser1a.forceTermination();
        instantiateHTPhase1b();
        instantiateHTPhase2();
        instantiateHTPatch();
        synchronizer.countDown();
    }
} finally {
    logger.INFO("Super Peer thread "+THREAD_ID+" is blocked!", true);
    synchronizer.await();
    logger.INFO("Super Peer thread's "+THREAD_ID+" blocking waived!", true);
}

sp.getSomething();I/O 呼び出しです。
このコード サンプルが複数のスレッドによって実行されていることを考慮してください。

ここに私の問題があります: MAX_CLIENTS 以上が Phaser に到着しないことを確認したので、MAX_CLIENTS が到着した場合はすべて問題ありません。ただし、TimeoutException で問題が発生しました。1 つ目は、クライアント (スレッド A など) がフェーズに到達できるタイム ウィンドウ (競合状態) であり、次にスレッド B で TimeoutException が発生します。スレッド B で別のフェイザーを到着したパーティの数で動的にインスタンス化しています。 (たとえば 5)、スレッド A は既にフェーズに到達しています (別名、phase1a は < 0 であることがわかりませんでした)。どうすればそれを修正できますか? 私はセマフォを使用することを考えていましたが、おそらくこれを行う方法を再考する必要があるため、努力する価値はないと思います. タイマーを使用してインクリメントすることも考えましたAtomicInteger変数とタイマーが切れると、Phaser が動的にインスタンス化されます。この問題にどのようにアプローチするかについてのアイデアはありますか?

編集:
ドキュメントにはbulkRegister(int parties)メソッドがありますが、ちょっと変わった言い回しです:

指定された数の新しい未到着のパーティをこのフェイザーに追加します。onAdvance(int, int) の継続的な呼び出しが進行中の場合、このメソッドその完了を待ってから戻る場合があります。このフェーザーに親があり、指定されたパーティーの数が 0 よりも大きく、このフェーザーに以前にパーティーが登録されていなかった場合、この子フェーザーもその親に登録されます。このフェーザーが終了した場合、登録しようとしても効果がなく、負の値が返されます。

質問:「かもしれません」という言葉は私を混乱させます! "may" as may または "may"

編集:
解決しました。以下の私の答えを確認してください。

4

1 に答える 1

1

宣言:

private static final CountDownLatch PEER = new CountDownLatch(1);
private static AtomicBoolean HAS_TIMED_OUT = new AtomicBoolean(false);
htPeerPhaser = new Phaser();


コード:

...
htPeerPhaser.register(); // Called only once.
...
// Note: Server application has guaranteed that no more than the maximum number of peers will arrive.
try {
    logger.INFO("CONNECTED - Peer ID properties: " + SYS_NEWLINE + peerSocket + SYS_NEWLINE + pID, true);
    final int peerKQueries = sp.getInteger(peerSocket);
    peerObjects = new String[peerKQueries];
    peerValues = new BigDecimal[peerKQueries];
    for ( int i = 0; i < peerObjects.length; i++ )
        peerObjects[i] = sp.getString(peerSocket);
    for ( int i = 0; i < peerValues.length; i++ )
        peerValues[i] = sp.getBigDecimal(peerSocket);
    final int registrationID = htPeerPhaser.bulkRegister(1);
    if ( registrationID < 0 ) {
        logger.ERROR("Rejecting Super Peer thread " + THREAD_ID + " because peer registration has stopped!", true);
        sp.close(peerSocket);
        throw new IllegalThreadStateException();
    }
    logger.INFO(pID + " -> Registered for HT phase 1.", true);
    logger.INFO("Super Peer thread " + THREAD_ID + " will advance to HT Phase 1/2.", true);
    // The last peer should also unblock the barrier.
    if ( htPeerPhaser.getRegisteredParties() == TOTAL_PEERS.get()+1 ) {
        htPeerPhaser.forceTermination();
        PEER.countDown();
    }
    htPeerPhaser.awaitAdvanceInterruptibly(registrationID, 30, TimeUnit.SECONDS);

} catch (IOException e) {
    logger.ERROR("Super Peer thread " + THREAD_ID + " encountered an I/O error.", true);
    sp.close(peerSocket);
    throw new IllegalThreadStateException();
} catch (TimeoutException e) {
    htPeerPhaser.forceTermination();
    logger.INFO("Super Peer thread " + THREAD_ID + " timed out but will advance to HT Phase 1b/2.", true);
    if ( HAS_TIMED_OUT.compareAndSet(false, true) && htPeerPhaser.getRegisteredParties() < TOTAL_PEERS.get()+1 ) {
        final int arrivedPeers = htPeerPhaser.getRegisteredParties()-1;
        logger.INFO("Parties that arrived before timeout: "+arrivedPeers, true);
        final int unarrivedPeers = TOTAL_PEERS.get()-arrivedPeers;
        logger.INFO("Parties NOT arrived due to timeout: "+unarrivedPeers, true);
        resetCriticalData(arrivedPeers);
        instantiateHTPhase1b();
        instantiateHTPhase2();
        instantiateHTPatch();
        PEER.countDown();
        logger.INFO("Super Peer thread " + THREAD_ID + " re-instantiated critical data.", true);
    }
}
logger.INFO("Super Peer thread "+THREAD_ID+" is blocked!", true);
PEER.await();
logger.INFO("Super Peer thread's "+THREAD_ID+" blocking waived!", true);
于 2011-10-05T12:22:22.873 に答える