0

JBOSS Netty を使用して、接続されたクライアントに継続的にデータを送信しようとしています。以下の例では、クライアントが接続 (channelConnected) されるとすぐに、5 秒ごとにクライアントに時間を送信しようとします。

しかし、これは機能していません。while ループにコメントを付けた場合にのみ機能します。

    import java.net.InetAddress;
    import java.net.InetSocketAddress;
    import java.util.Date;
    import java.util.concurrent.Executors;
    import java.util.logging.Level;
    import java.util.logging.Logger;

    import org.jboss.netty.bootstrap.ServerBootstrap;
    import org.jboss.netty.channel.ChannelHandlerContext;
    import org.jboss.netty.channel.ChannelPipeline;
    import org.jboss.netty.channel.ChannelPipelineFactory;
    import org.jboss.netty.channel.ChannelStateEvent;
    import org.jboss.netty.channel.Channels;
    import org.jboss.netty.channel.ExceptionEvent;
    import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
    import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
    import org.jboss.netty.handler.codec.string.StringEncoder;

    public class SRNGServer {

      public static void main(String[] args) throws Exception {
          // Configure the server.
          ServerBootstrap bootstrap = new ServerBootstrap(
                  new NioServerSocketChannelFactory(
                          Executors.newCachedThreadPool(),
                          Executors.newCachedThreadPool()));

          // Configure the pipeline factory.
          bootstrap.setPipelineFactory(new SRNGServerPipelineFactoryP());

          // Bind and start to accept incoming connections.
          bootstrap.bind(new InetSocketAddress(8080));
      }



      private static class SRNGServerHandlerP extends SimpleChannelUpstreamHandler {

        private static final Logger logger = Logger.getLogger(SRNGServerHandlerP.class.getName());


        @Override
        public void channelConnected(
                ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
          // Send greeting for a new connection.
          e.getChannel().write("Welcome to " + InetAddress.getLocalHost().getHostName() + "!\r\n");

          while(true){
            e.getChannel().write("It is " + new Date() + " now.\r\n");

            Thread.sleep(1000*5);
          }
        }

        @Override
        public void exceptionCaught(
                ChannelHandlerContext ctx, ExceptionEvent e) {
            logger.log(
                    Level.WARNING,
                    "Unexpected exception from downstream.",
                    e.getCause());
            e.getChannel().close();
        }
      }



      private static class SRNGServerPipelineFactoryP implements ChannelPipelineFactory {

        public ChannelPipeline getPipeline() throws Exception {

            // Create a default pipeline implementation.
            ChannelPipeline pipeline = Channels.pipeline();

            pipeline.addLast("encoder", new StringEncoder());
            pipeline.addLast("handler", new SRNGServerHandlerP());

            return pipeline;
        }
      }

    }
4

3 に答える 3

3

実際、Netty のドキュメントには、ハンドラーが最終的にデッドロックする可能性があるため、決してハンドラーを待機させてはならないと記載されています。その理由は、ハンドラー メソッドが I/O スレッドによって直接呼び出されるためです。Netty の 1 つの I/O スレッドは複数の I/O 操作を連続して実行するため、操作ごとに 1 つのスレッドではありません。channelConnected メソッドでは、チャネルへの参照を使用して新しいスレッドを開始し、そのスレッドが 5 秒ごとに時間を送信するようにする必要があります。これにより、接続ごとに 1 つのスレッドが生成されます。または、1 つのスレッドで 5 秒ごとにクライアントのリストをループし、各クライアントに順番に時間を送信することもできます。とにかく、ハンドラーを呼び出すスレッドとは別のスレッドを送信に使用することが重要です。

于 2011-07-22T22:22:12.680 に答える
2

その価値について、私は解決策を考え出しました。これが実用的なコードです。時間の「書き込み」の後、私は自分のChannelFuturelistenerに未来を登録します。そして、operationCompleteから、書き込みごとに新しいfutureを登録し続けます。これは、余分なスレッドを使用せずに、私が達成したいことに対して機能します。

    import java.net.InetSocketAddress;
    import java.nio.channels.ClosedChannelException;
    import java.util.Date;
    import java.util.concurrent.Executors;
    import java.util.logging.Level;
    import java.util.logging.Logger;

    import org.jboss.netty.bootstrap.ServerBootstrap;
    import org.jboss.netty.channel.Channel;
    import org.jboss.netty.channel.ChannelFuture;
    import org.jboss.netty.channel.ChannelFutureListener;
    import org.jboss.netty.channel.ChannelHandlerContext;
    import org.jboss.netty.channel.ChannelPipeline;
    import org.jboss.netty.channel.ChannelPipelineFactory;
    import org.jboss.netty.channel.ChannelStateEvent;
    import org.jboss.netty.channel.Channels;
    import org.jboss.netty.channel.ExceptionEvent;
    import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
    import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
    import org.jboss.netty.handler.codec.string.StringEncoder;

    public class SRNGServer {

      public static void main(String[] args) throws Exception {
          // Configure the server.
          ServerBootstrap bootstrap = new ServerBootstrap(
                  new NioServerSocketChannelFactory(
                          Executors.newCachedThreadPool(),
                          //Executors.newCachedThreadPool()
                          Executors.newFixedThreadPool(2),2
                          ));

          // Configure the pipeline factory.
          bootstrap.setPipelineFactory(new SRNGServerPipelineFactoryP());

          // Bind and start to accept incoming connections.
          bootstrap.bind(new InetSocketAddress(8080));
      }



      private static class SRNGServerHandlerP extends SimpleChannelUpstreamHandler {

        private static final Logger logger = Logger.getLogger(SRNGServerHandlerP.class.getName());


        @Override
        public void channelConnected(
                ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {

          // Send greeting for a new connection.
          Channel ch=e.getChannel();
          ChannelFuture writeFuture=e.getChannel().write("It is " + new Date() + " now.\r\n");

          SRNGChannelFutureListener srngcfl=new SRNGChannelFutureListener();

          writeFuture.addListener(srngcfl);      

        }

        @Override
        public void exceptionCaught(
                ChannelHandlerContext ctx, ExceptionEvent e) {

            logger.log(
                    Level.WARNING,
                    "Unexpected exception from downstream.",
                    e.getCause());
            if(e.getCause() instanceof ClosedChannelException){
              logger.log(Level.INFO, "****** Connection closed by client - Closing Channel");
            }
            e.getChannel().close();
        }
      }



      private static class SRNGServerPipelineFactoryP implements ChannelPipelineFactory {

        public ChannelPipeline getPipeline() throws Exception {

            // Create a default pipeline implementation.
            ChannelPipeline pipeline = Channels.pipeline();

            pipeline.addLast("encoder", new StringEncoder());
            pipeline.addLast("handler", new SRNGServerHandlerP());

            return pipeline;
        }
      }


      private static class SRNGChannelFutureListener implements ChannelFutureListener{

        public void operationComplete(ChannelFuture future) throws InterruptedException{
          Thread.sleep(1000*5);
          Channel ch=future.getChannel();
          if(ch!=null && ch.isConnected()){
              ChannelFuture writeFuture=ch.write("It is " + new Date() + " now.\r\n");
              //-- Add this instance as listener itself.
              writeFuture.addListener(this);
          }

        }

      }
    }
于 2011-07-22T22:29:33.190 に答える
0

スリープの結果として I/O スレッドがブロックされているようです。代わりに 2 つのワーカー スレッドを使用してみてください。

ServerBootstrap bootstrap = new ServerBootstrap(
    new NioServerSocketChannelFactory( Executors.newCachedThreadPool(),
        Executors.newCachedThreadPool(), 2 ) );
于 2011-07-22T21:15:48.080 に答える