【Netty】decoder相关(三):分隔符解码器DelimiterBasedFrameDecoder

【Netty】decoder相关(三):分隔符解码器DelimiterBasedFrameDecoder

Scroll Down

文章目录

前言

本节就来看看Netty提供给用户开箱即用的解码器:DelimiterBasedFrameDecoder。

在看之前尽量保证有一定的ByteBuf基础,如必须知道readerIndex和,writerIndex等,可以参考我前面的博客记录【ByteBuf简介】

如果对解码器的大致流程还存在疑问,建议回去看看【ByteToMessageDecoder】,本文不会再提及整体流程,核心关注解码器的decode方法。

Netty Version:4.1.6


实验代码

同样拿Netty提供的单元测试:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import org.junit.Test;

import java.nio.charset.Charset;

import static io.netty.util.ReferenceCountUtil.releaseLater;
import static org.junit.Assert.*;

public class DelimiterBasedFrameDecoderTest {

    @Test
    public void testMultipleLinesStrippedDelimiters() {
        EmbeddedChannel ch = new EmbeddedChannel(new DelimiterBasedFrameDecoder(8192, true,
                delimiter()));
        ch.writeInbound(Unpooled.copiedBuffer("firstgasecondga", Charset.defaultCharset()));

        System.out.println(releaseLater((ByteBuf) ch.readInbound()).toString(Charset.defaultCharset()));
        System.out.println(releaseLater((ByteBuf) ch.readInbound()).toString(Charset.defaultCharset()));
        System.out.println(releaseLater((ByteBuf) ch.readInbound()).toString(Charset.defaultCharset()));
        ch.finish();
    }

    // 自定义的分隔符
    private static ByteBuf[] delimiter() {
        return new ByteBuf[] {
                Unpooled.wrappedBuffer(new byte[] { 'a' }),
                Unpooled.wrappedBuffer(new byte[] { 'g' }),
        };
    }
}

输出结果:

first

second
  • 上面first和second确实是隔开的,也就是第二次readInbound确实就是没有字符串,并不是我markdown写错了。
  • 为什么第二次readInbound读到空白呢?看完本文的分析你明白了。
  • 如果实验代码的"firstgasecondga"换成"firstgeeasecondga",那输出结果中间那段空白就会变成"ee"

跟进源码

DelimiterBasedFrameDecoder继承关系

先来看看DelimiterBasedFrameDecoder的继承关系图:

DelimiterBasedFrameDecoder继承关系.png


从上面的类图,再结合之前pipeline的学习经历,不难发现,我们其实可以把DelimiterBasedFrameDecoder当成ChannelHandler,也就是说解码其实是事件传播、处理中的其中一环。用代码表示就是类似下面这个样子:

    // 略
    .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) {
            ch.pipeline().addLast(new Base64Decoder());
            ch.pipeline().addLast(new DelimiterBasedFrameDecoder());
            ch.pipeline().addLast(new FixedLengthFrameDecoder(3));
            ch.pipeline().addLast(new LineBasedFrameDecoder(10, false, false));

       }
  • (上面这段话在上一节已经说过了,直接拿过来用是为了给自己强调一下)

DelimiterBasedFrameDecoder的属性

我依旧是给源码加上注释:

public class DelimiterBasedFrameDecoder extends ByteToMessageDecoder {

    /** 分隔符,可多个 */
    private final ByteBuf[] delimiters;
    /** 数据包最大能有多长 */
    private final int maxFrameLength;
    /** 数据包是否需要保留分隔符 */
    private final boolean stripDelimiter;
    /** 超过maxFrameLength后是否抛出异常 */
    private final boolean failFast;
    /** 是否属于丢弃模式 */
    private boolean discardingTooLongFrame;
    /** 丢弃的字节数 */
    private int tooLongFrameLength;
    /** Set only when decoding with "\n" and "\r\n" as the delimiter.  */
    /** 之前就讲过这个了,行解码器 ,意思是如果分隔符仅为为\n 和 \r\n那就直接丢给行解码器(代码复用)*/
    private final LineBasedFrameDecoder lineBasedDecoder;
    ...(非属性略)

构造方法分析

你可能会疑惑:为什么在分隔符decoder里面会有上一节讲到的LineBasedFrameDecoder?什么情况下这个LineBasedFrameDecoder!=null呢?这些问题的答案其实都在构造方法中,下面就来看一下:
io.netty.handler.codec.DelimiterBasedFrameDecoder#DelimiterBasedFrameDecoder(int, boolean, boolean, io.netty.buffer.ByteBuf...)

    public DelimiterBasedFrameDecoder(
            int maxFrameLength, boolean stripDelimiter, boolean failFast, ByteBuf... delimiters) {
        validateMaxFrameLength(maxFrameLength);
        if (delimiters == null) {
            throw new NullPointerException("delimiters");
        }
        if (delimiters.length == 0) {
            throw new IllegalArgumentException("empty delimiters");
        }

        // 如果是仅以换行符分割
        if (isLineBased(delimiters) && !isSubclass()) {
            // 创建一个换行符解码器对象
            lineBasedDecoder = new LineBasedFrameDecoder(maxFrameLength, stripDelimiter, failFast);
            this.delimiters = null;
        } else {
            this.delimiters = new ByteBuf[delimiters.length];
            for (int i = 0; i < delimiters.length; i ++) {
                ByteBuf d = delimiters[i];
                validateDelimiter(d);
                this.delimiters[i] = d.slice(d.readerIndex(), d.readableBytes());
            }
            // 如果分隔符不仅仅是换行符或不包括换行符,那么就置空。
            lineBasedDecoder = null;
        }
        this.maxFrameLength = maxFrameLength;
        this.stripDelimiter = stripDelimiter;
        this.failFast = failFast;
    }

跟进isLineBased方法,验证一下是不是想的那样:
io.netty.handler.codec.DelimiterBasedFrameDecoder#isLineBased

    // 判断是否仅以换行符作为分隔符
    private static boolean isLineBased(final ByteBuf[] delimiters) {
        // 如果分隔符的数量小于2,那就肯定不是以换行符为分隔符(换行符包括\r\n和\n),返回false
        if (delimiters.length != 2) {
            return false;
        }
        // 取到前两个分隔符
        ByteBuf a = delimiters[0];
        ByteBuf b = delimiters[1];
        if (a.capacity() < b.capacity()) {
            a = delimiters[1];
            b = delimiters[0];
        }

        // 判断是否是以\n && \r\n分割
        return a.capacity() == 2 && b.capacity() == 1
                && a.getByte(0) == '\r' && a.getByte(1) == '\n'
                && b.getByte(0) == '\n';
    }

说白就是:假设分隔符就单纯是两个换行符,那就可以直接丢给LineBasedFrameDecoder实例处理了,复用原则嘛~


decode实现

接下来就是核心的部分了,也就是对ByteToMessageDecoder累加器传过来的数据进行解码。

找到decode方法,直接看代码:
io.netty.handler.codec.DelimiterBasedFrameDecoder#decode(io.netty.channel.ChannelHandlerContext, io.netty.buffer.ByteBuf, java.util.List<java.lang.Object>)

    @Override
    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Object decoded = decode(ctx, in);
        if (decoded != null) {
            // 解码出的对象添加到out中,交给ByteToMessageDecoder传播
            out.add(decoded);
        }
    }

继续跟进decode方法,也是把代码做的事写到注释里面去了,下面还会画图分析:
io.netty.handler.codec.DelimiterBasedFrameDecoder#decode(io.netty.channel.ChannelHandlerContext, io.netty.buffer.ByteBuf)

    protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
        // 如果分隔符就是换行符(构造本解码器的时候就决定的)
        if (lineBasedDecoder != null) {
            // 直接丢给行处理器
            return lineBasedDecoder.decode(ctx, buffer);
        }
        // Try all delimiters and choose the delimiter which yields the shortest frame.
        // 用于记录最小分隔符的index
        int minFrameLength = Integer.MAX_VALUE;
        // 长度最小时的分割符
        ByteBuf minDelim = null;
        for (ByteBuf delim: delimiters) {
            int frameLength = indexOf(buffer, delim);
            // 计算哪个分隔符读出的数据长度最小
            if (frameLength >= 0 && frameLength < minFrameLength) {
                // 记录最小长度
                minFrameLength = frameLength;
                // 记录长度最小时的分割符
                minDelim = delim;
            }
        }

        // 以下开始解码逻辑

        if (minDelim != null) {
            // 返回分隔符占用的长度
            int minDelimLength = minDelim.capacity();
            ByteBuf frame;

            // 如果属于丢弃模式
            if (discardingTooLongFrame) {
                // We've just finished discarding a very large frame.
                // Go back to the initial state.
                discardingTooLongFrame = false;
                buffer.skipBytes(minFrameLength + minDelimLength);

                int tooLongFrameLength = this.tooLongFrameLength;
                this.tooLongFrameLength = 0;
                if (!failFast) {
                    fail(tooLongFrameLength);
                }
                return null;
            }

            // 读到的数据包的长度 > 当前允许最大的数据包
            if (minFrameLength > maxFrameLength) {
                // Discard read frame.
                // 读指针移动到分隔符之后
                buffer.skipBytes(minFrameLength + minDelimLength);
                // 抛出异常(不是异常传播)
                fail(minFrameLength);
                return null;
            }

            // 是否需要保留分隔符
            if (stripDelimiter) {
                // 不保留分隔符
                frame = buffer.readRetainedSlice(minFrameLength);
                buffer.skipBytes(minDelimLength);
            } else {
                // 保留分隔符
                frame = buffer.readRetainedSlice(minFrameLength + minDelimLength);
            }
            // 返回读取到的数据包
            return frame;
        }
        // 如果没读到分隔符
        else {
            // 非丢弃模式
            if (!discardingTooLongFrame) {
                // 数据包长度大于最大容量
                if (buffer.readableBytes() > maxFrameLength) {
                    // Discard the content of the buffer until a delimiter is found.
                    tooLongFrameLength = buffer.readableBytes();
                    buffer.skipBytes(buffer.readableBytes());
                    // 开启丢弃模式
                    discardingTooLongFrame = true;
                    // 是否抛出异常(不是异常传播)
                    if (failFast) {
                        fail(tooLongFrameLength);
                    }
                }
            }
            else {
                // 丢弃模式
                // Still discarding the buffer since a delimiter is not found.
                tooLongFrameLength += buffer.readableBytes();
                buffer.skipBytes(buffer.readableBytes());
            }
            return null;
        }
    }

在看整体流程之前,你可能会对上面的这个代码段感到一些困惑:

        // 长度最小时的分割符
        ByteBuf minDelim = null;
        for (ByteBuf delim: delimiters) {
            int frameLength = indexOf(buffer, delim);
            // 计算哪个分隔符读出的数据长度最小
            if (frameLength >= 0 && frameLength < minFrameLength) {
                // 记录最小长度
                minFrameLength = frameLength;
                // 记录长度最小时的分割符
                minDelim = delim;
            }
        }

下面画个图来解释下:
for循环流程.png

  • 说白了就是获取记录readerIndex距离最短的分隔符,以及readerIndex到此分隔符的距离。

结合实验代码,上面的decode流程大致如下:

大致流程.png

好了,decode方法解码实验代码测试数据的大致流程就如上图,相信看完之后上文的疑惑也都能解开了。关于丢弃模式等异常情况,有兴趣的可以用Netty提供的单元测试继续跟进,这里我就不再赘述了。