50

問題の説明

サーブレット-3.0APIを使用すると、要求/応答コンテキストを切り離して、後でそれに応答できます。

ただし、大量のデータを書き込もうとすると、次のようになります。

AsyncContext ac = getWaitingContext() ;
ServletOutputStream out = ac.getResponse().getOutputStream();
out.print(some_big_data);
out.flush()

Tomcat7とJetty8の両方で、実際にブロックされる可能性があります。また、簡単なテストケースではブロックされます。チュートリアルでは、このようなセットアップを処理するスレッドプールを作成することをお勧めします。

ただし、10,000のオープン接続と、たとえば10スレッドのスレッドプールがある場合、低速接続または接続をブロックしたばかりのクライアントの1%でも、スレッドプールをブロックして、彗星の応答を完全にブロックするか、速度を落とすだけで十分です。大幅。

予想される方法は、「書き込み可能」通知またはI / O完了通知を取得してから、データをプッシュし続けることです。

Servlet-3.0 APIを使用してこれを行うにはどうすればよいですか。つまり、次のいずれかを取得するにはどうすればよいですか。

  • I/O操作に関する非同期完了通知。
  • 書き込み準備完了通知でノンブロッキングI/Oを取得します。

これがServlet-3.0APIでサポートされていない場合、スレッドプールを使用して非同期I / Oを偽造することなく、このようなイベントを真に非同期で処理できるWebサーバー固有のAPI(JettyContinuationやTomcatCometEventなど)はありますか。

誰か知っていますか?

そして、これが不可能な場合は、ドキュメントを参照して確認できますか?

サンプルコードでの問題のデモンストレーション

イベントストリームをエミュレートする以下のコードを添付しました。

ノート:

  • 切断されたクライアントを検出するためにServletOutputStreamそのスローを使用しますIOException
  • keep-aliveクライアントがまだそこにいることを確認するためにメッセージを送信します
  • 非同期操作を「エミュレート」するスレッドプールを作成しました。

このような例では、問題を示すためにサイズ1のスレッドプールを明示的に定義しました。

  • アプリケーションを起動します
  • 2つのターミナルから実行curl http://localhost:8080/path/to/app(2回)
  • 次に、でデータを送信しますcurd -d m=message http://localhost:8080/path/to/app
  • 両方のクライアントがデータを受信しました
  • ここで、クライアントの1つを一時停止し(Ctrl + Z)、メッセージをもう一度送信しますcurd -d m=message http://localhost:8080/path/to/app
  • 中断されていない別のクライアントが何も受信しなかったか、メッセージが転送された後、他のスレッドがブロックされているためにキープアライブ要求の受信を停止したことを確認します。

1000〜5000のオープン接続を使用すると、スレッドプールを非常に速く使い果たすことができるため、スレッドプールを使用せずにこのような問題を解決したいと思います。

以下のサンプルコード。


import java.io.IOException;
import java.util.HashSet;
import java.util.Iterator;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.LinkedBlockingQueue;

import javax.servlet.AsyncContext;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.ServletOutputStream;


@WebServlet(urlPatterns = "", asyncSupported = true)
public class HugeStreamWithThreads extends HttpServlet {

    private long id = 0;
    private String message = "";
    private final ThreadPoolExecutor pool = 
        new ThreadPoolExecutor(1, 1, 50000L,TimeUnit.MILLISECONDS,new LinkedBlockingQueue<Runnable>());
        // it is explicitly small for demonstration purpose

    private final Thread timer = new Thread(new Runnable() {
        public void run()
        {
            try {
                while(true) {
                    Thread.sleep(1000);
                    sendKeepAlive();
                }
            }
            catch(InterruptedException e) {
                // exit
            }
        }
    });


    class RunJob implements Runnable {
        volatile long lastUpdate = System.nanoTime();
        long id = 0;
        AsyncContext ac;
        RunJob(AsyncContext ac) 
        {
            this.ac = ac;
        }
        public void keepAlive()
        {
            if(System.nanoTime() - lastUpdate > 1000000000L)
                pool.submit(this);
        }
        String formatMessage(String msg)
        {
            StringBuilder sb = new StringBuilder();
            sb.append("id");
            sb.append(id);
            for(int i=0;i<100000;i++) {
                sb.append("data:");
                sb.append(msg);
                sb.append("\n");
            }
            sb.append("\n");
            return sb.toString();
        }
        public void run()
        {
            String message = null;
            synchronized(HugeStreamWithThreads.this) {
                if(this.id != HugeStreamWithThreads.this.id) {
                    this.id = HugeStreamWithThreads.this.id;
                    message = HugeStreamWithThreads.this.message;
                }
            }
            if(message == null)
                message = ":keep-alive\n\n";
            else
                message = formatMessage(message);

            if(!sendMessage(message))
                return;

            boolean once_again = false;
            synchronized(HugeStreamWithThreads.this) {
                if(this.id != HugeStreamWithThreads.this.id)
                    once_again = true;
            }
            if(once_again)
                pool.submit(this);

        }
        boolean sendMessage(String message) 
        {
            try {
                ServletOutputStream out = ac.getResponse().getOutputStream();
                out.print(message);
                out.flush();
                lastUpdate = System.nanoTime();
                return true;
            }
            catch(IOException e) {
                ac.complete();
                removeContext(this);
                return false;
            }
        }
    };

    private HashSet<RunJob> asyncContexts = new HashSet<RunJob>();

    @Override
    public void init(ServletConfig config) throws ServletException
    {
        super.init(config);
        timer.start();
    }
    @Override
    public void destroy()
    {
        for(;;){
            try {
                timer.interrupt();
                timer.join();
                break;
            }
            catch(InterruptedException e) {
                continue;
            }
        }
        pool.shutdown();
        super.destroy();
    }


    protected synchronized void removeContext(RunJob ac)
    {
        asyncContexts.remove(ac);
    }

    // GET method is used to establish a stream connection
    @Override
    protected synchronized void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {

        // Content-Type header
        response.setContentType("text/event-stream");
        response.setCharacterEncoding("utf-8");

        // Access-Control-Allow-Origin header
        response.setHeader("Access-Control-Allow-Origin", "*");

        final AsyncContext ac = request.startAsync();

        ac.setTimeout(0);
        RunJob job = new RunJob(ac);
        asyncContexts.add(job);
        if(id!=0) {
            pool.submit(job);
        }
    }

    private synchronized void sendKeepAlive()
    {
        for(RunJob job : asyncContexts) {
            job.keepAlive();
        }
    }

    // POST method is used to communicate with the server
    @Override
    protected synchronized void doPost(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException 
    {
        request.setCharacterEncoding("utf-8");
        id++;
        message = request.getParameter("m");        
        for(RunJob job : asyncContexts) {
            pool.submit(job);
        }
    }


}

上記のサンプルでは、​​スレッドを使用してブロックを防止しています...ただし、ブロックするクライアントの数がスレッドプールのサイズよりも大きい場合は、ブロックされます。

ブロックせずにどのように実装できますか?

4

6 に答える 6

29

私は、Servlet 3.0 AsynchronousAPI を正しく実装するのが難しく、役立つドキュメントがまばらであることを発見しました。試行錯誤を繰り返し、さまざまなアプローチを試みた結果、非常に満足できる堅牢なソリューションを見つけることができました。私のコードを見てあなたのコードと比較すると、あなたの特定の問題を解決するのに役立つかもしれない 1 つの大きな違いに気付きます。ではなく を使用しServletResponseてデータを書き込みますServletOutputStream

ここで、私の頼りになる非同期サーブレットクラスは、あなたのsome_big_dataケースにわずかに適応しています:

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.ServletConfig;
import javax.servlet.ServletException;
import javax.servlet.ServletResponse;
import javax.servlet.annotation.WebInitParam;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpSession;

import org.apache.log4j.Logger;

@javax.servlet.annotation.WebServlet(urlPatterns = { "/async" }, asyncSupported = true, initParams = { @WebInitParam(name = "threadpoolsize", value = "100") })
public class AsyncServlet extends HttpServlet {

  private static final Logger logger = Logger.getLogger(AsyncServlet.class);

  public static final int CALLBACK_TIMEOUT = 10000; // ms

  /** executor service */
  private ExecutorService exec;

  @Override
  public void init(ServletConfig config) throws ServletException {

    super.init(config);
    int size = Integer.parseInt(getInitParameter("threadpoolsize"));
    exec = Executors.newFixedThreadPool(size);
  }

  @Override
  public void service(HttpServletRequest req, HttpServletResponse res) throws ServletException, IOException {

    final AsyncContext ctx = req.startAsync();
    final HttpSession session = req.getSession();

    // set the timeout
    ctx.setTimeout(CALLBACK_TIMEOUT);

    // attach listener to respond to lifecycle events of this AsyncContext
    ctx.addListener(new AsyncListener() {

      @Override
      public void onComplete(AsyncEvent event) throws IOException {

        logger.info("onComplete called");
      }

      @Override
      public void onTimeout(AsyncEvent event) throws IOException {

        logger.info("onTimeout called");
      }

      @Override
      public void onError(AsyncEvent event) throws IOException {

        logger.info("onError called: " + event.toString());
      }

      @Override
      public void onStartAsync(AsyncEvent event) throws IOException {

        logger.info("onStartAsync called");
      }
    });

    enqueLongRunningTask(ctx, session);
  }

  /**
   * if something goes wrong in the task, it simply causes timeout condition that causes the async context listener to be invoked (after the fact)
   * <p/>
   * if the {@link AsyncContext#getResponse()} is null, that means this context has already timed out (and context listener has been invoked).
   */
  private void enqueLongRunningTask(final AsyncContext ctx, final HttpSession session) {

    exec.execute(new Runnable() {

      @Override
      public void run() {

        String some_big_data = getSomeBigData();

        try {

          ServletResponse response = ctx.getResponse();
          if (response != null) {
            response.getWriter().write(some_big_data);
            ctx.complete();
          } else {
            throw new IllegalStateException(); // this is caught below
          }
        } catch (IllegalStateException ex) {
          logger.error("Request object from context is null! (nothing to worry about.)"); // just means the context was already timeout, timeout listener already called.
        } catch (Exception e) {
          logger.error("ERROR IN AsyncServlet", e);
        }
      }
    });
  }

  /** destroy the executor */
  @Override
  public void destroy() {

    exec.shutdown();
  }
}
于 2012-08-28T11:17:05.310 に答える
10

このトピックに関する私の調査中に、このスレッドがポップアップし続けたので、ここで言及することにしました。

Servlet 3.1 では、 と で非同期操作が導入されましServletInputStreamServletOutputStream。を参照してくださいServletOutputStream.setWriteListener

例はhttp://docs.oracle.com/javaee/7/tutorial/servlets013.htmにあります。

于 2013-09-07T11:13:33.537 に答える
3

これは役に立つかもしれません

http://www.oracle.com/webfolder/technetwork/tutorials/obe/java/async-servlet/async-servlets.html

于 2012-09-25T23:18:15.207 に答える
3

書き込みを非同期にすることはできません。現実的には、クライアントに何かを書き出すときは、すぐに書き出せることを期待し、そうでない場合はエラーとして扱うことができるという制限に対処する必要があります。つまり、データをできるだけ速くクライアントにストリーミングし、フローを制御する方法としてチャネルのブロック/非ブロック ステータスを使用することが目標である場合は、うまくいきません。しかし、クライアントが処理できる低レートでデータを送信している場合、少なくとも、十分に速く読み取れないクライアントを即座に切断することができます。

たとえば、あなたのアプリケーションでは、キープアライブを遅い速度 (数秒ごと) で送信し、クライアントが送信されるすべてのイベントに対応できることを期待しています。クライアントにデータを散財し、データが追いつかない場合は、迅速かつクリーンに切断できます。これは、真の非同期 I/O よりも少し制限されていますが、あなたのニーズ (そして私のもの) を満たすはずです。

トリックは、IOExceptions をスローするだけの出力を書き出すためのすべてのメソッドが実際にはそれ以上のことを行うことです: 実装では、interrupt() できるものへのすべての呼び出しは、次のようなものでラップされます (桟橋 9):

catch (InterruptedException x)
    throw (IOException)new InterruptedIOException().initCause(x);

(これは、InterruptedException がログに記録され、ブロッキング ループがすぐに再試行される Jetty 8 では発生しないことにも注意してください。おそらく、このトリックを使用するために、サーブレット コンテナーの動作が適切であることを確認する必要があります。)

つまり、遅いクライアントによって書き込みスレッドがブロックされた場合、スレッドで interrupt() を呼び出して、書き込みを強制的に IOException としてスローするだけです。考えてみてください: ノンブロッキング コードは処理スレッドの 1 つで実行するのに単位時間を消費するため、(たとえば 1 ミリ秒後に) 中断されたブロッキング書き込みを使用することは、原則としてまったく同じです。私たちはまだ、わずかに効率が悪いだけで、スレッドで短い時間を噛み砕いているだけです。

書き込みを開始する直前にメイン タイマー スレッドがジョブを実行して各書き込みの時間を制限し、書き込みがすぐに完了するとジョブがキャンセルされるように、コードを変更しました。

最後の注意: 適切に実装されたサーブレット コンテナーでは、I/O をスローしても安全なはずです。InterruptedIOException をキャッチして、後で書き込みを再試行できればよいのですが。おそらく、低速のクライアントが完全なストリームについていけない場合に、イベントのサブセットを提供したいと考えています。私が知る限り、Jetty ではこれは完全に安全というわけではありません。書き込みがスローされた場合、HttpResponse オブジェクトの内部状態は、後で書き込みを安全に再開できるほど一貫していない可能性があります。この保証を提供していない特定のドキュメントがない限り、この方法でサーブレットコンテナーをプッシュしようとするのは賢明ではないと思います。IOException が発生した場合に接続がシャットダウンされるように設計されているという考えだと思います。

RunJob::run() の変更されたバージョンを使用したコードを次に示します (実際には、ここでは、書き込みごとに 1 つの愚かなスピンアップするのではなく、メイン タイマー スレッドを使用する必要があります)。

public void run()
{
    String message = null;
    synchronized(HugeStreamWithThreads.this) {
        if(this.id != HugeStreamWithThreads.this.id) {
            this.id = HugeStreamWithThreads.this.id;
            message = HugeStreamWithThreads.this.message;
        }
    }
    if(message == null)
        message = ":keep-alive\n\n";
    else
        message = formatMessage(message);

    final Thread curr = Thread.currentThread();
    Thread canceller = new Thread(new Runnable() {
        public void run()
        {
            try {
                Thread.sleep(2000);
                curr.interrupt();
            }
            catch(InterruptedException e) {
                // exit
            }
        }
    });
    canceller.start();

    try {
        if(!sendMessage(message))
            return;
    } finally {
        canceller.interrupt();
        while (true) {
            try { canceller.join(); break; }
            catch (InterruptedException e) { }
        }
    }

    boolean once_again = false;
    synchronized(HugeStreamWithThreads.this) {
        if(this.id != HugeStreamWithThreads.this.id)
            once_again = true;
    }
    if(once_again)
        pool.submit(this);

}
于 2013-04-30T23:49:32.820 に答える
2

春はあなたのためのオプションですか?Spring-MVC 3.2 には というクラスがありDeferredResult、「10,000 の開いている接続/10 のサーバー プール スレッド」シナリオを適切に処理します。

例: http://blog.springsource.org/2012/05/06/spring-mvc-3-2-preview-introducing-servlet-3-async-support/

于 2012-08-30T16:32:10.083 に答える
-1

あなたのリスティングをざっと見たので、見落としがあるかもしれません。プール スレッドの利点は、時間をかけて複数のタスク間でスレッド リソースを共有できることです。おそらく、すべての HTTP 接続を同時にグループ化するのではなく、異なる HTTP 接続の keepAlive 応答を間隔を空けて配置することで、問題を解決できる場合があります。

于 2013-02-28T22:23:45.057 に答える