Netty网络编程四:Netty粘包拆包解码器之ByteToMessageDecoder

时间:2024-04-08 18:34:45

一:ByteToMessageDecoder的作用

ByteToMessageDecoder在Netty中主要是用来解决半包积累的问题,是一种解码器,LineBasedFrameDecoder ,DelimiterBasedFrameDecoder,FixedLengthFrameDecoder都是其的一种具体的实现。因为要想netty解决半包拆包问题,需要从认识ByteToMessageDecoder源码开始。

二:具体实现

首先带着一种思路,当从TCP无边界的字节流数据中找出相应的完整的数据,一般的思路是:
首先,不停的读取TCP中的获取数据
1.如果当前读取的数据不足以拼接成一个完整的业务数据包,那就保留该数据,继续从tcp缓冲区中读取,直到得到一个完整的数据包
2.如果当前读到的数据加上已经读取的数据足够拼接成一个数据包,那就将已经读取的数据拼接上本次读取的数据,够成一个完整的业务数据包传递到业务逻辑,多余的数据仍然保留,以便和下次读到的数据尝试拼接。

带着这种想法,我们来看一下ByteToMessageDecoder的源码
首先,ByteToMessageDecoder是继承Handler的,所以也继承了handler的非私有属性
Netty网络编程四:Netty粘包拆包解码器之ByteToMessageDecoder
将 ByteToMessageDecoder字节流消息解码器分为三个部分讲解。
先看下部分成员变量的含义:
Netty网络编程四:Netty粘包拆包解码器之ByteToMessageDecoder
1)字节流消息的累加器
先看ByteToMessageDecoder内部定义的一个接口
Netty网络编程四:Netty粘包拆包解码器之ByteToMessageDecoder
主要作用是定义字节流缓冲累加规则。
ByteToMessageDecode定义了两种类型的数据缓冲累加机制:
这是一种内存复制的累加缓冲方式:
Netty网络编程四:Netty粘包拆包解码器之ByteToMessageDecoder
这是非内存复制方式实现缓冲累加,使用API 中CompositeByteBuf进行累加
Netty网络编程四:Netty粘包拆包解码器之ByteToMessageDecoder
通过:
Netty网络编程四:Netty粘包拆包解码器之ByteToMessageDecoder
可以看出默认使用第一种累加机制。
详细看下内存复制的累加方式:
Netty网络编程四:Netty粘包拆包解码器之ByteToMessageDecoder
内存复制方式扩充缓冲
Netty网络编程四:Netty粘包拆包解码器之ByteToMessageDecoder
即通过内存复制的方式,把待读取的缓冲累加到累加器中,并释放待读取缓冲的内存

2)字节流消息解码后存储到CodecOutputList
CodecOutputList是一个list集合,
Netty网络编程四:Netty粘包拆包解码器之ByteToMessageDecoder
ByteToMessageDecoder会使用它作为容器来存放,解码后的对象。
具体CodecOutputList实例化步骤
Netty网络编程四:Netty粘包拆包解码器之ByteToMessageDecoder
创建一个线程变量ThreadLocal里面存放CodecOutputLists数组,CodecOutputLists中的每个值存放的是CodecOutputList,所以可以理解为一个二维数据
Netty网络编程四:Netty粘包拆包解码器之ByteToMessageDecoder
分配空数组并返回一个数组集合:
Netty网络编程四:Netty粘包拆包解码器之ByteToMessageDecoder
CodecOutputList 可以理解为一个对象吃,每次channelRead()方法的时候,都会从对象池中拿出一个对象数组,用来存放从cumulation缓冲累积容器中解码出来的对象, 并把CodecOutputList对象数组的数据,传输到下一个handler处理器处理。

3)channelRead() 方法读取处理IO消息
channelRead() 方法是读取和处理IO消息的

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    if (msg instanceof ByteBuf) {
        // 1.从对象池中取出一个对象数组,用来存放cumulator缓冲积累器中解码后的对象
        CodecOutputList out = CodecOutputList.newInstance();
        try {
            //2.先积累数据
            ByteBuf data = (ByteBuf) msg;
            first = cumulation == null;
            if (first) {
                cumulation = data;
            } else {
                cumulation = cumulator.cumulate(ctx.alloc(), cumulation, data);
            }
            // 3.解码积累器中的缓冲数据,并把结果添加到out对象数组中
            callDecode(ctx, cumulation, out);
        } catch (DecoderException e) {
            throw e;
        } catch (Exception e) {
            throw new DecoderException(e);
        } finally {
            //4.释放积累器中的内存
            if (cumulation != null && !cumulation.isReadable()) {
                numReads = 0;
                cumulation.release();
                cumulation = null;
            } else if (++ numReads >= discardAfterReads) {
                // We did enough reads already try to discard some bytes so we not risk to see a OOME.
                // See https://github.com/netty/netty/issues/4275
                numReads = 0;
                discardSomeReadBytes();
            }
            //5.将out对象池中的对象,即解码成功的对象,传输到其他channel继续后续业务流程
            int size = out.size();
            decodeWasNull = !out.insertSinceRecycled();
            fireChannelRead(ctx, out, size);
            out.recycle();
        }
    } else {
        // 发送到下一个handler处理
        ctx.fireChannelRead(msg);
    }
}

解码缓冲积累器中的数据到out中

protected void callDecode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) {
    try {
        // 1、只要积累器in中有数据,就读取数据
        while (in.isReadable()) {
            int outSize = out.size();

            //2、如果 对象池中有已经解码的对象数据,则先把解码后的对象发出去
            if (outSize > 0) {
                fireChannelRead(ctx, out, outSize);
                out.clear();

                // Check if this handler was removed before continuing with decoding.
                // If it was removed, it is not safe to continue to operate on the buffer.
                //
                // See:
                // - https://github.com/netty/netty/issues/4635
                if (ctx.isRemoved()) {
                    break;
                }
                outSize = 0;
            }

            int oldInputLength = in.readableBytes();
            //3、解码数据in 到out对象数组中
            decodeRemovalReentryProtection(ctx, in, out);

            // Check if this handler was removed before continuing the loop.
            // If it was removed, it is not safe to continue to operate on the buffer.
            //
            // See https://github.com/netty/netty/issues/1664
            if (ctx.isRemoved()) {
                break;
            }

            if (outSize == out.size()) {
                if (oldInputLength == in.readableBytes()) {
                    break;
                } else {
                    continue;
                }
            }

            if (oldInputLength == in.readableBytes()) {
                throw new DecoderException(
                        StringUtil.simpleClassName(getClass()) +
                                ".decode() did not read anything but decoded a message.");
            }

            if (isSingleDecode()) {
                break;
            }
        }
    } catch (DecoderException e) {
        throw e;
    } catch (Exception cause) {
        throw new DecoderException(cause);
    }
}

解码缓冲积累器中的缓冲数据:

final void decodeRemovalReentryProtection(ChannelHandlerContext ctx, ByteBuf in, List<Object> out)
        throws Exception {
    decodeState = STATE_CALLING_CHILD_DECODE;
    try {
        // 此为抽象方法,为子类自定义实现,由此可以看出,ByteToMessageDecoder采用的是模板方法的设计模式
        decode(ctx, in, out);
    } finally {
        boolean removePending = decodeState == STATE_HANDLER_REMOVED_PENDING;
        decodeState = STATE_INIT;
        if (removePending) {
            handlerRemoved(ctx);
        }
    }
}

解码方法

protected abstract void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception;

由此可以看出,采用模板方法的设计模式,有自定义的子类来实现,根据不同情况来对 缓冲积累器中的数据进行解码,最后添加到out数组中

总结:
ByteToMessageDecoder这个类的主要作用的对TCP传输字节流进行解码,以编码TCP传输的粘包/拆包问题
具体步骤是:
1、通过channelRead()方法为入口(进行TCP消息处理的入口),将TCP中获得的消息都存放到cumulation缓冲累积器中
2、通过将缓冲累积器中的数据,进行解码处理,并将解码后的数据存放到CodecOutputList对象数组中
3、CodecOutputList对象数组中中的数据发出去,发送到下个handler进行进一步业务处理
4、解码过程由用户根据不同的编码需求自定义实现decode(ChannelHandlerContext ctx, ByteBuf in, List out)方法进行解码

理解此ByteToMessageDecoder,再来看其实现LineBasedFrameDecoder ,DelimiterBasedFrameDecoder,FixedLengthFrameDecoder发现只要实现decode()方法即可,并按照指定的分割条件进行字节流数据分割,然后将处理好的数据加入到out的集合中即可。