オブジェクトを入力パラメーターとして受け取り、そのオブジェクトに基づいてライブラリに取り組んでいDataRequest
ます。URL を作成し、Apache http クライアントを使用してアプリ サーバーを呼び出し、顧客に応答を返します。私たちのライブラリを使用しています。executeSync
同じ機能を取得するためにメソッドを呼び出す顧客もいれexecuteAsync
ば、データを取得するために私たちのメソッドを呼び出す顧客もいます。
executeSync()
- 結果が出るまで待ち、結果を返します。executeAsync()
- 必要に応じて、他の処理が完了した後に処理できる Future をすぐに返します。
以下は、DataClient
上記の2つのメソッドを持つ私のクラスです。
public class DataClient implements Client {
private final ForkJoinPool forkJoinPool = new ForkJoinPool(16);
private CloseableHttpClient httpClientBuilder;
// initializing httpclient only once
public DataClient() {
try {
RequestConfig requestConfig =
RequestConfig.custom().setConnectionRequestTimeout(500).setConnectTimeout(500)
.setSocketTimeout(500).setStaleConnectionCheckEnabled(false).build();
SocketConfig socketConfig =
SocketConfig.custom().setSoKeepAlive(true).setTcpNoDelay(true).build();
PoolingHttpClientConnectionManager poolingHttpClientConnectionManager =
new PoolingHttpClientConnectionManager();
poolingHttpClientConnectionManager.setMaxTotal(300);
poolingHttpClientConnectionManager.setDefaultMaxPerRoute(200);
httpClientBuilder =
HttpClientBuilder.create().setConnectionManager(poolingHttpClientConnectionManager)
.setDefaultRequestConfig(requestConfig).setDefaultSocketConfig(socketConfig).build();
} catch (Exception ex) {
// log error
}
}
@Override
public List<DataResponse> executeSync(DataRequest key) {
List<DataResponse> responsList = null;
Future<List<DataResponse>> responseFuture = null;
try {
responseFuture = executeAsync(key);
responsList = responseFuture.get(key.getTimeout(), key.getTimeoutUnit());
} catch (TimeoutException | ExecutionException | InterruptedException ex) {
responsList =
Collections.singletonList(new DataResponse(DataErrorEnum.CLIENT_TIMEOUT,
DataStatusEnum.ERROR));
responseFuture.cancel(true);
// logging exception here
}
return responsList;
}
@Override
public Future<List<DataResponse>> executeAsync(DataRequest key) {
DataFetcherTask task = new DataFetcherTask(key, this.httpClientBuilder);
return this.forkJoinPool.submit(task);
}
}
以下は、URLを作成してアプリサーバーを呼び出すDataFetcherTask
静的クラスも持つ私のクラスです。DataRequestTask
public class DataFetcherTask extends RecursiveTask<List<DataResponse>> {
private final DataRequest key;
private final CloseableHttpClient httpClientBuilder;
public DataFetcherTask(DataRequest key, CloseableHttpClient httpClientBuilder) {
this.key = key;
this.httpClientBuilder = httpClientBuilder;
}
@Override
protected List<DataResponse> compute() {
// Create subtasks for the key and invoke them
List<DataRequestTask> requestTasks = requestTasks(generateKeys());
invokeAll(requestTasks);
// All tasks are finished if invokeAll() returns.
List<DataResponse> responseList = new ArrayList<>(requestTasks.size());
for (DataRequestTask task : requestTasks) {
try {
responseList.add(task.get());
} catch (InterruptedException | ExecutionException e) {
Thread.currentThread().interrupt();
return Collections.emptyList();
}
}
return responseList;
}
private List<DataRequestTask> requestTasks(List<DataRequest> keys) {
List<DataRequestTask> tasks = new ArrayList<>(keys.size());
for (DataRequest key : keys) {
tasks.add(new DataRequestTask(key));
}
return tasks;
}
// In this method I am making a HTTP call to another service
// and then I will make List<DataRequest> accordingly.
private List<DataRequest> generateKeys() {
List<DataRequest> keys = new ArrayList<>();
// use key object which is passed in contructor to make HTTP call to another service
// and then make List of DataRequest object and return keys.
return keys;
}
/** Inner class for the subtasks. */
private static class DataRequestTask extends RecursiveTask<DataResponse> {
private final DataRequest request;
public DataRequestTask(DataRequest request) {
this.request = request;
}
@Override
protected DataResponse compute() {
return performDataRequest(this.request);
}
private DataResponse performDataRequest(DataRequest key) {
MappingHolder mappings = DataMapping.getMappings(key.getType());
List<String> hostnames = mappings.getAllHostnames(key);
for (String hostname : hostnames) {
String url = generateUrl(hostname);
HttpGet httpGet = new HttpGet(url);
httpGet.setConfig(generateRequestConfig());
httpGet.addHeader(key.getHeader());
try (CloseableHttpResponse response = httpClientBuilder.execute(httpGet)) {
HttpEntity entity = response.getEntity();
String responseBody =
TestUtils.isEmpty(entity) ? null : IOUtils.toString(entity.getContent(),
StandardCharsets.UTF_8);
return new DataResponse(responseBody, DataErrorEnum.OK, DataStatusEnum.OK);
} catch (IOException ex) {
// log error
}
}
return new DataResponse(DataErrorEnum.SERVERS_DOWN, DataStatusEnum.ERROR);
}
}
}
各DataRequest
オブジェクトにはオブジェクトがありDataResponse
ます。誰かがDataRequest
オブジェクトを渡してライブラリを呼び出すと、内部でList<DataRequest>
オブジェクトを作成し、各DataRequest
オブジェクトを並行して呼び出して、リスト内のList<DataResponse>
各DataResponse
オブジェクトが対応するDataRequest
オブジェクトの応答を持つ場所に戻ります。
以下の流れです。
- 顧客は、オブジェクト
DataClient
を渡すことによってクラスを呼び出しDataRequest
ます。要件に応じて、呼び出しexecuteSync()
またはメソッドを実行できます。executeAsync()
DataFetcherTask
クラス (サブタイプのRecursiveTask
1 つ) で、単一のオブジェクトを指定すると、リスト内の各オブジェクトに対して各サブタスクを並列に生成して呼び出します。これらのサブタスクは、親タスクと同じように実行されます。ForkJoinTask's
key
DataRequest
List<DataRequest>
DataRequest
ForkJoinPool
DataRequestTask
クラスではDataRequest
、URL を作成して各オブジェクトを実行し、そのDataResponse
オブジェクトを返します。
問題文:
このライブラリは非常に高いスループット環境で呼び出されるため、非常に高速である必要があります。同期呼び出しの場合、別のスレッドで実行しても問題ありませんか? この場合、スレッドのコンテキスト切り替えのコストに加えて、スレッドに余分なコストとリソースが発生するため、少し混乱しています。また、ForkJoinPool
ここで使用しているため、余分なスレッドプールを使用する必要がありませんが、ここで正しい選択ですか?
同様にパフォーマンス効率の良い同じことを行うためのより良い効率的な方法はありますか? 私はJava 7を使用しており、Guavaライブラリにもアクセスできるので、それが何かを簡素化できるのであれば、私もそれに対してオープンです.
非常に重い負荷の下で実行すると、競合が発生しているようです。このコードが非常に重い負荷の下で実行されたときに、スレッドの競合が発生する可能性はありますか?