1

アプリケーションの 1 つに NettyDecoder を実装しました

アプリケーションのプロトコルは単純で、最初の 4 文字がメッセージの長さになり、次にメッセージになります。

フレームデコーダのロジックは

import java.nio.ByteBuffer;
import org.apache.commons.io.IOUtils;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import sun.nio.cs.StandardCharsets;

public class ITMDecoder extends FrameDecoder {


public static String bytesToStringUTFCustom(byte[] bytes) {
     char[] buffer = new char[bytes.length >> 1];
     for(int i = 0; i < buffer.length; i++) {
      int bpos = i << 1;
      char c = (char)(((bytes[bpos]&0x00FF)<<8) + (bytes[bpos+1]&0x00FF));
      buffer[i] = c;
     }
     return new String(buffer);
    }

protected Object decode(ChannelHandlerContext ctx, Channel channel,
        ChannelBuffer buf) throws Exception {

     Logger logger = LoggerFactory.getLogger(ITMDecoder.class);

    // Make sure if the length field was received.
    if (buf.readableBytes() < 4) {
        // The length field was not received yet - return null.
        // This method will be invoked again when more packets are
        // received and appended to the buffer.
        return null;
    }


    // The length field is in the buffer.

    // Mark the current buffer position before reading the length field
    // because the whole frame might not be in the buffer yet.
    // We will reset the buffer position to the marked position if
    // there's not enough bytes in the buffer.
    buf.markReaderIndex();

    // Read the length field.

    byte[] twoBytesLength = new byte[4];

        for(int i = 0 ; i < 4 ; i++)
        twoBytesLength[i] = buf.getByte(i);

    String str = new String(twoBytesLength, "UTF-8");
    Short shortValue =      Short.parseShort(str);
    int length = shortValue.intValue() + 4;
    // Make sure if there's enough bytes in the buffer.
    if (buf.readableBytes() < length) {
        // The whole bytes were not received yet - return null.
        // This method will be invoked again when more packets are
        // received and appended to the buffer.

        // Reset to the marked position to read the length field again
        // next time.
        buf.resetReaderIndex();

        return null;
    }

    // There's enough bytes in the buffer. Read it.

    ChannelBuffer frame = buf.readBytes(length);

    // Successfully decoded a frame. Return the decoded frame.
    return frame;
}

}

チャネル パイプライン ロジックは次のとおりです。

     ServerBootstrap bootstrap = new ServerBootstrap(
             new NioServerSocketChannelFactory(
         Executors.newCachedThreadPool(),
         Executors.newCachedThreadPool()));


     bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
          public ChannelPipeline getPipeline() throws Exception {
           return Channels.pipeline(
            new ITMDecoder(),
            new M3AlertHandler()
           );
          };
         });

トランザクション量が 2 tps 程度の場合は問題なく動作します。ただし、トランザクションがより高い tps で送信されると、フレーム デコーダが破損します。

2つの可変長の1つの長いメッセージを使用してSocketワークベンチで同じことを確認しました

サーバーに送信するために使用したメッセージは Message 1 =00051234500041234

同じことを 1000 回繰り返し、1 秒で送信すると、5/6 メッセージ後にデコーダーが破損しますか?

実際に正しく動作させるために欠けているものはありますか?

4

1 に答える 1

3
for(int i = 0 ; i < 4 ; i++)
    twoBytesLength[i] = buf.getByte(i);

最初のバイトのインデックスが0. 最初のバイトのインデックスはbuf.readerIndex().

for (int i = 0; i < 4; i ++) {
    twoBytesLength[i] = buf.getByte(buf.readerIndex() + i);
}

バイトバッファアクセスをさらに最適化することもできますが、それは問題の範囲ではありません。

于 2013-11-05T07:39:38.440 に答える