私は現在、パッケージ java.util.concurrent によって提供される Java の並行機能を使用することを学んでいます。演習として、HTTP API のパフォーマンス テストに使用できる小さなプログラムを作成しようとしました。しかし、どういうわけか、私のプログラムは頻繁に正しく終了しません。OSがクラッシュすることさえあります。
以下は私のプログラムの擬似コードです:
- HTTP API をクエリするリクエスト オブジェクトをインスタンス化します (この例では、ランダムなサイトを 1 つだけクエリします)。
- 複数の Callables をインスタンス化します。それぞれが Http Call を表します。
- Callable を反復処理し、ScheduledExecutorService を介してそれらをスケジュールします (1 秒あたりに実行する要求の数は、コードの先頭で構成できます)。
- すべての Callable をスケジュールした後、Future の反復処理を開始しています。futures が完了したら、応答を取得します。これを毎秒行います。新しい Future が終了していない場合は、ループを終了します。
どのような問題が発生していますか?
- 多くの場合、プログラムは正しく終了しません。プログラムが正しく終了しているかのように、すべてのログがコンソールに表示されます。しかし、実際には、日食の停止ボタンがまだアクティブなままになっていることがわかります。クリックすると、プログラムを正しく終了できなかったと表示されます。どんなに待っても終了しません(注:Eclipse内でプログラムを開始しています)。
- リクエストの数を増やしている場合、エラーを簡単に引き起こすことができます。2000年まで回っている場合、これは確実に発生します。OS がクラッシュした場合でも、Eclipse を使用できますが、他のアプリは動作しなくなります。
- 私の環境は、Java 1.6 と Apache httpclient 4.2.2 を搭載した Mac OS X 10.7 上の Eclipse 3.7 です。
私のコードに重大なエラーを見つけましたか? OSをクラッシュさせ、例外をまったく見ないJavaプログラムでこのような問題が発生したことは一度もありませんでした。
コード:
public class ConcurrentHttpRequestsTest {
/**
* @param args
*/
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(25);
Integer standardTimeout = 5000;
Float numberOfRequestsPerSecond = 50.0f;
Integer numberOfRequests = 500;
Integer durationBetweenRequests = Math.round(1000 / numberOfRequestsPerSecond);
// build Http Request
HttpGet request = null;
request = new HttpGet("http://www.spiegel.de");
// request.addHeader("Accept", "application/json");
HttpParams params = new BasicHttpParams();
HttpConnectionParams.setConnectionTimeout(params, standardTimeout);
HttpConnectionParams.setSoTimeout(params, standardTimeout);
request.setParams(params);
// setup concurrency logic
Collection<Callable<Long>> callables = new LinkedList<Callable<Long>>();
for (int i = 1; i <= numberOfRequests; i++) {
HttpClient client = new DefaultHttpClient();
callables.add(new UriCallable(request, client));
}
// start performing requests
int i = 1;
Collection<Future<Long>> futures = new LinkedList<Future<Long>>();
for (Callable<Long> callable : callables) {
ScheduledFuture<Long> future = scheduledExecutorService.schedule(callable, i * durationBetweenRequests, TimeUnit.MILLISECONDS);
futures.add(future);
i++;
}
// process futures (check wether they are ready yet)
Integer maximumNoChangeCount = 5;
boolean futuresAreReady = false;
int noChangeCount = 0;
int errorCount = 0;
List<Long> responses = new LinkedList<Long>();
while (!futuresAreReady) {
boolean allFuturesAreDone = true;
boolean atLeast1FutureIsDone = false;
Iterator<Future<Long>> iterator = futures.iterator();
while (iterator.hasNext()) {
Future<Long> future = iterator.next();
allFuturesAreDone = allFuturesAreDone && (future.isDone());
if (future.isDone()) {
try {
atLeast1FutureIsDone = true;
responses.add(future.get());
iterator.remove();
} catch (Exception e) {
// remove failed futures (e.g. timeout)
// System.out.println("Reached catch of future.get()" +
// e.getClass() + " " + e.getCause().getClass() + " " +
// e.getMessage());
iterator.remove();
errorCount++;
}
}
if (future.isCancelled()) {
// this code is never reached. Just here to make sure that
// this is not the cause of problems.
System.out.println("Found a cancelled future. Will remove it.");
iterator.remove();
}
}
if (!atLeast1FutureIsDone) {
System.out.println("At least 1 future was not done. Current noChangeCount:" + noChangeCount);
noChangeCount++;
} else {
// reset noChangeCount
noChangeCount = 0;
}
futuresAreReady = allFuturesAreDone;
// log the current state of responses, errors and remaining futures
System.out.println("Size of responses :" + responses.size() + "; Size of futures:" + futures.size() + " Errors:" + errorCount);
if (noChangeCount >= maximumNoChangeCount) {
System.out.println("Breaking while loop becauce no new future finished in the last " + maximumNoChangeCount + " iterations");
break;
}
// check every second
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
for (Long responsetime : responses) {
// analyze responsetimes or whatever
}
// clean up
// .shutdown() made even more problems than shutdownNow()
scheduledExecutorService.shutdownNow();
System.out.println("Executors have been shutdown - Main Method finished. Will exit System.");
System.out.flush();
System.exit(0);
}
private static class UriCallable implements Callable<Long> {
private HttpUriRequest request;
private HttpClient client;
public UriCallable(HttpUriRequest request, HttpClient client) {
super();
this.request = request;
this.client = client;
}
public Long call() throws Exception {
Long start = System.currentTimeMillis();
HttpResponse httpResponse = client.execute(request);
Long end = System.currentTimeMillis();
return end - start;
}
}
}