1

UDP ポートでデータをリッスンするサーバーがあり、netty を使用しています。私のサーバーには、独自のフレーム デコーダー、実行ハンドラー、およびビジネス固有のハンドラーがあります。私のテスト クライアントは Java DatagramSocket (netty を使用しない) で作成され、応答をリッスンする前にデータを送信するだけです。以下の例を参照してください。

私の最初のバージョンでは、サーバーに返信を返すことはありませんでしたが、問題なくリクエストを繰り返し送信する何千もの同時クライアントを実行できました。私のサーバーは、これらの数十万件のリクエストを完璧に処理していました。

次に、サーバー側で受け取ったリクエストごとに、ビジネス固有のハンドラーに返信を追加する必要がありました。netty 側で提供されている例、つまりイベント バッファーにデータを書き込むだけでした。クライアント側では、データを送信して応答を受信したのと同じ udp ソケットを再利用しました。このメカニズムは、1 つのクライアントが何百もの連続した要求をサーバーに送信し、サーバーがそれぞれの要求に適切に応答した場合に非常にうまく機能しました。ただし、(ソケットを閉じるよりも) クライアントがシャットダウンした瞬間に、サーバーが停止して他の要求を受け入れることができませんでした。そのプロセスを再度実行するには、サーバーを再起動する必要がありました。もちろん、同時に複数のクライアントを実行している場合は、1 つだけが適切に機能し、1 回だけしか機能しません。

Wireshark を使用して、送受信したデータを分析しました。すべてのデータがサーバーに適切に送信され、サーバーはクライアントの最初の実行時に適切に応答を返します。しかし、クライアントを停止して再実行しようとすると、サーバーはデータの処理を停止します。Wireshark を使用すると、データが実際にサーバーに送信されていることがわかります。データの処理を停止するのは実際にはサーバー自体です。

誰かがその例で私が間違っていたことを教えてもらえますか?

これが私のクライアントの単純なバージョンです:

public class UDPClientTester
{

    public void sendAndReceiveClientTester() throws IOException
    {
        final InetAddress lIpAddress = InetAddress.getByName( "localhost" );
        int lPort = 50000;
        int lNbrRepeat = 1;

        byte[] lDataToSendAsBytes = {1,2,3,4,5,6,7,8,9,0,1,2,3,4,5,6,7,8,9,0}; //new byte[20];
        byte[] lReceivedData = new byte[22];

        final DatagramSocket udpSocket = new DatagramSocket();

        DatagramPacket lSendPacket = new DatagramPacket( lDataToSendAsBytes, lDataToSendAsBytes.length, lIpAddress, lPort );
        DatagramPacket lReceivePacket = new DatagramPacket( lReceivedData, 22 );

        // No metter the number of repeats, the server will accept all requests and return a reply for each one.
        for ( byte k = 0; k < lNbrRepeat; k++ )
        {
            // Send data...
            udpSocket.send( lSendPacket );

            // Receive response... Block here until a response is received.
            udpSocket.receive( lReceivePacket );            
        }

        udpSocket.close();

        // At the moment I close the socket, the server stop to accept anymore requests. 
        // I cannot run this example again until I restart the server!
    }
}

そして、これは私のサーバーの単純なバージョンで、main() とハンドラーのみを示しています (デコーダーは表示されていません):

public class TesterChannelHandler extends SimpleChannelHandler
{
    public  void main(final String[] args) throws Exception
    {
       DatagramChannelFactory f = new NioDatagramChannelFactory( Executors.newCachedThreadPool() );

       ConnectionlessBootstrap b = new ConnectionlessBootstrap(f);

       // Configure the pipeline factory.
       b.setPipelineFactory(new ChannelPipelineFactory() {

           ExecutionHandler executionHandler = new ExecutionHandler( new OrderedMemoryAwareThreadPoolExecutor(16, 1048576, 1048576) )

           public ChannelPipeline getPipeline() throws Exception {
              return Channels.pipeline(
                      new OuterFrameDecoder( null ),
                      executionHandler,
                      new TesterChannelHandler() );
          }
      });

      // Enable broadcast
      b.setOption( "broadcast", "false" );
      b.setOption( "receiveBufferSizePredictorFactory",
                   new FixedReceiveBufferSizePredictorFactory(1024) );

      // Bind to the port and start the service.
      b.bind( new InetSocketAddress(50000) );

    }


    public void messageReceived(final ChannelHandlerContext ctx, final MessageEvent e)
    {            
        Channel lChannel = e.getChannel();
        SocketAddress lRemoteAddress = e.getRemoteAddress();

        byte[] replyFrame = {0,9,8,7,6,5,4,3,2,1};

        ChannelBuffer lReceivedBuffer = (ChannelBuffer) e.getMessage();
        ChannelBuffer lReplyBuffer = ChannelBuffers.wrappedBuffer( replyFrame );

        if ( lChannel != null && lChannel.isWritable()  && lRemoteAddress != null )
        {
            // OK, I FOUND THE ERROR!! I DO NOT NEED TO CONNECT THE CHANNEL! I MUST USE THE REMOTE SOCKET ADDRESS ATTACHED TO THE EVENT AND PASS IT TO THE WRITE METHOD BELOW.
            if ( !e.getChannel().isConnected() ) e.getChannel().connect( e.getRemoteAddress() );

            // BELOW I SHOULD PASS THE REMOTE SOCKET ADDRESS ATTACHED TO THE EVENT AS THE SECOND ARGUMENT OF THE WRITE METHOD.
            e.getChannel().write( lReplyBuffer );
        }
    }
}
4

1 に答える 1