0

私はNetty3.3.1-Finalで3週間働いています。私のプロトコルには3つのステップがあり、各ステップには異なるFrameDecoderが必要です。

  • 引数を読む
  • 一部のデータを転送する
  • データパイプの相互クローズ

私は理解できない多くの「ブロッキング」問題を経験してきました。org.jboss.netty.example.portunificationの例を読んで、FrameDecoderを動的に変更しようとしたときにバッファの問題が発生したように見えます。あるFrameDecoderのバッファは、次のFrameDecoderに変更するときに(おそらく)空ではありませんでした。 ..

Nettyでそれを簡単に行う方法はありますか?プロトコルを変更する必要がありますか?1つの大きなFrameDecoderを作成し、状態を管理する必要がありますか?もしそうなら、共通のサブパート(例えば「引数の読み取り」)を持つ異なるプロトコル間のコード重複を回避する方法は?

今日、FrameDecoderをホットアドおよび削除する方法を目的として、FrameDecoderUnifier(以下のコード)のアイデアを思いつきましたどう思いますか?

ご協力いただきありがとうございます!

ルノー

-----------FrameDecoderUnifierクラス--------------

    /**
     * This FrameDecoder is able to forward the unused bytes from one decoder to the next one. It provides
     * a safe way to replace a FrameDecoder inside a Pipeline.
     * It is not safe to just add and remove FrameDecoder dynamically from a Pipeline because there is a risk
     * of unread bytes inside the buffer of the FrameDecoder you wan't to remove.
     */
    public class FrameDecoderUnifier extends FrameDecoder {

        private final Method frameDecoderDecodeMethod;
        volatile boolean skip = false;
        LastFrameEventHandler eventHandler;
        LinkedList<Entry> entries;
        Entry entry = null;

        public FrameDecoderUnifier(LastFrameEventHandler eventHandler) {
            this.eventHandler = eventHandler;
            this.entries = new LinkedList<Entry>();
            try {
                this.frameDecoderDecodeMethod = FrameDecoder.class.getMethod("decode", ChannelHandlerContext.class, Channel.class, ChannelBuffer.class);
            } catch (NoSuchMethodException ex) {
                throw new RuntimeException(ex);
            } catch (SecurityException ex) {
                throw new RuntimeException(ex);
            }
        }

        public void addLast(FrameDecoder decoder, LastFrameIdentifier identifier) {
            entries.addLast(new Entry(decoder, identifier));
        }

        private Object callDecode(FrameDecoder decoder, ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
            return frameDecoderDecodeMethod.invoke(decoder, ctx, channel, buffer);
        }

        @Override
        protected Object decode(ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
            if (entry == null && !entries.isEmpty()) {
                entry = entries.getFirst();
            }

            if (entry == null) {
                return buffer; //No framing, no decoding
            }

            //Perform the decode operation
            Object obj = callDecode(entry.getDecoder(), ctx, channel, buffer);

            if (obj != null && entry.getIdentifier().isLastFrame(obj)) {
                //Fire event
                eventHandler.lastObjectDecoded(entry.getDecoder(), obj);
                entry = null;
            }
            return obj;
        }

        /**
         * You can use this interface to take some action when the current decoder is changed for the next one.
         * This can be useful to change some upper Handler in the pipeline.
         */
        public interface LastFrameEventHandler {

            public void lastObjectDecoded(FrameDecoder decoder, Object obj);
        }

        public interface LastFrameIdentifier {

            /**
             * True if after this frame, we should disable this decoder.
             * @param obj
             * @return 
             */
            public abstract boolean isLastFrame(Object decodedObj);
        }

        private class Entry {

            FrameDecoder decoder;
            LastFrameIdentifier identifier;

            public Entry(FrameDecoder decoder, LastFrameIdentifier identifier) {
                this.decoder = decoder;
                this.identifier = identifier;
            }

            public FrameDecoder getDecoder() {
                return decoder;
            }

            public LastFrameIdentifier getIdentifier() {
                return identifier;
            }
        }
}
4

2 に答える 2

2

パイプラインからフレームデコーダーを削除しても、それが呼び出されるのを妨げないように見え、デコーダーがチェーンにないかのように動作するようにする明白な方法がないという点で、同様の問題がありました: Nettyは、decode() が少なくとも 1 バイトを読み取ることを主張しているため、着信 ChannelBuffer を単純に返すことはできませんが、null を返すと、次のパケットが到着するまで着信データの処理が停止し、プロトコルのデコード プロセスが停止します。

まず、 FrameDecoderの Netty 3.7 ドキュメントには、実際には「パイプラインでデコーダーを別のデコーダーに置き換える」というセクションがあります。それは言います:

ChannelPipeline#replace() を呼び出すだけでは、これを実現することはできません。

代わりに、デコードされた最初のパケットと受信した残りのデータをラップする配列を返すことによって、データを渡すことを提案しています。

return new Object[] { firstMessage, buf.readBytes(buf.readableBytes()) };

重要なことに、これより前に「展開」が有効になっている必要がありますが、この部分は見逃しやすく、説明されていません。私が見つけた最良の手がかりは、 Netty issue 132でした。これにより、FrameDecoders で「展開」フラグが発生したことが明らかです。true の場合、デコーダーは、下流のハンドラーに対して透過的な方法で、そのような配列をオブジェクトにアンパックします。ソースコードを覗いてみると、これが「展開」の意味であることを確認しているようです。

第二に、例はデータを変更せずにパイプラインに渡す方法も示しているため、さらに簡単な方法があるようです。たとえば、私の同期パケット FrameDecoder は、ジョブを実行した後、内部フラグを設定し、パイプラインから自身を削除して、通常どおりデコードされたオブジェクトを返します。フラグが設定されている場合の後続の呼び出しは、次のようにデータを渡すだけです。

protected Object decode(ChannelHandlerContext ctx,
                        Channel channel, ChannelBuffer cbuf) throws Exception {

    // Close the door on more than one sync packet being decoded
    if (m_received) {
        // Pass on the data to the next handler in the pipeline.
        // Note we can't just return cbuf as-is, we must drain it
        // and return a new one.  Otherwise Netty will detect that
        // no bytes were read and throw an IllegalStateException.
        return cbuf.readBytes(cbuf.readableBytes());
    }

    // Handle the framing
    ChannelBuffer decoded = (ChannelBuffer) super.decode(ctx, channel, cbuf);
    if (decoded == null) {
        return null;
    }

    // Remove ourselves from the pipeline now
    ctx.getPipeline().remove(this);
    m_received = true;

    // Can we assume an array backed ChannelBuffer?
    // I have only hints that we can't, so let's copy the bytes out.
    byte[] sequence = new byte[magicSequence.length];
    decoded.readBytes(sequence);

    // We got the magic sequence?  Return the appropriate SyncMsg
    return new SyncMsg(Arrays.equals(sequence, magicSequence));
}

LengthFieldBasedFrameDecoder から派生したデコーダーは下流に残り、後続のすべてのデータ フレーミングを処理します。これまでのところ、私のために働いています。

于 2013-11-06T16:42:56.140 に答える
1

ある状態に基づいて内部デコーダーを切り替えたり、上位層ハンドラーを動的に追加/削除したりするフレームデコーダーは避けるべきだと思います。

  • コードの理解/デバッグが難しい
  • ハンドラーには明確に定義された責任がありません(そのため、ハンドラーを正しく削除/追加していますか?1つのハンドラーが1つ以上の(関連する)タイプのプロトコルメッセージを処理する必要があります。同じタイプのメッセージを処理するハンドラーは多くありません)
  • 理想的には、フレームデコーダーはプロトコルフレームのみを抽出し、状態に基づいてフレームをデコードしません(ここで、フレームデコーダーは、フレームをデコードし、デコードされたメッセージでMessageEventを起動するデコーダーの内部チェーンを持つことができ、上記のハンドラーはデコードされたメッセージに反応できます)。

更新:ここでは、各メッセージが一意のタグ/識別子を持つことができ、メッセージの終わりが明確にマークされているプロトコルを検討しました(たとえば、タグ長値フレーム形式)

于 2012-04-23T16:11:42.080 に答える