私は ArrayBlocking キューを持っており、その上で単一のスレッド固定レート Scheduled が動作します。タスクに失敗した可能性があります。それを再実行するか、高優先度または最上位でキューに再挿入したい
2 に答える
ここでのいくつかの考え-
なぜPriorityBlockingQueueではなくArrayBlockingQueueを使用しているのですか?あなたが私に必要なもののように聞こえます。最初に、すべての要素が同じ優先度になるように設定します。
例外を受け取った場合-より高い優先度でキューに再挿入します
最も簡単なのは優先キューかもしれません。タスクにリトライ番号を付けます。ゼロから始まります。実行に失敗した後、すべての 1 を破棄し、0 をインクリメントして、高い優先度でキューに戻します。この方法を使用すると、後で必要に応じて、すべてを 3 回、またはそれ以上実行することを簡単に決定できます。欠点は、タスク クラスを変更する必要があることです。
もう 1 つのアイデアは、別のノンブロッキング、スレッドセーフ、優先順位の高いキューを設定することです。新しいタスクを探すときは、まずノンブロッキング キューをチェックし、そこにあるものを実行します。それ以外の場合は、ブロッキング キューに移動します。これはそのままで機能する可能性があり、これまでのところ最も簡単なソリューションです。問題は、ブロッキング キューでスケジューラがブロックされている間に、優先度の高いキューがいっぱいになる可能性があることです。
これを回避するには、独自のブロッキングを行う必要があります。どちらのキューもノンブロッキングである必要があります。(提案: java.util.concurrent.ConcurrentLinkedQueuewait()
。)モニターで両方のキューをポーリングした後、結果はありません。何かが何かをキューに入れると、それが呼び出さnotifyAll()
れ、スケジューラが再び起動できるようになります。スケジューラが両方のキューをチェックした後、 を呼び出す前に通知が発生しないように細心の注意が必要ですwait()
。
添加:
手動ブロックによる 3 番目のソリューションのプロトタイプ コード。いくつかのスレッド化が提案されていますが、読者は自分の状況を最もよく知っているでしょう. ロックの待機をブロックしがちなコード、大規模な作業を行っている間にスレッド (およびコア) を数分間拘束しがちなコード、および他のコードがすべて完了するのを待つ余裕がないコードはどれですか。考慮。たとえば、失敗した実行を、時間のかかるクリーンアップなしで同じスレッドですぐに再実行できる場合、このコードのほとんどをジャンクできます。
private final ConcurrentLinkedQueue mainQueue = new ConcurrentLinkedQueue();
private final ConcurrentLinkedQueue prioQueue = new ConcurrentLinkedQueue();
private final Object entryWatch = new Object();
/** Adds a new job to the queue. */
public void addjob( Runnable runjob ) {
synchronized (entryWatch) { entryWatch.notifyAll(); }
}
/** The endless loop that does the work. */
public void schedule() {
for (;;) {
Runnable run = getOne(); // Avoids lock if successful.
if (run == null) {
// Both queues are empty.
synchronized (entryWatch) {
// Need to check again. Someone might have added and notifiedAll
// since last check. From this point until, wait, we can be sure
// entryWatch is not notified.
run = getOne();
if (run == null) {
// Both queues are REALLY empty.
try { entryWatch.wait(); }
catch (InterruptedException ie) {}
}
}
}
runit( run );
}
}
/** Helper method for the endless loop. */
private Runnable getOne() {
Runnable run = (Runnable) prioQueue.poll();
if (run != null) return run;
return (Runnable) mainQueue.poll();
}
/** Runs a new job. */
public void runit( final Runnable runjob ) {
// Do everthing in another thread. (Optional)
new Thread() {
@Override public void run() {
// Run run. (Possibly in own thread?)
// (Perhaps best in thread from a thread pool.)
runjob.run();
// Handle failure (runit only, NOT in runitLast).
// Defining "failure" left as exercise for reader.
if (failure) {
// Put code here to handle failure.
// Put back in queue.
prioQueue.add( runjob );
synchronized (entryWatch) { entryWatch.notifyAll(); }
}
}
}.start();
}
/** Reruns a job. */
public void runitLast( final Runnable runjob ) {
// Same code as "runit", but don't put "runjob" in "prioQueue" on failure.
}