0

私は現在、RPC レイヤーの POC に携わっています。クライアント側でリクエストを調整するために、次のメソッドを作成しました。これは従うべき良いパターンですか?追加のリクエストをスレッドプールにキューイングすることを選択しませんでした。これは、同期呼び出しのみに関心があり、RPC リクエストを実行するためにウェイクアップされるまで呼び出し元スレッドをブロックする必要があるためです。 .

すでにリクエストを発行しているスレッドで管理できると思いました。これはうまく機能しますが、通話が終了するとすぐに別の通話が発信されるため、CPU 使用率は他のプロセスにとって少し不公平です。膨大な数のリクエストで負荷テストを行いましたが、メモリと CPU の使用率は安定しています。ArrayBlockingQueue をポーリングで使用して同じことを達成できますか? poll() は CPU を大量に消費しますか?

注: requestEnd メソッドには、すべての登録済みアイテムが正しく起動されない可能性があるいくつかの並行性の問題があることを認識しており、そこで原子性を維持する効率的な方法を考えています。

 public class RequestQueue {    
    // TODO The capacity should come from the consumer which in turn comes from
    // config
    private static final int _OUTBOUND_REQUEST_QUEUE_MAXSIZE = 40000;
    private static final int _CURRENT_REQUEST_QUEUE_INCREMENT = 1;
    private static final int _CURRENT_REQUEST_POOL_MAXSIZE = 40;
    private AtomicInteger currentRequestsCount = new AtomicInteger(0);
    private ConcurrentLinkedQueue<RequestWaitItem> outboundRequestQueue = null;

    public RequestQueue() {
        outboundRequestQueue = new ConcurrentLinkedQueue<RequestWaitItem>();
    }



    public void registerForFuture(RequestWaitItem waitObject) throws Exception {
        if (outboundRequestQueue.size() < _OUTBOUND_REQUEST_QUEUE_MAXSIZE) {
            outboundRequestQueue.add(waitObject);
        } else {
            throw new Exception("Queue is full" + outboundRequestQueue.size());
        }
    }

    public void requestStart() {
        currentRequestsCount.addAndGet(_CURRENT_REQUEST_QUEUE_INCREMENT);
    }

    //Verify correctness
    public RequestWaitItem requestEnd() {
        int currentRequests = currentRequestsCount.decrementAndGet();
        if (this.outboundRequestQueue.size() > 0 && currentRequests < _CURRENT_REQUEST_POOL_MAXSIZE) {
            try {
                RequestWaitItem waitObject = (RequestWaitItem)this.outboundRequestQueue.remove();               
                waitObject.setRequestReady(true);
                synchronized (waitObject) {
                    waitObject.notify();
                }   
                return waitObject;
            } catch (NoSuchElementException ex) {
                //Queue is empty so this is not an exception condition
            }
        }
        return null;
    }

    public boolean isFull() {
        return currentRequestsCount.get() > _CURRENT_REQUEST_POOL_MAXSIZE;
    }

}




public class RequestWaitItem {
    private boolean requestReady;
    private RpcDispatcher dispatcher;

    public RequestWaitItem() {
        this.requestReady = false;
    }

    public RequestWaitItem(RpcDispatcher dispatcher) {
        this();
        this.dispatcher = dispatcher;
    }

    public boolean isRequestReady() {
        return requestReady;
    }

    public void setRequestReady(boolean requestReady) {
        this.requestReady = requestReady;
    }

    public RpcDispatcher getDispatcher() {
        return dispatcher;
    }
}




if (requestQueue.isFull()) {
        try {               
                RequestWaitItem waitObject = new RequestWaitItem(dispatcher);               
                requestQueue.registerForFuture(waitObject);
                //Sync              
                // Config and centralize this timeout
                synchronized (waitObject) {
                    waitObject.wait(_REQUEST_QUEUE_TIMEOUT);
                }

                if (waitObject.isRequestReady() == false) {
                    throw new Exception("Request Issuing timedout");
                }
                requestQueue.requestStart();
                try {
                    return waitObject.getDispatcher().dispatchRpcRequest();
                }finally {
                    requestQueue.requestEnd();
                }               
        } catch (Exception ex) {
            // TODO define exception type
            throw ex;
        }
    } else {
        requestQueue.requestStart();
        try {
            return dispatcher.dispatchRpcRequest();
        }finally {              
            requestQueue.requestEnd();
        }
    }
4

2 に答える 2

1

私の理解が正しければ、最大で 40 (たとえば) の同時要求を持つことによって、リモート サービスへの要求を調整する必要があります。セマフォを使用すると、余分なスレッドやサービスを使用せずに、これを簡単に行うことができます。

Semaphore s = new Semaphore(40);
...
s.acquire();
try {
    dispatcher.dispatchRpcRequest(); // Or whatever your remote call looks like
} finally {
    s.release();
}
于 2012-12-04T14:20:58.810 に答える
0

これに使用ExecutorService service = Executors.newFixedThreadPool(10);します。

これにより、最大 10 個のスレッドが作成され、さらに要求がキューで待機します。これが役立つはずです。

固定スレッド プール

于 2012-12-04T14:11:08.167 に答える