26

私はたくさん検索しましたが、私の問題の解決策を見つけることができませんでした。

BaseTaskを使用しThreadPoolExecutorてタスクを処理する独自のクラスがあります。タスクの優先順位付けが必要ですが、タスクをオブジェクトにラップするため、 PriorityBlockingQueueIを使用しようとすると取得します。ClassCastExceptionThreadPoolExecutorFutureTask

FutureTaskは実装されていないため、これは明らかに理にかなっていComparableますが、優先度の問題を解決するにはどうすればよいでしょうか。newTaskFor()でオーバーライドできることを読みましたがThreadPoolExecutor、このメソッドがまったく見つからないようです...?

任意の提案をいただければ幸いです!

役立つコード:

私のBaseTaskクラスでは

private static final BlockingQueue<Runnable> sWorkQueue = new PriorityBlockingQueue<Runnable>();

private static final ThreadFactory sThreadFactory = new ThreadFactory() {
    private final AtomicInteger mCount = new AtomicInteger(1);

    public Thread newThread(Runnable r) {
        return new Thread(r, "AsyncTask #" + mCount.getAndIncrement());
    }
};

private static final BaseThreadPoolExecutor sExecutor = new BaseThreadPoolExecutor(
    1, Integer.MAX_VALUE, 10, TimeUnit.SECONDS, sWorkQueue, sThreadFactory);

private final BaseFutureTask<Result> mFuture;

public BaseTask(int priority) {
    mFuture = new BaseFutureTask<Result>(mWorker, priority);
}

public final BaseTask<Params, Progress, Result> execute(Params... params) {

    /* Some unimportant code here */

    sExecutor.execute(mFuture);
}

BaseFutureTaskクラスで

@Override
public int compareTo(BaseFutureTask another) {
    long diff = this.priority - another.priority;

    return Long.signum(diff);
}

BaseThreadPoolExecutorクラスiで3つのメソッドをオーバーライドします...このsubmitクラスのコンストラクターが呼び出されますが、どのsubmitメソッドも呼び出されません

4

7 に答える 7

14
public class ExecutorPriority {

public static void main(String[] args) {

    PriorityBlockingQueue<Runnable> pq = new PriorityBlockingQueue<Runnable>(20, new ComparePriority());

    Executor exe = new ThreadPoolExecutor(1, 2, 10, TimeUnit.SECONDS, pq);
    exe.execute(new RunWithPriority(2) {

        @Override
        public void run() {

            System.out.println(this.getPriority() + " started");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException ex) {
                Logger.getLogger(ExecutorPriority.class.getName()).log(Level.SEVERE, null, ex);
            }
            System.out.println(this.getPriority() + " finished");
        }
    });
    exe.execute(new RunWithPriority(10) {

        @Override
        public void run() {
            System.out.println(this.getPriority() + " started");
            try {
                Thread.sleep(3000);
            } catch (InterruptedException ex) {
                Logger.getLogger(ExecutorPriority.class.getName()).log(Level.SEVERE, null, ex);
            }
            System.out.println(this.getPriority() + " finished");
        }
    });

}

private static class ComparePriority<T extends RunWithPriority> implements Comparator<T> {

    @Override
    public int compare(T o1, T o2) {
        return o1.getPriority().compareTo(o2.getPriority());
    }
}

}

ご想像のとおり、RunWithPriorityは、Runnableであり、整数の優先度フィールドを持つ抽象クラスです。

于 2011-03-30T11:37:20.330 に答える
11

次のヘルパー クラスを使用できます。

public class PriorityFuture<T> implements RunnableFuture<T> {

    private RunnableFuture<T> src;
    private int priority;

    public PriorityFuture(RunnableFuture<T> other, int priority) {
        this.src = other;
        this.priority = priority;
    }

    public int getPriority() {
        return priority;
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        return src.cancel(mayInterruptIfRunning);
    }

    public boolean isCancelled() {
        return src.isCancelled();
    }

    public boolean isDone() {
        return src.isDone();
    }

    public T get() throws InterruptedException, ExecutionException {
        return src.get();
    }

    public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
        return src.get();
    }

    public void run() {
        src.run();
    }

    public static Comparator<Runnable> COMP = new Comparator<Runnable>() {
        public int compare(Runnable o1, Runnable o2) {
            if (o1 == null && o2 == null)
                return 0;
            else if (o1 == null)
                return -1;
            else if (o2 == null)
                return 1;
            else {
                int p1 = ((PriorityFuture<?>) o1).getPriority();
                int p2 = ((PriorityFuture<?>) o2).getPriority();

                return p1 > p2 ? 1 : (p1 == p2 ? 0 : -1);
            }
        }
    };
}

public interface PriorityCallable<T> extends Callable<T> {

    int getPriority();

}

そして、このヘルパーメソッド:

public static ThreadPoolExecutor getPriorityExecutor(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS,
            new PriorityBlockingQueue<Runnable>(10, PriorityFuture.COMP)) {

        protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
            RunnableFuture<T> newTaskFor = super.newTaskFor(callable);
            return new PriorityFuture<T>(newTaskFor, ((PriorityCallable<T>) callable).getPriority());
        }
    };
}

そして、次のように使用します。

class LenthyJob implements PriorityCallable<Long> {
    private int priority;

    public LenthyJob(int priority) {
        this.priority = priority;
    }

    public Long call() throws Exception {
        System.out.println("Executing: " + priority);
        long num = 1000000;
        for (int i = 0; i < 1000000; i++) {
            num *= Math.random() * 1000;
            num /= Math.random() * 1000;
            if (num == 0)
                num = 1000000;
        }
        return num;
    }

    public int getPriority() {
        return priority;
    }
}

public class TestPQ {

    public static void main(String[] args) throws InterruptedException, ExecutionException {
        ThreadPoolExecutor exec = getPriorityExecutor(2);

        for (int i = 0; i < 20; i++) {
            int priority = (int) (Math.random() * 100);
            System.out.println("Scheduling: " + priority);
            LenthyJob job = new LenthyJob(priority);
            exec.submit(job);
        }
    }
}
于 2013-05-16T01:44:11.627 に答える
5

この問題を完全に機能するコードで説明しようと思います。しかし、コードに飛び込む前に、PriorityBlockingQueue について説明したいと思います。

PriorityBlockingQueue : PriorityBlockingQueue は BlockingQueue の実装です。優先度とともにタスクを受け入れ、優先度が最も高いタスクを最初に実行するために送信します。2 つのタスクの優先度が同じ場合、どちらのタスクが最初に実行されるかを決定するカスタム ロジックを提供する必要があります。

それでは早速コードに入りましょう。

ドライバー クラス: このクラスは、タスクを受け取り、後で実行のために送信するエグゼキューターを作成します。ここでは、優先度が LOW のタスクと優先度が HIGH のタスクを 2 つ作成します。ここでは、エグゼキューターに MAX 1 のスレッドを実行し、PriorityBlockingQueue を使用するように指示します。

     public static void main(String[] args) {

       /*
       Minimum number of threads that must be running : 0
       Maximium number of threads that can be created : 1
       If a thread is idle, then the minimum time to keep it alive : 1000
       Which queue to use : PriorityBlockingQueue
       */
    PriorityBlockingQueue queue = new PriorityBlockingQueue();
    ThreadPoolExecutor executor = new ThreadPoolExecutor(0,1,
        1000, TimeUnit.MILLISECONDS,queue);

    MyTask task = new MyTask(Priority.LOW,"Low");
    executor.execute(new MyFutureTask(task));
    task = new MyTask(Priority.HIGH,"High");
    executor.execute(new MyFutureTask(task));
}

MyTask クラス: MyTask は Runnable を実装し、コンストラクターの引数として優先度を受け入れます。このタスクが実行されると、メッセージが出力され、スレッドが 1 秒間スリープ状態になります。

   public class MyTask implements Runnable {

  public int getPriority() {
    return priority.getValue();
  }

  private Priority priority;

  public String getName() {
    return name;
  }

  private String name;

  public MyTask(Priority priority,String name){
    this.priority = priority;
    this.name = name;
  }

  @Override
  public void run() {
    System.out.println("The following Runnable is getting executed "+getName());
    try {
      Thread.sleep(1000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

}

MyFutureTask クラス: タスクを保持するために PriorityBlocingQueue を使用しているため、タスクは FutureTask 内にラップする必要があり、FutureTask の実装は Comparable インターフェイスを実装する必要があります。Comparable インターフェイスは、2 つの異なるタスクの優先度を比較し、優先度が最も高いタスクを実行のために送信します。

 public class MyFutureTask extends FutureTask<MyFutureTask>
      implements Comparable<MyFutureTask> {

    private  MyTask task = null;

    public  MyFutureTask(MyTask task){
      super(task,null);
      this.task = task;
    }

    @Override
    public int compareTo(MyFutureTask another) {
      return task.getPriority() - another.task.getPriority();
    }
  }

プライオリティ クラス: 自明のプライオリティ クラス。

public enum Priority {

  HIGHEST(0),
  HIGH(1),
  MEDIUM(2),
  LOW(3),
  LOWEST(4);

  int value;

  Priority(int val) {
    this.value = val;
  }

  public int getValue(){
    return value;
  }


}

この例を実行すると、次の出力が得られます

The following Runnable is getting executed High
The following Runnable is getting executed Low

最初に優先度の低いタスクを送信しましたが、優先度の高いタスクは後で送信しましたが、PriorityBlockingQueue を使用しているため、優先度の高いタスクが最初に実行されます。

于 2016-01-24T09:55:13.663 に答える
4

私の解決策:

public class XThreadPoolExecutor extends ThreadPoolExecutor
{
    public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue)
    {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue,
        RejectedExecutionHandler handler)
    {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue,
        ThreadFactory threadFactory)
    {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }

    public XThreadPoolExecutor(int corePoolSize, int maximumPoolSize,
        long keepAliveTime, TimeUnit unit, PriorityBlockingQueue<Runnable> workQueue,
        ThreadFactory threadFactory, RejectedExecutionHandler handler)
    {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value)
    {
        return new ComparableFutureTask<>(runnable, value);
    }

    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable)
    {
        return new ComparableFutureTask<>(callable);
    }

    protected class ComparableFutureTask<V>
        extends FutureTask<V> implements Comparable<ComparableFutureTask<V>>
    {
        private Object object;
        public ComparableFutureTask(Callable<V> callable)
        {
            super(callable);
            object = callable;
        }

        public ComparableFutureTask(Runnable runnable, V result)
        {
            super(runnable, result);
            object = runnable;
        }

        @Override
        @SuppressWarnings("unchecked")
        public int compareTo(ComparableFutureTask<V> o)
        {
            if (this == o)
            {
                return 0;
            }
            if (o == null)
            {
                return -1; // high priority
            }
            if (object != null && o.object != null)
            {
                if (object.getClass().equals(o.object.getClass()))
                {
                    if (object instanceof Comparable)
                    {
                        return ((Comparable) object).compareTo(o.object);
                    }
                }
            }
            return 0;
        }
    }
}
于 2011-12-12T08:13:44.243 に答える
0

彼らはApacheの調和からそれを残したようです。の不在を修正する約 1 年前のsvn コミット ログがありnewTaskForます。おそらく、拡張されたsubmit関数をオーバーライドして、拡張された拡張をThreadPoolExecutor作成できます。それらはそれほど長くはありませんFutureTaskComparable

于 2010-08-23T17:18:12.420 に答える
0

あなたの質問に答えるには:newTaskFor()メソッドはThreadPoolExecutorのスーパークラス にありAbstractExecutorServiceます。ThreadPoolExecutorただし、で単純にオーバーライドできます。

于 2012-09-30T18:30:13.883 に答える