文章目录
前言
本节就来看看Netty提供给用户开箱即用的解码器:LineBasedFrameDecoder,你也可以把它当成以换行符分割字节流的解码器。
在看之前尽量保证有一定的ByteBuf基础,如必须知道readerIndex和,writerIndex等,可以参考我前面的博客记录【ByteBuf简介】。
如果对解码器的大致流程还存在疑问,建议回去看看【ByteToMessageDecoder】,本文不会再提及整体流程,核心关注解码器的decode方法。
Netty Version:4.1.6
实验代码
为了使跟进源码时更有“体感”,就拿Netty的一个单元测试改一改作为例子:
LineBasedFrameDecoderTest.java
import io.netty.buffer.ByteBuf;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import org.junit.Test;
import static io.netty.buffer.Unpooled.*;
import static io.netty.util.ReferenceCountUtil.releaseLater;
import static org.hamcrest.CoreMatchers.*;
import static org.junit.Assert.*;
public class LineBasedFrameDecoderTest {
@Test
public void testDecodeWithStrip() throws Exception {
EmbeddedChannel ch = new EmbeddedChannel(new LineBasedFrameDecoder(8192, true, false));
ch.writeInbound(copiedBuffer("first\r\nsecond", CharsetUtil.US_ASCII));
assertEquals("first", releaseLater((ByteBuf) ch.readInbound()).toString(CharsetUtil.US_ASCII));
assertEquals("second", releaseLater((ByteBuf) ch.readInbound()).toString(CharsetUtil.US_ASCII));
assertNull(ch.readInbound());
ch.finish();
ReferenceCountUtil.release(ch.readInbound());
}
}
- 当然,我更建议你自己把Netty中相关单元测试的代码都跑一遍,案例非常全,但受到篇幅限制,我在这不可能全都跟一遍。
跟进源码
LineBasedFrameDecoder继承关系
先来看看LineBasedFrameDecoder的继承关系图:
- 关于图中的几个类,我在【inbound和oubound事件区别】提到过。
- ByteToMessageDecoder在【上一节】提到过。
从上面的类图,再结合之前pipeline的学习经历,不难发现,我们其实可以把LineBasedFrameDecoder当成ChannelHandler,也就是说解码其实是事件传播、处理中的其中一环。用代码表示就是类似下面这个样子:
// 略
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new Base64Decoder());
ch.pipeline().addLast(new FixedLengthFrameDecoder(3));
ch.pipeline().addLast(new LineBasedFrameDecoder(10, false, false));
}
- (上面这段话在上一节已经说过了,直接拿过来用是为了给自己强调一下)
LineBasedFrameDecoder的属性
我把笔记都写到源码里面去了,下面直接看代码注释:
/** Maximum length of a frame we're willing to decode. */
/** 数据包最大长度 */
private final int maxLength;
/** Whether or not to throw an exception as soon as we exceed maxLength. */
/** true:超过最大长度抛出异常 */
private final boolean failFast;
/** true:解析出的数据包不带换行符*/
private final boolean stripDelimiter;
/** True if we're discarding input because we're already over maxLength. */
/** 如果为true,则表示当前超过maxLength,处于丢弃输入的模式 */
/** 概念上可以理解为类似于线程池拒绝模式一样的东西 */
private boolean discarding;
/** 丢弃的字节数 */
private int discardedBytes;
- 暂时不理解也没关系,下面用到自然就懂了。
decode实现
接下来就是核心的部分了,也就是对ByteToMessageDecoder累加器传过来的数据进行解码。
找到decode方法,直接看代码:
io.netty.handler.codec.LineBasedFrameDecoder#decode(io.netty.channel.ChannelHandlerContext, io.netty.buffer.ByteBuf, java.util.List<java.lang.Object>)
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
// 解码出的对象添加到out中,交给ByteToMessageDecoder传播
out.add(decoded);
}
}
继续跟进decode方法(巨长警告),为了方便理解,我也把我理解到的逻辑写在注释里了,一些看起来“意思不太明显”的值,我也结合实验代码写到注释中去了:
io.netty.handler.codec.LineBasedFrameDecoder#decode(io.netty.channel.ChannelHandlerContext, io.netty.buffer.ByteBuf)
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
// 找到行结尾的指针位置,即\r\n或\n的位置
// 结合实验代码,eol=5
final int eol = findEndOfLine(buffer);
// 非丢弃模式下
if (!discarding) {
if (eol >= 0) {
// 数据包
final ByteBuf frame;
// 算出要可读指针到换行符之间的长度
// 结合实验代码,length = 5
final int length = eol - buffer.readerIndex();
// 换行符长度
// 结合实验代码,delimLength = 2
final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
if (length > maxLength) {
// 把读指针指向换行符之后的一字节
buffer.readerIndex(eol + delimLength);
// 传播异常事件
fail(ctx, length);
return null;
}
// 若不需要数据包中带有换行符
if (stripDelimiter) {
// 读取数据
frame = buffer.readRetainedSlice(length);
// 读指针移动到换行符后的一个字节,意思就是抛弃换行符
buffer.skipBytes(delimLength);
} else {
// 数据包中包含换行符
frame = buffer.readRetainedSlice(length + delimLength);
}
// 返回数据包
return frame;
}
// 找不到行结尾符号
else {
final int length = buffer.readableBytes();
if (length > maxLength) {
// 计算丢弃字节数
discardedBytes = length;
// 将读指针直接移动到写指针位置,也就是抛弃所有可读数据(没有换行符且超过最大长度)
buffer.readerIndex(buffer.writerIndex());
// 开始拒绝模式(感觉设计思想类似于线程池的拒绝模式)
discarding = true;
// 是否要传播异常
if (failFast) {
// 传播异常
fail(ctx, "over " + discardedBytes);
}
}
// 啥都没解析到,返回null,等下次累加器合并
return null;
}
} else {
// 找到换行符
if (eol >= 0) {
// 丢弃字节长度累加
final int length = discardedBytes + eol - buffer.readerIndex();
final int delimLength = buffer.getByte(eol) == '\r'? 2 : 1;
// 移动读指针到换行符后,也就是丢弃
buffer.readerIndex(eol + delimLength);
discardedBytes = 0;
// 丢弃完毕,关闭丢弃模式
discarding = false;
if (!failFast) {
fail(ctx, length);
}
}
// 找不到换行符
else {
// 丢弃字节数累加
discardedBytes += buffer.readableBytes();
// 读指针跳到写指针处(没有换行符且超过最大长度)
buffer.readerIndex(buffer.writerIndex());
}
// 啥都没解析到,不需要抛出异常,返回null,等下次累加器合并
return null;
}
}
上面代码需要注意的点:
- fall方法在这里是传播异常,为什么要特地提醒一下呢?因为DelimiterBasedFrameDecoder、LengthFieldBasedFrameDecoder等解码器的fall方法都是直接抛出异常。
上面的代码配合实验代码画个简单的流程图就像下面这个样子:
- 如果stripDelimiter为false,那frame中就不包含\r\n,并且会将\r\n丢弃掉。
- 【订正】上面图中readIndex前的数据其实还没有清空的,只是被判定为无效数据而已,但实际上还存在。
上面只是一个解码成功的流程,失败、丢弃模式的流程我就不一一去分析了,说白了其实就是通过跳readerIndex来丢弃数据,有兴趣可以拿Netty提供的单元测试自己打打断点。
LineBasedFrameDecoder除了上面的解码介绍外,其实还解决了沾包和拆包的问题,关于这一块我日后有时间会查资料继续补充的。