Netty を UDP で動作させるのに問題があります。最大の問題は、サーバーに接続して、サーバーとクライアント間の対話を完了すると、サーバーが役に立たなくなることです。同じクライアントまたは他の(異なるホスト)から他の接続を確立できません。私が見逃しているのは、本当にシンプルで簡単なものだと思います。次のコードを使用して、サーバーに接続する新しいホストごとに新しいパイプラインを作成するようにサーバーを構成しました (と思いますか?)。
public class DistinctChannelPipelineFactory implements ChannelPipelineFactory {
private final ChannelPipelineFactory pipelineFactory;
public DistinctChannelPipelineFactory(ChannelPipelineFactory pipelineFactory) {
this.pipelineFactory = pipelineFactory;
}
@Override public ChannelPipeline getPipeline() throws Exception {
return Channels.pipeline(new DistinctChannelPipelineHandler(pipelineFactory));
}
}
DistinctChannelPipelineHandler でこれを見て、リモート ホストごとに異なるパイプラインを作成し、10 秒後にそれらをタイムアウトさせようとしました。
private final LoadingCache<SocketAddress, ChannelPipeline> pipelines;
public DistinctChannelPipelineHandler(ChannelPipelineFactory factory) {
this.pipelines = CacheBuilder.newBuilder()
.concurrencyLevel(1)
.expireAfterAccess(10, SECONDS)
.removalListener(new PipelineRemovalListener())
.build(new PipelineCacheLoader(factory));
}
public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (e instanceof MessageEvent) {
final ChannelPipeline pipeline = pipelines.get(((MessageEvent) e).getRemoteAddress());
if (!pipeline.isAttached()) {
pipeline.attach(ctx.getChannel(), ctx.getPipeline().getSink());
pipeline.sendUpstream(new UpstreamChannelStateEvent(ctx.getChannel(), OPEN, TRUE));
}
pipeline.sendUpstream(e);
}
if (e instanceof ChannelStateEvent) {
for (final ChannelPipeline pipeline : pipelines.asMap().values()) {
final ChannelStateEvent cse = (ChannelStateEvent) e;
pipeline.sendUpstream(new UpstreamChannelStateEvent(ctx.getChannel(), cse.getState(), cse.getValue()));
}
}
}
public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (e instanceof MessageEvent) {
final ChannelPipeline pipeline = pipelines.get(((MessageEvent) e).getRemoteAddress());
if (!pipeline.isAttached()) {
pipeline.attach(ctx.getChannel(), ctx.getPipeline().getSink());
}
pipeline.sendDownstream(e);
} else {
ctx.sendDownstream(e);
}
}
private static final class PipelineCacheLoader extends CacheLoader<SocketAddress, ChannelPipeline> {
private final ChannelPipelineFactory factory;
public PipelineCacheLoader(ChannelPipelineFactory factory) {
this.factory = factory;
}
@Override
public ChannelPipeline load(SocketAddress key) throws Exception {
return factory.getPipeline();
}
}
private static final class PipelineRemovalListener implements RemovalListener<SocketAddress, ChannelPipeline> {
private static final Logger logger = LoggerFactory.getLogger(PipelineRemovalListener.class);
@Override
public void onRemoval(RemovalNotification<SocketAddress, ChannelPipeline> n) {
logger.info("UDP connection timed out, removing connection for {}", n.getKey());
n.getValue().sendUpstream(new UpstreamChannelStateEvent(n.getValue().getChannel(), OPEN, FALSE));
}
}
これは私がサーバーを初期化する方法です:
@Provides
public ConnectionlessBootstrap getConnectionlessBootstrap(DatagramChannelFactory channelFactory,
@LocalAddress SocketAddress localAddress,
final UdpPipelineFactory pipelineFactory) {
final ConnectionlessBootstrap bootstrap = new ConnectionlessBootstrap(channelFactory);
bootstrap.setOption("localAddress", localAddress);
bootstrap.setPipelineFactory(new DistinctChannelPipelineFactory(pipelineFactory));
return bootstrap;
}
@Provides
@Singleton
public DatagramChannelFactory getDatagramChannelFatory(@WorkerExecutor Executor worker) {
final DatagramChannelFactory channelFactory = new NioDatagramChannelFactory(worker);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override public void run() {
channelFactory.releaseExternalResources();
}
});
return channelFactory;
}
問題がどこにあるとは思わなかったので、実際にすべてのハンドラーを追加する場所を省略しました。ここで何か基本的なことが欠けていますか? タイムアウトする一意のリモート アドレスごとのパイプラインが必要です。サーバーを起動して、文字通りクライアント/サーバーのやり取りだけで機能させるのは非常にイライラします! 追加のリクエストでヒットすると、新しいパイプラインが作成されないことをデバッグで確認しました。そのため、元のパイプラインが非常に古い状態のままであるように思われるため、他のリクエストを受け付けません。考え?提案?