0

以下に示すように、2つのコンポーネントがあるRESTサービスベースのプロジェクトに取り組んでいます-

  1. URL'sService コンポーネントに必要なクライアント
  2. 次に、サービス(RESTサービス)コンポーネントはそれらURL'sを使用してデータベースからデータを取得します。

一般的な URL は次のようになります。

http://host.qa.ebay.com:8080/deservice/DEService/get/USERID=9012/PROFILE.ACCOUNT,PROFILE.ADVERTISING,PROFILE.DEMOGRAPHIC,PROFILE.FINANCIAL

上記の URL が意味することは、USERID- 9012-これらの列のデータベースからのデータを提供してください-

[PROFILE.ACCOUNT, PROFILE.ADVERTISING, PROFILE.DEMOGRAPHIC, PROFILE.FINANCIAL]

そして現在、クライアント コンポーネント側でベンチマークを行っています。time(95 Percentile)そして、私は以下の方法が周りの束を取っていることを発見しました~15ms

以下のメソッドは、2 つのパラメーターを受け入れます。

List<DEKey> keys- sample data in keys will have USERID=9012

List<String> reqAttrNames- sample data for reqAttrNames will be-

[PROFILE.ACCOUNT, PROFILE.ADVERTISING, PROFILE.DEMOGRAPHIC, PROFILE.FINANCIAL]

以下はコードです-

public DEResponse getDEAttributes(List<DEKey> keys, List<String> reqAttrNames) {

    DEResponse response = null;
    try {
        String url = buildGetUrl(keys,reqAttrNames);

        if(url!=null){
            List<CallableTask<DEResponse>> tasks = new ArrayList<CallableTask<DEResponse>>();
            CallableTask<DEResponse> task = new DEResponseTask(url); 
            tasks.add(task);

            // STEP 2: Execute worker threads for all the generated urls
            List<LoggingFuture<DEResponse>> futures = null;
            try {
                long waitTimeout = getWaitTimeout(keys);
                futures = executor.executeAll(tasks, null, waitTimeout, TimeUnit.MILLISECONDS);

                // STEP 3: Consolidate results of the executed worker threads
                if(futures!=null && futures.size()>0){
                    LoggingFuture<DEResponse> future = futures.get(0);
                    response = future.get();
                }
            } catch (InterruptedException e1) {
                logger.log(LogLevel.ERROR,"Transport:getDEAttributes Request timed-out :",e1);
            }
        }else{
            //
        }
    }  catch(Throwable th) {

    }

    return response;
}

そして、上記のメソッドはDEResponseオブジェクトを返します。

以下は、DEResponseTask class

public class DEResponseTask  extends BaseNamedTask implements CallableTask<DEResponse> {

        private final ObjectMapper m_mapper = new ObjectMapper();

        @Override
        public DEResponse call() throws Exception {
            URL url = null;
            DEResponse DEResponse = null;
            try {
                if(buildUrl!=null){
                    url = new URL(buildUrl);

                    DEResponse = m_mapper.readValue(url, DEResponse.class);

                }else{
                    logger.log(LogLevel.ERROR, "DEResponseTask:call is null ");
                }
            } catch (MalformedURLException e) {

            }catch (Throwable th) {

            }finally{
            }

            return DEResponse;
        }
    }

このマルチスレッド コードの記述方法に問題はありますか? はいの場合、どうすればこれを効率的にすることができますか?

私の会社のように、Sun Executorクラスを実装する独自のexecutorがあるためのexecuteAllメソッドの署名-executor

/**
     * Executes the given tasks, returning a list of futures holding their 
     * status and results when all complete or the timeout expires, whichever
     * happens first.  <tt>Future.isDone()</tt> is <tt>true</tt> for each
     * element of the returned list.  Upon return, tasks that have not completed
     * are cancelled.  Note that a <i>completed</i> task could have terminated
     * either normally or by throwing an exception.  The results of this method
     * are undefined if the given collection is modified while this operation is
     * in progress.  This is entirely analogous to
     * <tt>ExecutorService.invokeAll()</tt> except for a couple of important
     * differences.  First, it cancels but does not <b>interrupt</b> any 
     * unfinished tasks, unlike <tt>ExecutorService.invokeAll()</tt> which
     * cancels and interrupts unfinished tasks.  This results in a better 
     * adherence to the specified timeout value, as interrupting threads may
     * have unexpected delays depending on the nature of the tasks.  Also, all 
     * eBay-specific features apply when the tasks are submitted with this 
     * method.
     * 
     * @param tasks the collection of tasks
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return a list of futures representing the tasks, in the same sequential
     * order as produced by the iterator for the given task list.  If the 
     * operation did not time out, each task will have completed.  If it did
     * time out, some of these tasks will not have completed.
     * @throws InterruptedException if interrupted while waiting, in which case
     * unfinished tasks are cancelled
     */
    public <V> List<LoggingFuture<V>> executeAll(Collection<? extends CallableTask<V>> tasks, 
                                                 Options options, 
                                                 long timeout, TimeUnit unit)
            throws InterruptedException {
        return executeAll(tasks, options, timeout, unit, false);
    }

アップデート:-

このコンポーネントは、スレッドを増やしてベンチマークを実行しているプログラムの負荷を増やすとすぐに時間がかかります20

newFixedThreadPool(20)

しかし、私が使用すれば、このコンポーネントはうまく機能すると思います-

newSingleThreadExecutor

私が考えることができる唯一の理由は、上記のコードにある可能性があり、ブロッキング呼び出しがあり、それがスレッドがブロックされる理由であり、時間がかかる理由ですか?

更新しました:-

では、この行は次のように記述する必要がありますか? -

if(futures!=null && futures.size()>0){
                    LoggingFuture<DEResponse> future = futures.get(0);
                    //response = future.get();//replace this with below code-

                    while(!future.isDone()) {
                        Thread.sleep(500);
                    } 

                    response = future.get();
                }
4

2 に答える 2

0

あなたのコードを正しく読めば、明らかにパフォーマンスの問題が 1 つあります。これ:

public class DEResponseTask  extends BaseNamedTask implements CallableTask<DEResponse> {
    private final ObjectMapper m_mapper = new ObjectMapper();

はタスクごとに 1 回呼び出され、ObjectMapperインスタンスの作成には非常にコストがかかります。

これを修正するには多くの方法がありますが、おそらく次のいずれかを行います。

  1. m_mapper 参照を静的にします (一度だけ作成します) -- マッパーは一度構成すると安全に共有できます。または
  2. 共有で渡すObjectMapper(共有は安全です)

これを行うことで、JSON の処理効率に大きな違いが生じるはずです。

于 2013-04-22T21:09:24.727 に答える
0

複雑な非標準の Executor を使用していること以外に、パフォーマンスの低下を引き起こしていると思われるものは何もありません。どの Executor を使用できるかについて選択の余地がないことは承知していますが、好奇心から、これを aThreadPoolExecutorに置き換えて、これが違いを生むかどうかを確認し、大きな改善に気付いた場合は、あなたの仕事に注意してください。私の仕事では、別の部門によって作成された暗号化ライブラリが絶対にがらくたであることを発見し (CPU 時間の 80 ~ 90% が彼らのコードに費やされていました)、それを書き直すよう働きかけることに成功しました。

編集:

public class Aggregator implements Runnable {
    private static ConcurrentLinkedQueue<Future<DEResponse>> queue = new ConcurrentLinkedQueue<>();
    private static ArrayList<DEResponse> aggregation = new ArrayList<>();

    public static void offer(Future<DEResponse> future) {
        queue.offer(future);
    }

    public static ArrayList<DEResponse> getAggregation() {
        return aggregation;
    }

    public void run() {
        while(!queue.isEmpty()) { // make sure that all of the futures are added before this loop starts; better still, if you know how many worker threads there are then keep a count of how many futures are in your aggregator and quit this loop when aggregator.size() == [expected number of futures]
            aggregation.add(queue.poll().get());
        }
    }
}

public void getDEAttributes(List<DEKey> keys, List<String> reqAttrNames) {
    try {
        if(url!=null){
            try {
                futures = executor.executeAll(tasks, null, waitTimeout, TimeUnit.MILLISECONDS);
                if(futures!=null && futures.size()>0){
                    Aggregator.offer(futures.get(0));
                }
            }
        }
    }
}
于 2013-04-21T02:23:26.247 に答える