1

同期メソッドと非同期メソッドを含むライブラリを作成する必要があります。

  • executeSynchronous()- 結果が出るまで待ち、結果を返します。
  • executeAsynchronous()- 必要に応じて、他の処理が完了した後に処理できる Future をすぐに返します。

ライブラリのコア ロジック

顧客は私たちのライブラリを使用し、DataKeyビルダー オブジェクトを渡すことによってそれを呼び出します。次に、そのオブジェクトを使用して URL を作成し、それを実行してその URL への HTTP クライアント呼び出しを行い、JSON 文字列として応答を取得した後、オブジェクトDataKeyを作成してその JSON 文字列をそのまま顧客に返します。 DataResponse. 一部の顧客から電話がexecuteSynchronous()あり、一部の顧客から電話がかかる可能性がexecuteAsynchronous()あるため、ライブラリで 2 つのメソッドを別々に提供する必要があります。

インターフェース:

public interface Client {

    // for synchronous
    public DataResponse executeSynchronous(DataKey key);

    // for asynchronous
    public Future<DataResponse> executeAsynchronous(DataKey key);
}

そして、上記のインターフェースDataClientを実装するmy があります。Client

public class DataClient implements Client {

    private RestTemplate restTemplate = new RestTemplate();
    private ExecutorService executor = Executors.newFixedThreadPool(10);

    // for synchronous call
    @Override
    public DataResponse executeSynchronous(DataKey key) {
        DataResponse dataResponse = null;
        Future<DataResponse> future = null;

        try {
            future = executeAsynchronous(key);
            dataResponse = future.get(key.getTimeout(), TimeUnit.MILLISECONDS);
        } catch (TimeoutException ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.TIMEOUT_ON_CLIENT, key);
            dataResponse = new DataResponse(null, DataErrorEnum.TIMEOUT_ON_CLIENT, DataStatusEnum.ERROR);
            // does this look right the way I am doing it?
            future.cancel(true); // terminating tasks that have timed out.
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
            dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
        }

        return dataResponse;
    }

    //for asynchronous call
    @Override
    public Future<DataResponse> executeAsynchronous(DataKey key) {
        Future<DataResponse> future = null;

        try {
            Task task = new Task(key, restTemplate);
            future = executor.submit(task); 
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
        }

        return future;
    }
}

実際のタスクを実行する単純なクラス:

public class Task implements Callable<DataResponse> {

    private DataKey key;
    private RestTemplate restTemplate;

    public Task(DataKey key, RestTemplate restTemplate) {
        this.key = key;
        this.restTemplate = restTemplate;
    }

    @Override
    public DataResponse call() {
        DataResponse dataResponse = null;
        String response = null;

        try {
            String url = createURL();
            response = restTemplate.getForObject(url, String.class);

            // it is a successful response
            dataResponse = new DataResponse(response, DataErrorEnum.NONE, DataStatusEnum.SUCCESS);
        } catch (RestClientException ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.SERVER_DOWN, key);
            dataResponse = new DataResponse(null, DataErrorEnum.SERVER_DOWN, DataStatusEnum.ERROR);
        } catch (Exception ex) {
            PotoLogging.logErrors(ex, DataErrorEnum.CLIENT_ERROR, key);
            dataResponse = new DataResponse(null, DataErrorEnum.CLIENT_ERROR, DataStatusEnum.ERROR);
        }

        return dataResponse;
    }

    // create a URL by using key object
    private String createURL() {
        String url = somecode;
        return url;
    }
}

問題文:-

このソリューションに取り組み始めたとき、タイムアウトになったタスクを終了していませんでした。タイムアウトをクライアントに報告していましたが、タスクは引き続きスレッド プールで実行されます (制限された 10 個のスレッドの 1 つが長時間占有される可能性があります)。そこで、オンラインでいくつかの調査を行ったところ、以下に示すようcancelに onを使用してタイムアウトになったタスクをキャンセルできることがわかりました-future

future.cancel(true);

しかし、確認したかったexecuteSynchronousのですが、タイムアウトになったタスクをキャンセルするためにメソッドで行っている方法が正しいように見えますか?

タスクがまだキューにある場合に実行を停止する を呼び出しcancel()ているFutureので、自分がしていることが正しいかどうかわかりませんか? これを行うための正しいアプローチは何ですか?

より良い方法があれば、誰かがその例を提供できますか?

4

1 に答える 1

2

タスクがまだキューにある場合は、呼び出すだけでキャンセルfuture.cancel()できますが、それがキューにあるかどうかはわかりません。また、タスクを中断するように依頼futureしても、タスクがスレッドの中断ステータスを無視して何かを実行している可能性があるため、機能しない場合があります。

したがって、を使用できますがfuture.cancel(true)、タスク (スレッド) がスレッドの割り込みステータスを考慮していることを確認する必要があります。たとえば、http 呼び出しを行うと述べたように、スレッドが中断されるとすぐに http クライアント リソースを閉じる必要がある場合があります。

以下の例を参照してください。

タスクキャンセルのシナリオを実装しようとしました。通常、スレッドは自身をチェックisInterrupted()して終了を試みることができます。ただし、呼び出し可能なスレッド プール エグゼキューターを使用している場合や、タスクがあまり似ていない場合、これはより複雑になりますwhile(!Thread.isInterrupted()) {// execute task}

この例では、タスクがファイルを書き込んでいます (単純にするために http 呼び出しは使用しませんでした)。スレッド プール エグゼキュータがタスクの実行を開始しますが、呼び出し元は 100 ミリ秒後にタスクをキャンセルしようとしています。現在、future は割り込みシグナルをスレッドに送信しますが、呼び出し可能なタスクはファイルへの書き込み中にすぐにそれをチェックできません。したがって、これを実現するために、callable は使用する IO リソースのリストを維持し、future がタスクをキャンセルしたいcancel()場合は、IOException でタスクを終了し、スレッドが終了するすべての IO リソースで呼び出すだけです。

public class CancellableTaskTest {

    public static void main(String[] args) throws Exception {
        CancellableThreadPoolExecutor threadPoolExecutor = new CancellableThreadPoolExecutor(0, 10, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        long startTime = System.currentTimeMillis();
        Future<String> future = threadPoolExecutor.submit(new CancellableTask());
        while (System.currentTimeMillis() - startTime < 100) {
            Thread.sleep(10);
        }
        System.out.println("Trying to cancel task");
        future.cancel(true);
    }
}

class CancellableThreadPoolExecutor extends ThreadPoolExecutor {

    public CancellableThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
        return new CancellableFutureTask<T>(callable);
    }
}

class CancellableFutureTask<V> extends FutureTask<V> {

    private WeakReference<CancellableTask> weakReference;

    public CancellableFutureTask(Callable<V> callable) {
        super(callable);
        if (callable instanceof CancellableTask) {
            this.weakReference = new WeakReference<CancellableTask>((CancellableTask) callable);
        }
    }

    public boolean cancel(boolean mayInterruptIfRunning) {
        boolean result = super.cancel(mayInterruptIfRunning);
        if (weakReference != null) {
            CancellableTask task = weakReference.get();
            if (task != null) {
                try {
                    task.cancel();
                } catch (Exception e) {
                    e.printStackTrace();
                    result = false;
                }
            }
        }
        return result;
    }
}

class CancellableTask implements Callable<String> {

    private volatile boolean cancelled;
    private final Object lock = new Object();
    private LinkedList<Object> cancellableResources = new LinkedList<Object>();

    @Override
    public String call() throws Exception {
        if (!cancelled) {
            System.out.println("Task started");
            // write file
            File file = File.createTempFile("testfile", ".txt");
            BufferedWriter writer = new BufferedWriter(new FileWriter(file));
            synchronized (lock) {
                cancellableResources.add(writer);
            }
            try {
                long lineCount = 0;
                while (lineCount++ < 100000000) {
                    writer.write("This is a test text at line: " + lineCount);
                    writer.newLine();
                }
                System.out.println("Task completed");
            } catch (Exception e) {
                e.printStackTrace();
            } finally {
                writer.close();
                file.delete();
                synchronized (lock) {
                    cancellableResources.clear();
                }
            }
        }
        return "done";
    }

    public void cancel() throws Exception {
        cancelled = true;
        Thread.sleep(1000);
        boolean success = false;
        synchronized (lock) {
            for (Object cancellableResource : cancellableResources) {
                if (cancellableResource instanceof Closeable) {
                    ((Closeable) cancellableResource).close();
                    success = true;
                }
            }
        }
        System.out.println("Task " + (success ? "cancelled" : "could not be cancelled. It might have completed or not started at all"));
    }
}

REST Http クライアント関連の要件については、ファクトリ クラスを次のように変更できます。

public class CancellableSimpleClientHttpRequestFactory extends SimpleClientHttpRequestFactory {

    private List<Object> cancellableResources;

    public CancellableSimpleClientHttpRequestFactory() {
    }

    public CancellableSimpleClientHttpRequestFactory(List<Object> cancellableResources) {
        this.cancellableResources = cancellableResources;
    }

    protected HttpURLConnection openConnection(URL url, Proxy proxy) throws IOException {
        HttpURLConnection connection = super.openConnection(url, proxy);
        if (cancellableResources != null) {
            cancellableResources.add(connection);
        }
        return connection;
    }
}

RestTemplateここでは、実行可能なタスクを作成するときにこのファクトリを使用する必要があります。

    RestTemplate template = new RestTemplate(new CancellableSimpleClientHttpRequestFactory(this.cancellableResources));

で維持したキャンセル可能なリソースの同じリストを渡すようにしてくださいCancellableTask

cancel()次に、このようにメソッドを変更する必要がありますCancellableTask-

synchronized (lock) {
    for (Object cancellableResource : cancellableResources) {
        if (cancellableResource instanceof HttpURLConnection) {
            ((HttpURLConnection) cancellableResource).disconnect();
            success = true;
        }
    }
}
于 2015-03-18T19:22:24.513 に答える