2

簡単な例として、nettyで2つのワーカースレッドのみを使用して3つの同時TCPクライアント接続を処理したいとします。どうすればよいですか?

質問A)以下のコードでは、3番目の接続はサーバーからデータを取得しません。接続はそこにあるだけです。注意-私のワーカーエグゼキュータとワーカー数は2です。したがって、2つのワーカースレッドと3つの接続がある場合、3つの接続すべてに2つのスレッドを提供するべきではありませんか?

B)別の質問は-nettyはjava.util.concurrentのCompletionServiceを使用していますか?使っていないようです。また、executor.submitまたはfuture.getを実行するソースコードは見当たりませんでした。これにより、ワーカースレッドよりも多い接続へのデータの処理と提供方法の混乱が増しました。

C)nettyが10000以上の同時TCP接続を処理する方法に迷いました.... 10000スレッドを作成しますか?接続ごとのスレッドはスケーラブルなソリューションではないため、テストコードが期待どおりに機能しないため、混乱しています。

    import java.net.InetSocketAddress;
    import java.nio.channels.ClosedChannelException;
    import java.util.Date;
    import java.util.concurrent.Executors;
    import java.util.logging.Level;
    import java.util.logging.Logger;

    import org.jboss.netty.bootstrap.ServerBootstrap;
    import org.jboss.netty.channel.Channel;
    import org.jboss.netty.channel.ChannelFuture;
    import org.jboss.netty.channel.ChannelFutureListener;
    import org.jboss.netty.channel.ChannelHandlerContext;
    import org.jboss.netty.channel.ChannelPipeline;
    import org.jboss.netty.channel.ChannelPipelineFactory;
    import org.jboss.netty.channel.ChannelStateEvent;
    import org.jboss.netty.channel.Channels;
    import org.jboss.netty.channel.ExceptionEvent;
    import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
    import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
    import org.jboss.netty.handler.codec.string.StringEncoder;

    public class SRNGServer {

      public static void main(String[] args) throws Exception {
          // Configure the server.
          ServerBootstrap bootstrap = new ServerBootstrap(
                  new NioServerSocketChannelFactory(
                          Executors.newCachedThreadPool(),
                          //Executors.newCachedThreadPool()
                          Executors.newFixedThreadPool(2),2
                          ));

          // Configure the pipeline factory.
          bootstrap.setPipelineFactory(new SRNGServerPipelineFactoryP());

          // Bind and start to accept incoming connections.
          bootstrap.bind(new InetSocketAddress(8080));
      }



      private static class SRNGServerHandlerP extends SimpleChannelUpstreamHandler {

        private static final Logger logger = Logger.getLogger(SRNGServerHandlerP.class.getName());


        @Override
        public void channelConnected(
                ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {

          // Send greeting for a new connection.
          Channel ch=e.getChannel();

          System.out.printf("channelConnected with channel=[%s]%n", ch);

          ChannelFuture writeFuture=e.getChannel().write("It is " + new Date() + " now.\r\n");

          SRNGChannelFutureListener srngcfl=new SRNGChannelFutureListener();

          System.out.printf("Registered listener=[%s] for future=[%s]%n", srngcfl, writeFuture);

          writeFuture.addListener(srngcfl);      

        }

        @Override
        public void exceptionCaught(
                ChannelHandlerContext ctx, ExceptionEvent e) {

            logger.log(
                    Level.WARNING,
                    "Unexpected exception from downstream.",
                    e.getCause());
            if(e.getCause() instanceof ClosedChannelException){
              logger.log(Level.INFO, "****** Connection closed by client - Closing Channel");
            }
            e.getChannel().close();
        }
      }



      private static class SRNGServerPipelineFactoryP implements ChannelPipelineFactory {

        public ChannelPipeline getPipeline() throws Exception {

            // Create a default pipeline implementation.
            ChannelPipeline pipeline = Channels.pipeline();

            pipeline.addLast("encoder", new StringEncoder());
            pipeline.addLast("handler", new SRNGServerHandlerP());

            return pipeline;
        }
      }


      private static class SRNGChannelFutureListener implements ChannelFutureListener{

        public void operationComplete(ChannelFuture future) throws InterruptedException{
          Thread.sleep(1000*5);
          Channel ch=future.getChannel();
          if(ch!=null && ch.isConnected()){
              ChannelFuture writeFuture=ch.write("It is " + new Date() + " now.\r\n");
              //-- Add this instance as listener itself.
              writeFuture.addListener(this);
          }

        }

      }
    }
4

1 に答える 1

4

私はあなたのソースコードを詳細に分析していないので、なぜそれが正しく機能しないのか正確にはわかりません。しかし、この行SRNGChannelFutureListenerは疑わしいようです。

Thread.sleep(1000*5);

これにより、それを実行するスレッドが5秒間ロックされます。その間、スレッドは他の処理を実行できなくなります。

質問C:いいえ、10,000スレッドは作成されません。Nettyの要点は、それが実際にうまくスケーリングされないため、それを行わないということです。代わりに、スレッドプールから限られた数のスレッドを使用し、何かが発生するたびにイベントを生成し、プール内のスレッドでイベントハンドラーを実行します。したがって、スレッドと接続は互いに分離されます(接続ごとにスレッドはありません)。

このメカニズムを適切に機能させるには、イベントハンドラーができるだけ早く戻り、実行されているスレッドが次のイベントハンドラーをできるだけ早く実行できるようにする必要があります。スレッドを5秒間スリープさせると、スレッドは割り当てられたままになるため、他のイベントの処理には使用できなくなります。

質問B:本当に知りたい場合は、Nettyにソースコードを入手して調べることができます。非同期I/Oを実行するためにセレクターやその他のjava.nioクラスを使用します。

于 2011-07-23T06:44:17.860 に答える