0

再利用可能なコードを作成して、タスクをエグゼキューター サービスに送信する際に待機状態を許可したいと考えています。あまりにも多くのタスクがキューに入っている場合にブロックするためのきちんとした方法の実装がたくさんあります

タスクが終了するたびに、待機中のすべてのスレッドを評価するエグゼキューターが必要です。タスクを atm に送信できるかどうかを判断するには、アクティブなすべてのタスクの現在の状態を考慮する必要があります。私は次の解決策を思いつきました。これは、複数の送信者や高度な同時実行タスクに合わせて拡張する必要はありません。

質問:次のコードは安全に使用できますか? それとも、見落としている欠陥がありますか? aquireAccessのメソッドを実装する人はConditionEvaluator<T>、クエリされたスレッドの状態がスレッド セーフであることを確認する必要がありますが、実装者は activeTasks コレクションに対する反復を保護する必要はありません。コードは次のとおりです。

public class BlockingExecutor<T extends Runnable> {

    private final Executor executor;

    private final ConditionEvaluator<T> evaluator;

    final ReentrantLock lock = new ReentrantLock();

    final Condition condition = this.lock.newCondition();

    final LinkedList<T> active = new LinkedList<T>();

    private final long reevaluateTime;

    private final TimeUnit reevaluateTimeUnit;

    public BlockingExecutor(Executor executor, ConditionEvaluator<T> evaluator) {
        this.evaluator = evaluator;
        this.executor = executor;
        this.reevaluateTimeUnit = null;
        this.reevaluateTime = 0;
    }

    public BlockingExecutor(Executor executor, ConditionEvaluator<T> evaluator, long reevaluateTime, TimeUnit reevaluateTimeUnit) {
        this.evaluator = evaluator;
        this.executor = executor;
        this.reevaluateTime = reevaluateTime;
        this.reevaluateTimeUnit = reevaluateTimeUnit;
    }

    public void submitTask(final T task) throws InterruptedException {
        this.lock.lock();
        try {
            do{
            if (this.reevaluateTimeUnit == null) {
                this.condition.await(this.reevaluateTime, this.reevaluateTimeUnit);
            } else {
                this.condition.await();
            }
            }while(!this.evaluator.aquireAccess(this.active, task));
                this.active.add(task);
                this.executor.execute(new Runnable() {

                    @Override
                    public void run() {
                        try {
                            task.run();
                        } finally {
                            BlockingExecutor.this.lock.lock();
                            try{
                                BlockingExecutor.this.active.remove(task);
                                BlockingExecutor.this.condition.signalAll();
                            }finally{
                                BlockingExecutor.this.lock.unlock();
                            }

                        }
                    }
                });
        } finally {
            this.lock.unlock();
        }
    }
}

public interface ConditionEvaluator<T extends Runnable> {
    public boolean aquireAccess(Collection<T> activeList,T task);   
}

質問:コードを改善できますか?

4

0 に答える 0