【Netty】decoder相关(二):行解码器LineBasedFrameDecoder

【Netty】decoder相关(二):行解码器LineBasedFrameDecoder

Scroll Down

文章目录

前言

本节就来看看Netty提供给用户开箱即用的解码器:LineBasedFrameDecoder,你也可以把它当成以换行符分割字节流的解码器。

在看之前尽量保证有一定的ByteBuf基础,如必须知道readerIndex和,writerIndex等,可以参考我前面的博客记录【ByteBuf简介】

如果对解码器的大致流程还存在疑问,建议回去看看【ByteToMessageDecoder】,本文不会再提及整体流程,核心关注解码器的decode方法。

Netty Version:4.1.6


实验代码

为了使跟进源码时更有“体感”,就拿Netty的一个单元测试改一改作为例子:
LineBasedFrameDecoderTest.java

import io.netty.buffer.ByteBuf;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import org.junit.Test;

import static io.netty.buffer.Unpooled.*;
import static io.netty.util.ReferenceCountUtil.releaseLater;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;

public class LineBasedFrameDecoderTest {
    @Test
    public void testDecodeWithStrip() throws Exception {
        EmbeddedChannel ch = new EmbeddedChannel(new LineBasedFrameDecoder(8192, true, false));

        ch.writeInbound(copiedBuffer("first\r\nsecond", CharsetUtil.US_ASCII));
        assertEquals("first", releaseLater((ByteBuf) ch.readInbound()).toString(CharsetUtil.US_ASCII));
        assertEquals("second", releaseLater((ByteBuf) ch.readInbound()).toString(CharsetUtil.US_ASCII));
        assertNull(ch.readInbound());
        ch.finish();

        ReferenceCountUtil.release(ch.readInbound());
    }
}
  • 当然,我更建议你自己把Netty中相关单元测试的代码都跑一遍,案例非常全,但受到篇幅限制,我在这不可能全都跟一遍。


跟进源码

LineBasedFrameDecoder继承关系

先来看看LineBasedFrameDecoder的继承关系图:

LineBasedFrameDecoder的继承关系图.png


从上面的类图,再结合之前pipeline的学习经历,不难发现,我们其实可以把LineBasedFrameDecoder当成ChannelHandler,也就是说解码其实是事件传播、处理中的其中一环。用代码表示就是类似下面这个样子:

    // 略
    .childHandler(new ChannelInitializer<SocketChannel>() {
        @Override
        public void initChannel(SocketChannel ch) {
            ch.pipeline().addLast(new Base64Decoder());
            ch.pipeline().addLast(new FixedLengthFrameDecoder(3));
            ch.pipeline().addLast(new LineBasedFrameDecoder(10, false, false));
       }
  • (上面这段话在上一节已经说过了,直接拿过来用是为了给自己强调一下)

LineBasedFrameDecoder的属性

我把笔记都写到源码里面去了,下面直接看代码注释:

    /** Maximum length of a frame we're willing to decode.  */
    /** 数据包最大长度 */
    private final int maxLength;
    /** Whether or not to throw an exception as soon as we exceed maxLength. */
    /** true:超过最大长度抛出异常 */
    private final boolean failFast;
    /** true:解析出的数据包不带换行符*/
    private final boolean stripDelimiter;

    /** True if we're discarding input because we're already over maxLength.  */
    /** 如果为true,则表示当前超过maxLength,处于丢弃输入的模式 */
    /** 概念上可以理解为类似于线程池拒绝模式一样的东西 */
    private boolean discarding;
    /** 丢弃的字节数 */
    private int discardedBytes;
  • 暂时不理解也没关系,下面用到自然就懂了。

decode实现

接下来就是核心的部分了,也就是对ByteToMessageDecoder累加器传过来的数据进行解码。

找到decode方法,直接看代码:
io.netty.handler.codec.LineBasedFrameDecoder#decode(io.netty.channel.ChannelHandlerContext, io.netty.buffer.ByteBuf, java.util.List<java.lang.Object>)

    @Override
    protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
        Object decoded = decode(ctx, in);
        if (decoded != null) {
            // 解码出的对象添加到out中,交给ByteToMessageDecoder传播
            out.add(decoded);
        }
    }

继续跟进decode方法(巨长警告),为了方便理解,我也把我理解到的逻辑写在注释里了,一些看起来“意思不太明显”的值,我也结合实验代码写到注释中去了
io.netty.handler.codec.LineBasedFrameDecoder#decode(io.netty.channel.ChannelHandlerContext, io.netty.buffer.ByteBuf)

    protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
        // 找到行结尾的指针位置,即\r\n或\n的位置
        // 结合实验代码,eol=5
        final int eol = findEndOfLine(buffer);
        // 非丢弃模式下
        if (!discarding) {
            if (eol >= 0) {
                // 数据包
                final ByteBuf frame;
                // 算出要可读指针到换行符之间的长度
                // 结合实验代码,length = 5
                final int length = eol - buffer.readerIndex();
                // 换行符长度
                // 结合实验代码,delimLength = 2
                final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;

                if (length > maxLength) {
                    // 把读指针指向换行符之后的一字节
                    buffer.readerIndex(eol + delimLength);
                    // 传播异常事件
                    fail(ctx, length);
                    return null;
                }

                // 若不需要数据包中带有换行符
                if (stripDelimiter) {
                    // 读取数据
                    frame = buffer.readRetainedSlice(length);
                    // 读指针移动到换行符后的一个字节,意思就是抛弃换行符
                    buffer.skipBytes(delimLength);
                } else {
                    // 数据包中包含换行符
                    frame = buffer.readRetainedSlice(length + delimLength);
                }

                // 返回数据包
                return frame;
            }
            // 找不到行结尾符号
            else {
                final int length = buffer.readableBytes();
                if (length > maxLength) {
                    // 计算丢弃字节数
                    discardedBytes = length;
                    // 将读指针直接移动到写指针位置,也就是抛弃所有可读数据(没有换行符且超过最大长度)
                    buffer.readerIndex(buffer.writerIndex());
                    // 开始拒绝模式(感觉设计思想类似于线程池的拒绝模式)
                    discarding = true;
                    // 是否要传播异常
                    if (failFast) {
                        // 传播异常
                        fail(ctx, "over " + discardedBytes);
                    }
                }
                // 啥都没解析到,返回null,等下次累加器合并
                return null;
            }
        } else {
            // 找到换行符
            if (eol >= 0) {
                // 丢弃字节长度累加
                final int length = discardedBytes + eol - buffer.readerIndex();
                final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
                // 移动读指针到换行符后,也就是丢弃
                buffer.readerIndex(eol + delimLength);
                discardedBytes = 0;
                // 丢弃完毕,关闭丢弃模式
                discarding = false;
                if (!failFast) {
                    fail(ctx, length);
                }
            }
            // 找不到换行符
            else {
                // 丢弃字节数累加
                discardedBytes += buffer.readableBytes();
                // 读指针跳到写指针处(没有换行符且超过最大长度)
                buffer.readerIndex(buffer.writerIndex());
            }
            // 啥都没解析到,不需要抛出异常,返回null,等下次累加器合并
            return null;
        }
    }

上面代码需要注意的点:

  • fall方法在这里是传播异常,为什么要特地提醒一下呢?因为DelimiterBasedFrameDecoder、LengthFieldBasedFrameDecoder等解码器的fall方法都是直接抛出异常。

上面的代码配合实验代码画个简单的流程图就像下面这个样子:
Netty解码大致流程.png

  • 如果stripDelimiter为false,那frame中就不包含\r\n,并且会将\r\n丢弃掉。
  • 【订正】上面图中readIndex前的数据其实还没有清空的,只是被判定为无效数据而已,但实际上还存在。

上面只是一个解码成功的流程,失败、丢弃模式的流程我就不一一去分析了,说白了其实就是通过跳readerIndex来丢弃数据,有兴趣可以拿Netty提供的单元测试自己打打断点。

LineBasedFrameDecoder除了上面的解码介绍外,其实还解决了沾包和拆包的问题,关于这一块我日后有时间会查资料继续补充的。