1

オブジェクトを入力パラメーターとして受け取り、そのオブジェクトに基づいてライブラリに取り組んでいDataRequestます。URL を作成し、Apache http クライアントを使用してアプリ サーバーを呼び出し、顧客に応答を返します。私たちのライブラリを使用しています。executeSync同じ機能を取得するためにメソッドを呼び出す顧客もいれexecuteAsyncば、データを取得するために私たちのメソッドを呼び出す顧客もいます。

  • executeSync()- 結果が出るまで待ち、結果を返します。
  • executeAsync()- 必要に応じて、他の処理が完了した後に処理できる Future をすぐに返します。

以下は、DataClient上記の2つのメソッドを持つ私のクラスです。

public class DataClient implements Client {
  private final ForkJoinPool forkJoinPool = new ForkJoinPool(16);
  private CloseableHttpClient httpClientBuilder;

  // initializing httpclient only once
  public DataClient() {
    try {
      RequestConfig requestConfig =
          RequestConfig.custom().setConnectionRequestTimeout(500).setConnectTimeout(500)
              .setSocketTimeout(500).setStaleConnectionCheckEnabled(false).build();
      SocketConfig socketConfig =
          SocketConfig.custom().setSoKeepAlive(true).setTcpNoDelay(true).build();

      PoolingHttpClientConnectionManager poolingHttpClientConnectionManager =
          new PoolingHttpClientConnectionManager();
      poolingHttpClientConnectionManager.setMaxTotal(300);
      poolingHttpClientConnectionManager.setDefaultMaxPerRoute(200);

      httpClientBuilder =
          HttpClientBuilder.create().setConnectionManager(poolingHttpClientConnectionManager)
              .setDefaultRequestConfig(requestConfig).setDefaultSocketConfig(socketConfig).build();
    } catch (Exception ex) {
      // log error
    }
  }

  @Override
  public List<DataResponse> executeSync(DataRequest key) {
    List<DataResponse> responsList = null;
    Future<List<DataResponse>> responseFuture = null;

    try {
      responseFuture = executeAsync(key);
      responsList = responseFuture.get(key.getTimeout(), key.getTimeoutUnit());
    } catch (TimeoutException | ExecutionException | InterruptedException ex) {
      responsList =
          Collections.singletonList(new DataResponse(DataErrorEnum.CLIENT_TIMEOUT,
              DataStatusEnum.ERROR));
      responseFuture.cancel(true);
      // logging exception here
    }
    return responsList;
  }

  @Override
  public Future<List<DataResponse>> executeAsync(DataRequest key) {
    DataFetcherTask task = new DataFetcherTask(key, this.httpClientBuilder);
    return this.forkJoinPool.submit(task);
  }
}

以下は、URLを作成してアプリサーバーを呼び出すDataFetcherTask静的クラスも持つ私のクラスです。DataRequestTask

public class DataFetcherTask extends RecursiveTask<List<DataResponse>> {
  private final DataRequest key;
  private final CloseableHttpClient httpClientBuilder;

  public DataFetcherTask(DataRequest key, CloseableHttpClient httpClientBuilder) {
    this.key = key;
    this.httpClientBuilder = httpClientBuilder;
  }

  @Override
  protected List<DataResponse> compute() {
    // Create subtasks for the key and invoke them
    List<DataRequestTask> requestTasks = requestTasks(generateKeys());
    invokeAll(requestTasks);

    // All tasks are finished if invokeAll() returns.
    List<DataResponse> responseList = new ArrayList<>(requestTasks.size());
    for (DataRequestTask task : requestTasks) {
      try {
        responseList.add(task.get());
      } catch (InterruptedException | ExecutionException e) {
        Thread.currentThread().interrupt();
        return Collections.emptyList();
      }
    }
    return responseList;
  }

  private List<DataRequestTask> requestTasks(List<DataRequest> keys) {
    List<DataRequestTask> tasks = new ArrayList<>(keys.size());
    for (DataRequest key : keys) {
      tasks.add(new DataRequestTask(key));
    }
    return tasks;
  }

  // In this method I am making a HTTP call to another service
  // and then I will make List<DataRequest> accordingly.
  private List<DataRequest> generateKeys() {
    List<DataRequest> keys = new ArrayList<>();
    // use key object which is passed in contructor to make HTTP call to another service
    // and then make List of DataRequest object and return keys.
    return keys;
  }

  /** Inner class for the subtasks. */
  private static class DataRequestTask extends RecursiveTask<DataResponse> {
    private final DataRequest request;

    public DataRequestTask(DataRequest request) {
      this.request = request;
    }

    @Override
    protected DataResponse compute() {
      return performDataRequest(this.request);
    }

    private DataResponse performDataRequest(DataRequest key) {
      MappingHolder mappings = DataMapping.getMappings(key.getType());
      List<String> hostnames = mappings.getAllHostnames(key);

      for (String hostname : hostnames) {
        String url = generateUrl(hostname);
        HttpGet httpGet = new HttpGet(url);
        httpGet.setConfig(generateRequestConfig());
        httpGet.addHeader(key.getHeader());

        try (CloseableHttpResponse response = httpClientBuilder.execute(httpGet)) {
          HttpEntity entity = response.getEntity();
          String responseBody =
              TestUtils.isEmpty(entity) ? null : IOUtils.toString(entity.getContent(),
                  StandardCharsets.UTF_8);

          return new DataResponse(responseBody, DataErrorEnum.OK, DataStatusEnum.OK);
        } catch (IOException ex) {
          // log error
        }
      }
      return new DataResponse(DataErrorEnum.SERVERS_DOWN, DataStatusEnum.ERROR);
    }
  }
}

DataRequestオブジェクトにはオブジェクトがありDataResponseます。誰かがDataRequestオブジェクトを渡してライブラリを呼び出すと、内部でList<DataRequest>オブジェクトを作成し、各DataRequestオブジェクトを並行して呼び出して、リスト内のList<DataResponse>DataResponseオブジェクトが対応するDataRequestオブジェクトの応答を持つ場所に戻ります。

以下の流れです。

  • 顧客は、オブジェクトDataClientを渡すことによってクラスを呼び出しDataRequestます。要件に応じて、呼び出しexecuteSync()またはメソッドを実行できます。executeAsync()
  • DataFetcherTaskクラス (サブタイプのRecursiveTask1 つ) で、単一のオブジェクトを指定すると、リスト内の各オブジェクトに対して各サブタスクを並列に生成して呼び出します。これらのサブタスクは、親タスクと同じように実行されます。ForkJoinTask'skeyDataRequestList<DataRequest>DataRequestForkJoinPool
  • DataRequestTaskクラスではDataRequest、URL を作成して各オブジェクトを実行し、そのDataResponseオブジェクトを返します。

問題文:

このライブラリは非常に高いスループット環境で呼び出されるため、非常に高速である必要があります。同期呼び出しの場合、別のスレッドで実行しても問題ありませんか? この場合、スレッドのコンテキスト切り替えのコストに加えて、スレッドに余分なコストとリソースが発生するため、少し混乱しています。また、ForkJoinPoolここで使用しているため、余分なスレッドプールを使用する必要がありませんが、ここで正しい選択ですか?

同様にパフォーマンス効率の良い同じことを行うためのより良い効率的な方法はありますか? 私はJava 7を使用しており、Guavaライブラリにもアクセスできるので、それが何かを簡素化できるのであれば、私もそれに対してオープンです.

非常に重い負荷の下で実行すると、競合が発生しているようです。このコードが非常に重い負荷の下で実行されたときに、スレッドの競合が発生する可能性はありますか?

4

2 に答える 2

0

あなたの状況では、非同期 http 呼び出しを使用する方が良いと思います。リンクを参照してください: HttpAsyncClient。また、スレッドプールを使用する必要はありません。

executeAsync メソッドで空の CompletableFuture<DataResponse>() を作成し、それをクライアント呼び出しに渡します。コールバック呼び出しで、complete (または例外が発生した場合は completeExceptionally) を呼び出して completableFuture の結果を設定します。ExecuteSync メソッドの実装は良さそうです。

編集:

Java 7の場合、ListenableFutureなどのように、グアバでの実装を約束するためにcompletableFutureを置き換えるだけで済みます

于 2017-03-09T06:53:43.977 に答える