0

Nettyフレームワークを使用してプロキシを作成していますが、最後に受信したメッセージが次のノードに渡される前に遅延する傾向があることに気づいています。

設計:

クライアント|<---------->| プロキシ|<------------>| サーバ

基本的に、問題はサーバーがメッセージを開始するときに発生します。クライアントに渡す前に遅延が発生するか、サーバーが最初のメッセージの直後に後続のメッセージを送信すると、最初のメッセージが通過し、2番目のメッセージが数秒間遅延します。なぜそうなのですか?欠落している構成パラメーターはありますか?

Startup.java

        Executor executor = Executors.newCachedThreadPool();
        ServerBootstrap sb = new ServerBootstrap(
                new NioServerSocketChannelFactory(executor, executor));

        // Set up the event pipeline factory.
        ClientSocketChannelFactory cf =
                new NioClientSocketChannelFactory(executor, executor);

        sb.setPipelineFactory(
                new ProxyPipelineFactory(cf, remoteHost, remotePort));

        sb.setOption("child.tcpNoDelay", true);
        sb.setOption("child.keepAlive", true);

        // Start up the server.
        sb.bind(new InetSocketAddress(localPort));

ProxyPipelineFactory.java

@Override
public ChannelPipeline getPipeline() throws Exception {
    ChannelPipeline p = pipeline(); 
    p.addLast("handler", new ClientHandler(cf, remoteHost, remotePort));
    return p;
}

ClientHandler.java

@Override
public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e)
        throws Exception {
    // Suspend incoming traffic until connected to the remote host.
    final Channel inboundChannel = e.getChannel();
    inboundChannel.setReadable(false);

    // Start the connection attempt.
    ClientBootstrap cb = new ClientBootstrap(cf);

        cb.setOption("child.tcpNoDelay", true);
        cb.setOption("child.keepAlive", true);
    ChannelPipeline p = cb.getPipeline();
    p.addLast("famer", new DelimiterBasedFrameDecoder(8192, false, new ChannelBuffer[]{ChannelBuffers.wrappedBuffer("</cmd>".getBytes())}));
p.addLast("handler", new ServerHandler(e.getChannel(), trafficLock));
    ChannelFuture f = cb.connect(new InetSocketAddress(remoteHost, remotePort));

    outboundChannel = f.getChannel();
    f.addListener(new ChannelFutureListener() {

        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (future.isSuccess()) {
                // Connection attempt succeeded:
                // Begin to accept incoming traffic.
                inboundChannel.setReadable(true);
            } else {
                // Close the connection if the connection attempt has failed.
                inboundChannel.close();
            }
        }
    });
}

@Override
public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e)
        throws Exception {

    BigEndianHeapChannelBuffer msg = (BigEndianHeapChannelBuffer) e.getMessage();

    if (log.isDebugEnabled()) {
        byte[] bytes = new byte[msg.capacity()];
        msg.readBytes(bytes);
        msg.setIndex(0, bytes.length);
        StringBuilder out = new StringBuilder("\nPROXY[ ").append(e.getChannel().getRemoteAddress()).append(" ---> Server ]");
        out.append("\nMESSAGE length=").append(bytes.length).append("\n").append(new String(bytes));
        log.debug(out.toString());
    }

    synchronized (trafficLock) {
        outboundChannel.write(msg);
        // If outboundChannel is saturated, do not read until notified in
        // OutboundHandler.channelInterestChanged().
        if (!outboundChannel.isWritable()) {
            e.getChannel().setReadable(false);
        }
    }
}

ServerHandler.java

@Override
public void messageReceived(ChannelHandlerContext ctx, final MessageEvent e)
        throws Exception {
    BigEndianHeapChannelBuffer msg = (BigEndianHeapChannelBuffer) e.getMessage();
    proxy(e.getChannel(), msg);
}
private void proxy(Channel connection, ChannelBuffer raw) {
    synchronized (trafficLock) {
        inboundChannel.write(raw);
        // If inboundChannel is saturated, do not read until notified in
        // ClientHandler.channelInterestChanged().
        if (!inboundChannel.isWritable()) {
            connection.setReadable(false);
        }
    }
}
4

1 に答える 1

0

接続の可読性を無効にしますが、さらにバイトを読み取るためにどこで再度オンにしますか?まったく同じことを行うnetty.ioのPrxoyサーバーの例を読むことができます。

private void proxy(Channel connection, ChannelBuffer raw) {
    synchronized (trafficLock) {
        inboundChannel.write(raw);
        // If inboundChannel is saturated, do not read until notified in
        // ClientHandler.channelInterestChanged().
        if (!inboundChannel.isWritable()) {
            connection.setReadable(false);
        }
    }
}
于 2012-07-27T08:58:38.850 に答える