アプリケーションの 1 つに NettyDecoder を実装しました
アプリケーションのプロトコルは単純で、最初の 4 文字がメッセージの長さになり、次にメッセージになります。
フレームデコーダのロジックは
import java.nio.ByteBuffer;
import org.apache.commons.io.IOUtils;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import sun.nio.cs.StandardCharsets;
public class ITMDecoder extends FrameDecoder {
public static String bytesToStringUTFCustom(byte[] bytes) {
char[] buffer = new char[bytes.length >> 1];
for(int i = 0; i < buffer.length; i++) {
int bpos = i << 1;
char c = (char)(((bytes[bpos]&0x00FF)<<8) + (bytes[bpos+1]&0x00FF));
buffer[i] = c;
}
return new String(buffer);
}
protected Object decode(ChannelHandlerContext ctx, Channel channel,
ChannelBuffer buf) throws Exception {
Logger logger = LoggerFactory.getLogger(ITMDecoder.class);
// Make sure if the length field was received.
if (buf.readableBytes() < 4) {
// The length field was not received yet - return null.
// This method will be invoked again when more packets are
// received and appended to the buffer.
return null;
}
// The length field is in the buffer.
// Mark the current buffer position before reading the length field
// because the whole frame might not be in the buffer yet.
// We will reset the buffer position to the marked position if
// there's not enough bytes in the buffer.
buf.markReaderIndex();
// Read the length field.
byte[] twoBytesLength = new byte[4];
for(int i = 0 ; i < 4 ; i++)
twoBytesLength[i] = buf.getByte(i);
String str = new String(twoBytesLength, "UTF-8");
Short shortValue = Short.parseShort(str);
int length = shortValue.intValue() + 4;
// Make sure if there's enough bytes in the buffer.
if (buf.readableBytes() < length) {
// The whole bytes were not received yet - return null.
// This method will be invoked again when more packets are
// received and appended to the buffer.
// Reset to the marked position to read the length field again
// next time.
buf.resetReaderIndex();
return null;
}
// There's enough bytes in the buffer. Read it.
ChannelBuffer frame = buf.readBytes(length);
// Successfully decoded a frame. Return the decoded frame.
return frame;
}
}
チャネル パイプライン ロジックは次のとおりです。
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(
new ITMDecoder(),
new M3AlertHandler()
);
};
});
トランザクション量が 2 tps 程度の場合は問題なく動作します。ただし、トランザクションがより高い tps で送信されると、フレーム デコーダが破損します。
2つの可変長の1つの長いメッセージを使用してSocketワークベンチで同じことを確認しました
サーバーに送信するために使用したメッセージは Message 1 =00051234500041234
同じことを 1000 回繰り返し、1 秒で送信すると、5/6 メッセージ後にデコーダーが破損しますか?
実際に正しく動作させるために欠けているものはありますか?