3

非同期の突堤の応答に問題があります。組み込みの Jetty を使用して、リアクター ループとやり取りし、ループが応答を処理するときに応答を収集し、応答がコレクター ハッシュに表示されるときに Jetty 要求を終了します。

何かがコレクターハッシュ内にある場合でも、コンテキストを閉じるエラーが表示されます-コンテキストがタイムアウトしたと思います。これを検出して完了を呼び出さないようにしましたが、これは機能しませんでした。((AsyncContinuation)ctx).isInitial() をチェックして、コンテキストを閉じることができるかどうかを確認しようとしましたが、何もしませんでした。

エラーは次のとおりです。

INFO: e0d879983e682c0d6 API Request happens
May 25, 2012 4:39:00 AM server.Reactor$Parser parse
INFO: Received data { data from reactor }
May 25, 2012 4:39:00 AM server.AsyncHTTPRequestProcessor run
INFO: --->API http request in collector:
May 25, 2012 4:39:00 AM server.AsyncHTTPRequestProcessor run
INFO: Collector in async stuff:
2012-05-25 04:39:00.846:WARN:oejut.QueuedThreadPool:
java.lang.IllegalStateException: IDLE,initial
    at org.eclipse.jetty.server.AsyncContinuation.complete(AsyncContinuation.java:569)
    at server.AsyncHTTPRequestProcessor.run(AsyncHTTPRequestProcessor.java:72)
    at org.eclipse.jetty.server.handler.ContextHandler.handle(ContextHandler.java:1119)
    at org.eclipse.jetty.server.AsyncContinuation$1.run(AsyncContinuation.java:875)
    at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:599)
    at org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:534)
    at java.lang.Thread.run(Thread.java:679)

Jetty エンドポイントは次のとおりです。

package server;
import java.io.*;
import java.nio.channels.Selector;
import java.text.MessageFormat;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.logging.Level;
import java.util.logging.Logger;

import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

import org.eclipse.jetty.continuation.*;



public class APIRequestHandler extends HttpServlet
{
    private static final long serialVersionUID = -446741424265391138L;

    private ConcurrentHashMap<String, clientconnData> key2clientconn;
    private ConcurrentLinkedQueue<APIMessage> queue;
    private ConcurrentHashMap<String, String> collector;
    private int timeout;
    private Logger logger;
    private Selector selector;

    public APIRequestHandler(Logger logger, Selector selector, ConcurrentLinkedQueue<APIMessage> queue,
            ConcurrentHashMap<String, String> collector, ConcurrentHashMap<String, clientconnData> key2clientconn2, int timeout)
    {
        this.logger = logger;
        this.selector = selector;
        this.queue = queue;
        this.collector = collector;
        this.timeout = timeout;
        this.key2clientconn = key2clientconn2;
    }

    public void doGet(HttpServletRequest req, HttpServletResponse res)
            throws java.io.IOException
            {
        //System.out.println("request URI=" + req.getRequestURI());

        String rid = req.getParameter("rid");
        String clientconnId = req.getParameter("id");
        String msg = req.getParameter("json");

        // We do a null check to avoid the favicon call and other broken calls
        if(clientconnId==null || rid==null || msg==null){
            PrintWriter out = res.getWriter();
            res.setStatus(200);
            res.setContentType("application/json");
            out.println("{");
            out.println("\"response\":{\"success\":\"false\",\"response\":\"request missing parameters\"}");
            out.println("}");
            out.close();
        }
        else if(!this.key2clientconn.containsKey(clientconnId))
        {
            PrintWriter out = res.getWriter();
            res.setStatus(200);
            res.setContentType("application/json");
            out.println("{");
            out.println("\"rid\":"+"\""+rid+"\"");
            out.println(",");
            out.println("\"rtime\":"+0); // milliseconds
            out.println(",");
            out.println("\"response\":{\"success\":\"false\",\"response\":\"clientconn with not found on server\"}");
            out.println("}");
            out.close();
        }
        else// everything is fine, proceed as normal
        {
            logger.log(Level.INFO,"From API: "+msg);

            // Send to channel
            APIMessage m = new APIMessage();
            m.rid = rid;
            m.id = clientconnId;
            m.json = msg + "\r\n";

            // create the async context, otherwise getAsyncContext() will be null
            final AsyncContext ctx = req.startAsync();

            // set the timeout
            ctx.setTimeout(this.timeout);
            ctx.getRequest().setAttribute("rid",rid);
            ctx.getRequest().setAttribute("startTime",System.currentTimeMillis()); // start time

            // set up selector/queue
            queue.add(m);
            this.selector.wakeup();

            // attach listener to respond to lifecycle events of this AsyncContext
            ctx.addListener(new AsyncListener() {
                public void onComplete(AsyncEvent event) throws IOException {
                    log("onComplete called");

                    // NOTE: errors and timeouts are handled with other events.
                    // Successful comm with clientconn handled by AsyncHTTPRequestProcessor.            
                    ServletRequest asyncContextReq = event.getAsyncContext().getRequest();

                    int httpStatusCode = 500;
                    if(asyncContextReq.getAttribute("status") != null)
                        httpStatusCode = (Integer) asyncContextReq.getAttribute("status");
                    Object response = "{}";
                    if(asyncContextReq.getAttribute("response") != null)
                        response = event.getAsyncContext().getRequest().getAttribute("response");

                    long startTime = (Long) asyncContextReq.getAttribute("startTime");
                    long endTime = System.currentTimeMillis();
                    long elapsedTime = endTime - startTime;

                    // Do the response
                    HttpServletResponse ctxRes = (HttpServletResponse)event.getAsyncContext().getResponse();

                    ctxRes.setStatus(httpStatusCode);
                    ctxRes.setContentType("application/json");
                    ctxRes.getWriter().println("{");
                    ctxRes.getWriter().println("\"rid\":"+"\""+asyncContextReq.getParameter("rid")+"\"");
                    ctxRes.getWriter().println(",");
                    ctxRes.getWriter().println("\"rtime\":"+elapsedTime); // milliseconds
                    ctxRes.getWriter().println(",");
                    ctxRes.getWriter().println("\"response\": "+response);

                    ctxRes.getWriter().println("}");
                    ctxRes.getWriter().flush();

                }
                public void onTimeout(AsyncEvent event) throws IOException {
                    log("onTimeout called");

                    // gateway timeout (on clientconn request)
                    ServletRequest asyncContextReq = event.getAsyncContext().getRequest();
                    asyncContextReq.setAttribute("status",500); // request timeout
                    asyncContextReq.setAttribute("response", "{}");
                }
                public void onError(AsyncEvent event) throws IOException {
                    log("onError called");

                    ServletRequest asyncContextReq = event.getAsyncContext().getRequest();
                    asyncContextReq.setAttribute("status",500); // request timeout
                    asyncContextReq.setAttribute("response", "{}");
                }
                public void onStartAsync(AsyncEvent event) throws IOException {
                    log("onStartAsync called");
                }
            });

            // spawn some task in a background thread
            ctx.start(new AsyncHTTPRequestProcessor(ctx,collector,logger));
        }
    }
}

非同期応答の処理:

package server;
import java.io.IOException;
import java.text.MessageFormat;
import java.util.Date;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.logging.Logger;

import javax.servlet.AsyncContext;

import org.eclipse.jetty.server.AsyncContinuation;


public class AsyncHTTPRequestProcessor implements Runnable {

    private ConcurrentHashMap<String, String> collector;
    private Logger logger;
    private AsyncContext ctx;
    private String responseStr = null;

    public AsyncHTTPRequestProcessor(AsyncContext _ctx, 
            ConcurrentHashMap<String, String> _collector, Logger _logger) {
        ctx = _ctx;
        collector = _collector;
        logger = _logger;
    }

    @Override
    public void run() {

        logger.info("AsyncContinuation start");

        //if(!((AsyncContinuation)ctx).isInitial()){
        String rid = (String) ctx.getRequest().getAttribute("rid");
        int elapsed = 0;
        if(rid !=null)
        {

            logger.info("AsyncContinuation rid="+rid);

            while(elapsed<ctx.getTimeout())
            {
                if(collector.containsKey(rid)){
                    responseStr = collector.get(rid);
                    collector.remove(rid);

                    logger.info("--->API http request in collector:"+responseStr);
                    ctx.getRequest().setAttribute("status",200);
                    ctx.getRequest().setAttribute("response", responseStr);
                    ctx.getRequest().setAttribute("endTime",System.currentTimeMillis());
                    break;
                }
                try {
                    Thread.sleep(10);
                    elapsed+=10;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            //}
            logger.info("Collector in async stuff:");
            for(String key:collector.keySet()){
                logger.info(key+"->"+collector.get(key));
            }

            for(Entry<String, String> x:collector.entrySet()){
                logger.info(x.getKey()+"->"+x.getValue());
            }
            ctx.complete();
        }
    }

}
4

2 に答える 2

0

私もこの問題を抱えていました。AsyncContext が閉じられていた場合、AsyncContext.complate() メソッドは java.lang.IllegalStateException をスローするようです。AsyncContext が閉じられていたかどうかをプログラムでもう一度確認する必要があると思います。 complate() メソッドを呼び出す前に。

于 2013-11-07T02:38:03.803 に答える
0

complate() メソッドを呼び出す前に AsyncContext が閉じられていたかどうかを確認するために、もう一度プログラムをチェックする必要があると思います。これは私の問題に効果的です。

于 2014-04-28T10:44:22.917 に答える