非同期の突堤の応答に問題があります。組み込みの Jetty を使用して、リアクター ループとやり取りし、ループが応答を処理するときに応答を収集し、応答がコレクター ハッシュに表示されるときに Jetty 要求を終了します。
何かがコレクターハッシュ内にある場合でも、コンテキストを閉じるエラーが表示されます-コンテキストがタイムアウトしたと思います。これを検出して完了を呼び出さないようにしましたが、これは機能しませんでした。((AsyncContinuation)ctx).isInitial() をチェックして、コンテキストを閉じることができるかどうかを確認しようとしましたが、何もしませんでした。
エラーは次のとおりです。
INFO: e0d879983e682c0d6 API Request happens
May 25, 2012 4:39:00 AM server.Reactor$Parser parse
INFO: Received data { data from reactor }
May 25, 2012 4:39:00 AM server.AsyncHTTPRequestProcessor run
INFO: --->API http request in collector:
May 25, 2012 4:39:00 AM server.AsyncHTTPRequestProcessor run
INFO: Collector in async stuff:
2012-05-25 04:39:00.846:WARN:oejut.QueuedThreadPool:
java.lang.IllegalStateException: IDLE,initial
at org.eclipse.jetty.server.AsyncContinuation.complete(AsyncContinuation.java:569)
at server.AsyncHTTPRequestProcessor.run(AsyncHTTPRequestProcessor.java:72)
at org.eclipse.jetty.server.handler.ContextHandler.handle(ContextHandler.java:1119)
at org.eclipse.jetty.server.AsyncContinuation$1.run(AsyncContinuation.java:875)
at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:599)
at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:534)
at java.lang.Thread.run(Thread.java:679)
Jetty エンドポイントは次のとおりです。
package server;
import java.io.*;
import java.nio.channels.Selector;
import java.text.MessageFormat;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.continuation.*;
public class APIRequestHandler extends HttpServlet
{
private static final long serialVersionUID = -446741424265391138L;
private ConcurrentHashMap<String, clientconnData> key2clientconn;
private ConcurrentLinkedQueue<APIMessage> queue;
private ConcurrentHashMap<String, String> collector;
private int timeout;
private Logger logger;
private Selector selector;
public APIRequestHandler(Logger logger, Selector selector, ConcurrentLinkedQueue<APIMessage> queue,
ConcurrentHashMap<String, String> collector, ConcurrentHashMap<String, clientconnData> key2clientconn2, int timeout)
{
this.logger = logger;
this.selector = selector;
this.queue = queue;
this.collector = collector;
this.timeout = timeout;
this.key2clientconn = key2clientconn2;
}
public void doGet(HttpServletRequest req, HttpServletResponse res)
throws java.io.IOException
{
//System.out.println("request URI=" + req.getRequestURI());
String rid = req.getParameter("rid");
String clientconnId = req.getParameter("id");
String msg = req.getParameter("json");
// We do a null check to avoid the favicon call and other broken calls
if(clientconnId==null || rid==null || msg==null){
PrintWriter out = res.getWriter();
res.setStatus(200);
res.setContentType("application/json");
out.println("{");
out.println("\"response\":{\"success\":\"false\",\"response\":\"request missing parameters\"}");
out.println("}");
out.close();
}
else if(!this.key2clientconn.containsKey(clientconnId))
{
PrintWriter out = res.getWriter();
res.setStatus(200);
res.setContentType("application/json");
out.println("{");
out.println("\"rid\":"+"\""+rid+"\"");
out.println(",");
out.println("\"rtime\":"+0); // milliseconds
out.println(",");
out.println("\"response\":{\"success\":\"false\",\"response\":\"clientconn with not found on server\"}");
out.println("}");
out.close();
}
else// everything is fine, proceed as normal
{
logger.log(Level.INFO,"From API: "+msg);
// Send to channel
APIMessage m = new APIMessage();
m.rid = rid;
m.id = clientconnId;
m.json = msg + "\r\n";
// create the async context, otherwise getAsyncContext() will be null
final AsyncContext ctx = req.startAsync();
// set the timeout
ctx.setTimeout(this.timeout);
ctx.getRequest().setAttribute("rid",rid);
ctx.getRequest().setAttribute("startTime",System.currentTimeMillis()); // start time
// set up selector/queue
queue.add(m);
this.selector.wakeup();
// attach listener to respond to lifecycle events of this AsyncContext
ctx.addListener(new AsyncListener() {
public void onComplete(AsyncEvent event) throws IOException {
log("onComplete called");
// NOTE: errors and timeouts are handled with other events.
// Successful comm with clientconn handled by AsyncHTTPRequestProcessor.
ServletRequest asyncContextReq = event.getAsyncContext().getRequest();
int httpStatusCode = 500;
if(asyncContextReq.getAttribute("status") != null)
httpStatusCode = (Integer) asyncContextReq.getAttribute("status");
Object response = "{}";
if(asyncContextReq.getAttribute("response") != null)
response = event.getAsyncContext().getRequest().getAttribute("response");
long startTime = (Long) asyncContextReq.getAttribute("startTime");
long endTime = System.currentTimeMillis();
long elapsedTime = endTime - startTime;
// Do the response
HttpServletResponse ctxRes = (HttpServletResponse)event.getAsyncContext().getResponse();
ctxRes.setStatus(httpStatusCode);
ctxRes.setContentType("application/json");
ctxRes.getWriter().println("{");
ctxRes.getWriter().println("\"rid\":"+"\""+asyncContextReq.getParameter("rid")+"\"");
ctxRes.getWriter().println(",");
ctxRes.getWriter().println("\"rtime\":"+elapsedTime); // milliseconds
ctxRes.getWriter().println(",");
ctxRes.getWriter().println("\"response\": "+response);
ctxRes.getWriter().println("}");
ctxRes.getWriter().flush();
}
public void onTimeout(AsyncEvent event) throws IOException {
log("onTimeout called");
// gateway timeout (on clientconn request)
ServletRequest asyncContextReq = event.getAsyncContext().getRequest();
asyncContextReq.setAttribute("status",500); // request timeout
asyncContextReq.setAttribute("response", "{}");
}
public void onError(AsyncEvent event) throws IOException {
log("onError called");
ServletRequest asyncContextReq = event.getAsyncContext().getRequest();
asyncContextReq.setAttribute("status",500); // request timeout
asyncContextReq.setAttribute("response", "{}");
}
public void onStartAsync(AsyncEvent event) throws IOException {
log("onStartAsync called");
}
});
// spawn some task in a background thread
ctx.start(new AsyncHTTPRequestProcessor(ctx,collector,logger));
}
}
}
非同期応答の処理:
package server;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.Date;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;
import javax.servlet.AsyncContext;
import org.eclipse.jetty.server.AsyncContinuation;
public class AsyncHTTPRequestProcessor implements Runnable {
private ConcurrentHashMap<String, String> collector;
private Logger logger;
private AsyncContext ctx;
private String responseStr = null;
public AsyncHTTPRequestProcessor(AsyncContext _ctx,
ConcurrentHashMap<String, String> _collector, Logger _logger) {
ctx = _ctx;
collector = _collector;
logger = _logger;
}
@Override
public void run() {
logger.info("AsyncContinuation start");
//if(!((AsyncContinuation)ctx).isInitial()){
String rid = (String) ctx.getRequest().getAttribute("rid");
int elapsed = 0;
if(rid !=null)
{
logger.info("AsyncContinuation rid="+rid);
while(elapsed<ctx.getTimeout())
{
if(collector.containsKey(rid)){
responseStr = collector.get(rid);
collector.remove(rid);
logger.info("--->API http request in collector:"+responseStr);
ctx.getRequest().setAttribute("status",200);
ctx.getRequest().setAttribute("response", responseStr);
ctx.getRequest().setAttribute("endTime",System.currentTimeMillis());
break;
}
try {
Thread.sleep(10);
elapsed+=10;
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//}
logger.info("Collector in async stuff:");
for(String key:collector.keySet()){
logger.info(key+"->"+collector.get(key));
}
for(Entry<String, String> x:collector.entrySet()){
logger.info(x.getKey()+"->"+x.getValue());
}
ctx.complete();
}
}
}