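I'm writing a proxy using the Netty framework, and I've noticed that the last message received tends to be delayed before it is passed on to the next node.
Design:
Client |<---------->| Proxy |<------------>| Server
Basically, the problem occurs when the server initiates a message. Either there is a delay before the message is passed to the client, or, if the server sends a second message right after the first, the first message goes through and the second is delayed by a few seconds. Why is that? Is there a configuration parameter I'm missing?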
Startup.java
Executor executor = Executors.newCachedThreadPool();
ServerBootstrap sb = new ServerBootstrap(
        new NioServerSocketChannelFactory(executor, executor));

// Set up the event pipeline factory.
ClientSocketChannelFactory cf =
        new NioClientSocketChannelFactory(executor, executor);
sb.setPipelineFactory(
        new ProxyPipelineFactory(cf, remoteHost, remotePort));

// "child." options apply to the channels accepted by the server.
sb.setOption("child.tcpNoDelay", true);
sb.setOption("child.keepAlive", true);

// Start up the server.
sb.bind(new InetSocketAddress(localPort));
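
For reference, the two options set above correspond to ordinary TCP socket flags. The following is a minimal sketch in plain `java.net` (not Netty) of what they switch on; the class name `SocketOptionsSketch` and the method `demo()` are hypothetical names used only for illustration. As far as I know, in Netty 3 the `child.` prefix is meant for channels accepted by a `ServerBootstrap`, while a `ClientBootstrap` takes the unprefixed keys `tcpNoDelay` and `keepAlive`, so that is one thing worth checking on the outbound side.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.Socket;

public class SocketOptionsSketch {

    /**
     * Applies the two flags to a plain, unconnected socket and reads them
     * back. Returns {tcpNoDelay, keepAlive}.
     */
    public static boolean[] demo() {
        try (Socket socket = new Socket()) {
            socket.setTcpNoDelay(true); // disable Nagle's algorithm: small writes are not coalesced
            socket.setKeepAlive(true);  // enable TCP keep-alive probes on an idle connection
            return new boolean[] { socket.getTcpNoDelay(), socket.getKeepAlive() };
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    }

    public static void main(String[] args) {
        boolean[] flags = demo();
        System.out.println("tcpNoDelay=" + flags[0] + ", keepAlive=" + flags[1]);
    }
}
```

Reading the flags back like this is a cheap way to confirm that an option was actually applied rather than silently ignored.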
ProxyPipelineFactory.java
@Override
public ChannelPipeline getPipeline() throws Exception {
    ChannelPipeline p = pipeline();
    p.addLast("handler", new ClientHandler(cf, remoteHost, remotePort));
    return p;
}
ClientHandler.java
@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
        throws Exception {
    // Suspend incoming traffic until connected to the remote host.
    final Channel inboundChannel = e.getChannel();
    inboundChannel.setReadable(false);

    // Start the connection attempt.
    ClientBootstrap cb = new ClientBootstrap(cf);
    // ClientBootstrap options take no "child." prefix in Netty 3; that
    // prefix is for channels accepted by a ServerBootstrap.
    cb.setOption("tcpNoDelay", true);
    cb.setOption("keepAlive", true);
    ChannelPipeline p = cb.getPipeline();
    // Split server traffic into frames ending with "</cmd>" (delimiter kept).
    p.addLast("famer", new DelimiterBasedFrameDecoder(
            8192, false,
            new ChannelBuffer[] {ChannelBuffers.wrappedBuffer("</cmd>".getBytes())}));
    p.addLast("handler", new ServerHandler(e.getChannel(), trafficLock));
    ChannelFuture f = cb.connect(new InetSocketAddress(remoteHost, remotePort));
    outboundChannel = f.getChannel();
    f.addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                // Connection attempt succeeded:
                // Begin to accept incoming traffic.
                inboundChannel.setReadable(true);
            } else {
                // Close the connection if the connection attempt has failed.
                inboundChannel.close();
            }
        }
    });
}

@Override
public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e)
        throws Exception {
    // Cast to the interface; the concrete buffer class is not guaranteed.
    ChannelBuffer msg = (ChannelBuffer) e.getMessage();
    if (log.isDebugEnabled()) {
        // Copy only the readable bytes, without moving the reader index,
        // so the message is still forwarded intact.
        byte[] bytes = new byte[msg.readableBytes()];
        msg.getBytes(msg.readerIndex(), bytes);
        StringBuilder out = new StringBuilder("\nPROXY[ ")
                .append(e.getChannel().getRemoteAddress()).append(" ---> Server ]");
        out.append("\nMESSAGE length=").append(bytes.length)
                .append("\n").append(new String(bytes));
        log.debug(out.toString());
    }
    synchronized (trafficLock) {
        outboundChannel.write(msg);
        // If outboundChannel is saturated, do not read until notified in
        // OutboundHandler.channelInterestChanged().
        if (!outboundChannel.isWritable()) {
            e.getChannel().setReadable(false);
        }
    }
}
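
The outbound pipeline above frames server traffic on the `</cmd>` delimiter and keeps the delimiter in each frame. The sketch below is a rough plain-Java model of that kind of delimiter framing, not Netty's `DelimiterBasedFrameDecoder` itself; the class `DelimiterFramerSketch` and its methods are hypothetical. It illustrates one property that may matter for the delay described here: any bytes that arrive after the last delimiter stay buffered until the next delimiter shows up.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

public class DelimiterFramerSketch {
    private static final String DELIMITER = "</cmd>";

    // Bytes received so far that do not yet form a complete frame.
    private final StringBuilder pending = new StringBuilder();

    /** Feeds newly received bytes and returns every complete frame, delimiter included. */
    public List<String> feed(byte[] chunk) {
        // ISO-8859-1 maps each byte to exactly one char, so chunk boundaries are harmless.
        pending.append(new String(chunk, StandardCharsets.ISO_8859_1));
        List<String> frames = new ArrayList<>();
        int end;
        while ((end = pending.indexOf(DELIMITER)) >= 0) {
            int frameLength = end + DELIMITER.length();
            frames.add(pending.substring(0, frameLength));
            pending.delete(0, frameLength);
        }
        return frames;
    }

    /** Number of buffered characters still waiting for a delimiter. */
    public int pendingLength() {
        return pending.length();
    }

    public static void main(String[] args) {
        DelimiterFramerSketch framer = new DelimiterFramerSketch();
        // The first message is complete; the second is cut off mid-frame.
        byte[] first = "<cmd>one</cmd><cmd>tw".getBytes(StandardCharsets.ISO_8859_1);
        System.out.println(framer.feed(first) + " pending=" + framer.pendingLength());
        // Only when the rest arrives is the second frame emitted.
        byte[] rest = "o</cmd>".getBytes(StandardCharsets.ISO_8859_1);
        System.out.println(framer.feed(rest) + " pending=" + framer.pendingLength());
    }
}
```

If the server's messages do not always end in exactly `</cmd>` (for example, if trailing whitespace follows), the tail of one message would only be forwarded together with the next one under this model.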
ServerHandler.java
@Override
public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e)
        throws Exception {
    // Cast to the interface; the frame decoder may hand over any ChannelBuffer type.
    ChannelBuffer msg = (ChannelBuffer) e.getMessage();
    proxy(e.getChannel(), msg);
}

private void proxy(Channel connection, ChannelBuffer raw) {
    synchronized (trafficLock) {
        inboundChannel.write(raw);
        // If inboundChannel is saturated, do not read until notified in
        // ClientHandler.channelInterestChanged().
        if (!inboundChannel.isWritable()) {
            connection.setReadable(false);
        }
    }
}
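
Both handlers use the same flow-control pattern: write to the peer channel, and stop reading from the source when the peer is no longer writable. The comments refer to `channelInterestChanged()` as the place where reading is resumed, but that side is not shown in the code posted here. The sketch below models both halves in plain Java under simplifying assumptions; `BackpressureSketch`, `Outbound`, `Inbound`, `relay` and `onFlushed` are hypothetical names, and the byte counter is only a stand-in for a real channel's write buffer.

```java
public class BackpressureSketch {

    /** Hypothetical stand-in for the write side of a channel. */
    public static final class Outbound {
        private final int highWaterMark;
        private int pendingBytes;

        public Outbound(int highWaterMark) {
            this.highWaterMark = highWaterMark;
        }

        void write(int bytes) {
            pendingBytes += bytes;
        }

        void flushed(int bytes) {
            pendingBytes = Math.max(0, pendingBytes - bytes);
        }

        public boolean isWritable() {
            return pendingBytes < highWaterMark;
        }
    }

    /** Hypothetical stand-in for the read side of a channel. */
    public static final class Inbound {
        private boolean readable = true;

        void setReadable(boolean readable) {
            this.readable = readable;
        }

        public boolean isReadable() {
            return readable;
        }
    }

    private final Object trafficLock = new Object();
    private final Inbound source;
    private final Outbound target;

    public BackpressureSketch(Inbound source, Outbound target) {
        this.source = source;
        this.target = target;
    }

    /** Forward data; pause reading from the source if the target is saturated. */
    public void relay(int bytes) {
        synchronized (trafficLock) {
            target.write(bytes);
            if (!target.isWritable()) {
                source.setReadable(false);
            }
        }
    }

    /** Counterpart of the resume step: re-enable reading once the target has drained. */
    public void onFlushed(int bytes) {
        synchronized (trafficLock) {
            target.flushed(bytes);
            if (target.isWritable()) {
                source.setReadable(true);
            }
        }
    }

    public static void main(String[] args) {
        Inbound in = new Inbound();
        BackpressureSketch proxy = new BackpressureSketch(in, new Outbound(100));
        proxy.relay(60);
        proxy.relay(60);
        System.out.println("readable after saturation: " + in.isReadable());
        proxy.onFlushed(60);
        System.out.println("readable after drain: " + in.isReadable());
    }
}
```

The point of the model is that the pause and the resume have to come as a pair: if reading is switched off and nothing switches it back on when the peer drains, messages sit unread on the source side.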