My pipeline factory:

private final ExecutionHandler exeHandler = new ExecutionHandler(
    new MemoryAwareThreadPoolExecutor(1, 0, 0, 20, TimeUnit.SECONDS, Executors.defaultThreadFactory()));

public ChannelPipeline getPipeline() throws Exception {
  ChannelPipeline pipeline = Channels.pipeline();
  pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
  pipeline.addLast("FixedLengthStringDecoder", new FixedLengthStringDecoder(4, CharsetUtil.UTF_8));
  pipeline.addLast("decode", new StringDecoder(CharsetUtil.UTF_8));
  pipeline.addLast("execution", exeHandler);
  pipeline.addLast("process", this.getHandler());
  return pipeline;
}

My handler:

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
    // Write the constructed message back to the client.
    e.getChannel().write(constructMessage(e.getMessage().toString()));
}

The problem is in NioWorker:

  private boolean read(SelectionKey k) {
    ...
    if (readBytes > 0) {
        bb.flip();

        final ChannelBufferFactory bufferFactory =
            channel.getConfig().getBufferFactory();
        final ChannelBuffer buffer = bufferFactory.getBuffer(readBytes);
        buffer.setBytes(0, bb);
        buffer.writerIndex(readBytes);

        recvBufferPool.release(bb);

        // Update the predictor.
        predictor.previousReceiveBufferSize(readBytes);

        // Fire the event.
        fireMessageReceived(channel, buffer);
    } else {
        recvBufferPool.release(bb);
    }

    if (ret < 0 || failure) {
        k.cancel(); 
        close(channel, succeededFuture(channel));  // the channel is closed here
        return false;
    }

fireMessageReceived(channel, buffer) ends up running my handler on a new thread, but the main NioWorker thread closes the channel, so when I write a message to the client in messageReceived() I get:

java.nio.channels.ClosedChannelException
at org.jboss.netty.channel.socket.nio.NioWorker.cleanUpWriteBuffer(NioWorker.java:645)
at org.jboss.netty.channel.socket.nio.NioWorker.writeFromUserCode(NioWorker.java:372)
at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.handleAcceptedSocket(NioServerSocketPipelineSink.java:137)
at org.jboss.netty.channel.socket.nio.NioServerSocketPipelineSink.eventSunk(NioServerSocketPipelineSink.java:76)
at org.jboss.netty.channel.Channels.write(Channels.java:632)
at org.jboss.netty.handler.codec.oneone.OneToOneEncoder.handleDownstream(OneToOneEncoder.java:70)
at org.jboss.netty.handler.execution.ExecutionHandler.handleDownstream(ExecutionHandler.java:167)
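
To make the ordering concrete, here is a minimal stand-alone sketch of what I think is happening. It uses plain java.nio rather than Netty, so everything in it is an assumption for illustration: the class name CloseBeforeWriteDemo is made up, a Pipe sink stands in for the client channel, a single-thread executor stands in for the ExecutionHandler pool, and a latch forces the case where the close wins the race.

```java
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.Pipe;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class CloseBeforeWriteDemo {

    // Returns "written" or "ClosedChannelException", depending on how the write ended.
    static String run() {
        // Stand-in for the thread pool behind the ExecutionHandler.
        ExecutorService workers = Executors.newSingleThreadExecutor();
        try {
            Pipe pipe = Pipe.open();
            Pipe.SinkChannel channel = pipe.sink(); // stand-in for the client channel
            CountDownLatch closed = new CountDownLatch(1);

            // Plays the role of messageReceived() running on a worker thread.
            Future<String> result = workers.submit(() -> {
                closed.await(); // force the unlucky ordering: the close happens first
                try {
                    channel.write(ByteBuffer.wrap("reply".getBytes(StandardCharsets.UTF_8)));
                    return "written";
                } catch (ClosedChannelException ex) {
                    return "ClosedChannelException";
                }
            });

            // Plays the role of the I/O thread: the event has already been
            // handed off, and now the channel gets closed.
            channel.close();
            pipe.source().close();
            closed.countDown();

            return result.get();
        } catch (Exception ex) {
            throw new IllegalStateException(ex);
        } finally {
            workers.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "ClosedChannelException"
    }
}
```

Without the latch the outcome would depend on thread scheduling, which matches the race I suspect between the NioWorker thread and my handler.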

Am I doing something wrong?

1 Answer

Try changing your code so that it works like this:

@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) {
    // Write the constructed message back to the client.
    Channel channel = e.getChannel();
    ctx.sendDownstream(new DownstreamMessageEvent(
            channel, 
            Channels.future(channel), 
            constructMessage(e.getMessage().toString()), 
            channel.getRemoteAddress())
    );    
}
answered 2012-06-08T15:52:00.353