0

私は ArrayBlocking キューを持っており、その上で単一のスレッド固定レート Scheduled が動作します。タスクに失敗した可能性があります。それを再実行するか、高優先度または最上位でキューに再挿入したい

4

2 に答える 2

1

ここでのいくつかの考え-
なぜPriorityBlockingQueueではなくArrayBlockingQueueを使用しているのですか?あなたが私に必要なもののように聞こえます。最初に、すべての要素が同じ優先度になるように設定します。
例外を受け取った場合-より高い優先度でキューに再挿入します

于 2012-06-22T05:20:18.147 に答える
0

最も簡単なのは優先キューかもしれません。タスクにリトライ番号を付けます。ゼロから始まります。実行に失敗した後、すべての 1 を破棄し、0 をインクリメントして、高い優先度でキューに戻します。この方法を使用すると、後で必要に応じて、すべてを 3 回、またはそれ以上実行することを簡単に決定できます。欠点は、タスク クラスを変更する必要があることです。

もう 1 つのアイデアは、別のノンブロッキング、スレッドセーフ、優先順位の高いキューを設定することです。新しいタスクを探すときは、まずノンブロッキング キューをチェックし、そこにあるものを実行します。それ以外の場合は、ブロッキング キューに移動します。これはそのままで機能する可能性があり、これまでのところ最も簡単なソリューションです。問題は、ブロッキング キューでスケジューラがブロックされている間に、優先度の高いキューがいっぱいになる可能性があることです。

これを回避するには、独自のブロッキングを行う必要があります。どちらのキューもノンブロッキングである必要があります。(提案: java.util.concurrent.ConcurrentLinkedQueuewait() 。)モニターで両方のキューをポーリングした後、結果はありません。何かが何かをキューに入れると、それが呼び出さnotifyAll()れ、スケジューラが再び起動できるようになります。スケジューラが両方のキューをチェックした後、 を呼び出す前に通知が発生しないように細心の注意が必要ですwait()

添加:

手動ブロックによる 3 番目のソリューションのプロトタイプ コード。いくつかのスレッド化が提案されていますが、読者は自分の状況を最もよく知っているでしょう. ロックの待機をブロックしがちなコード、大規模な作業を行っている間にスレッド (およびコア) を数分間拘束しがちなコード、および他のコードがすべて完了するのを待つ余裕がないコードはどれですか。考慮。たとえば、失敗した実行を、時間のかかるクリーンアップなしで同じスレッドですぐに再実行できる場合、このコードのほとんどをジャンクできます。

private final ConcurrentLinkedQueue  mainQueue = new ConcurrentLinkedQueue();
private final ConcurrentLinkedQueue  prioQueue = new ConcurrentLinkedQueue();
private final Object  entryWatch = new Object();

/** Adds a new job to the queue. */
public void addjob( Runnable runjob )  {
    synchronized (entryWatch)  { entryWatch.notifyAll(); }
}
/** The endless loop that does the work. */
public void schedule()  {
    for (;;)  {
        Runnable  run = getOne();  // Avoids lock if successful.
        if (run == null)  {
            // Both queues are empty.
            synchronized (entryWatch)  {
                // Need to check again.  Someone might have added and notifiedAll
                // since last check. From this point until, wait, we can be sure
                // entryWatch is not notified.
                run = getOne();
                if (run == null)  {
                    // Both queues are REALLY empty.
                    try  { entryWatch.wait(); }
                    catch (InterruptedException ie)  {}
                }
            }
        }
        runit( run );
    }
}
/** Helper method for the endless loop. */
private Runnable getOne()  {
    Runnable  run = (Runnable) prioQueue.poll();
    if (run != null)  return run;
    return (Runnable) mainQueue.poll();
}
/** Runs a new job. */
public void runit( final Runnable runjob )  {
    // Do everthing in another thread.  (Optional)
    new Thread()  {
        @Override public void run()  {
            // Run run.  (Possibly in own thread?)
            // (Perhaps best in thread from a thread pool.)
            runjob.run();
            // Handle failure (runit only, NOT in runitLast).
            //  Defining "failure" left as exercise for reader.
            if (failure)  {
                // Put code here to handle failure.
                // Put back in queue.
                prioQueue.add( runjob );
                synchronized (entryWatch)  { entryWatch.notifyAll(); }
            }
        }
    }.start();
}
/** Reruns a job. */
public void runitLast( final Runnable runjob )  {
     // Same code as "runit", but don't put "runjob" in "prioQueue" on failure.
}
于 2012-06-22T16:00:08.007 に答える