文章目录
前言
本节就来看看Netty提供给用户开箱即用的解码器:DelimiterBasedFrameDecoder。
在看之前尽量保证有一定的ByteBuf基础,如必须知道readerIndex和,writerIndex等,可以参考我前面的博客记录【ByteBuf简介】。
如果对解码器的大致流程还存在疑问,建议回去看看【ByteToMessageDecoder】,本文不会再提及整体流程,核心关注解码器的decode方法。
Netty Version:4.1.6
实验代码
同样拿Netty提供的单元测试:
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
import io.netty.util.CharsetUtil;
import io.netty.util.ReferenceCountUtil;
import org.junit.Test;
import java.nio.charset.Charset;
import static io.netty.util.ReferenceCountUtil.releaseLater;
import static org.junit.Assert.*;
public class DelimiterBasedFrameDecoderTest {
@Test
public void testMultipleLinesStrippedDelimiters() {
EmbeddedChannel ch = new EmbeddedChannel(new DelimiterBasedFrameDecoder(8192, true,
delimiter()));
ch.writeInbound(Unpooled.copiedBuffer("firstgasecondga", Charset.defaultCharset()));
System.out.println(releaseLater((ByteBuf) ch.readInbound()).toString(Charset.defaultCharset()));
System.out.println(releaseLater((ByteBuf) ch.readInbound()).toString(Charset.defaultCharset()));
System.out.println(releaseLater((ByteBuf) ch.readInbound()).toString(Charset.defaultCharset()));
ch.finish();
}
// 自定义的分隔符
private static ByteBuf[] delimiter() {
return new ByteBuf[] {
Unpooled.wrappedBuffer(new byte[] { 'a' }),
Unpooled.wrappedBuffer(new byte[] { 'g' }),
};
}
}
输出结果:
first
second
- 上面first和second确实是隔开的,也就是第二次readInbound确实就是没有字符串,并不是我markdown写错了。
- 为什么第二次readInbound读到空白呢?看完本文的分析你明白了。
- 如果实验代码的"firstgasecondga"换成"firstgeeasecondga",那输出结果中间那段空白就会变成"ee"
跟进源码
DelimiterBasedFrameDecoder继承关系
先来看看DelimiterBasedFrameDecoder的继承关系图:
- 关于图中的几个类,我在【inbound和oubound事件区别】提到过。
- ByteToMessageDecoder在【上一节】提到过。
从上面的类图,再结合之前pipeline的学习经历,不难发现,我们其实可以把DelimiterBasedFrameDecoder当成ChannelHandler,也就是说解码其实是事件传播、处理中的其中一环。用代码表示就是类似下面这个样子:
// 略
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new Base64Decoder());
ch.pipeline().addLast(new DelimiterBasedFrameDecoder());
ch.pipeline().addLast(new FixedLengthFrameDecoder(3));
ch.pipeline().addLast(new LineBasedFrameDecoder(10, false, false));
}
- (上面这段话在上一节已经说过了,直接拿过来用是为了给自己强调一下)
DelimiterBasedFrameDecoder的属性
我依旧是给源码加上注释:
public class DelimiterBasedFrameDecoder extends ByteToMessageDecoder {
/** 分隔符,可多个 */
private final ByteBuf[] delimiters;
/** 数据包最大能有多长 */
private final int maxFrameLength;
/** 数据包是否需要保留分隔符 */
private final boolean stripDelimiter;
/** 超过maxFrameLength后是否抛出异常 */
private final boolean failFast;
/** 是否属于丢弃模式 */
private boolean discardingTooLongFrame;
/** 丢弃的字节数 */
private int tooLongFrameLength;
/** Set only when decoding with "\n" and "\r\n" as the delimiter. */
/** 之前就讲过这个了,行解码器 ,意思是如果分隔符仅为为\n 和 \r\n那就直接丢给行解码器(代码复用)*/
private final LineBasedFrameDecoder lineBasedDecoder;
...(非属性略)
构造方法分析
你可能会疑惑:为什么在分隔符decoder里面会有上一节讲到的LineBasedFrameDecoder?什么情况下这个LineBasedFrameDecoder!=null呢?这些问题的答案其实都在构造方法中,下面就来看一下:
io.netty.handler.codec.DelimiterBasedFrameDecoder#DelimiterBasedFrameDecoder(int, boolean, boolean, io.netty.buffer.ByteBuf...)
public DelimiterBasedFrameDecoder(
int maxFrameLength, boolean stripDelimiter, boolean failFast, ByteBuf... delimiters) {
validateMaxFrameLength(maxFrameLength);
if (delimiters == null) {
throw new NullPointerException("delimiters");
}
if (delimiters.length == 0) {
throw new IllegalArgumentException("empty delimiters");
}
// 如果是仅以换行符分割
if (isLineBased(delimiters) && !isSubclass()) {
// 创建一个换行符解码器对象
lineBasedDecoder = new LineBasedFrameDecoder(maxFrameLength, stripDelimiter, failFast);
this.delimiters = null;
} else {
this.delimiters = new ByteBuf[delimiters.length];
for (int i = 0; i < delimiters.length; i ++) {
ByteBuf d = delimiters[i];
validateDelimiter(d);
this.delimiters[i] = d.slice(d.readerIndex(), d.readableBytes());
}
// 如果分隔符不仅仅是换行符或不包括换行符,那么就置空。
lineBasedDecoder = null;
}
this.maxFrameLength = maxFrameLength;
this.stripDelimiter = stripDelimiter;
this.failFast = failFast;
}
跟进isLineBased方法,验证一下是不是想的那样:
io.netty.handler.codec.DelimiterBasedFrameDecoder#isLineBased
// 判断是否仅以换行符作为分隔符
private static boolean isLineBased(final ByteBuf[] delimiters) {
// 如果分隔符的数量小于2,那就肯定不是以换行符为分隔符(换行符包括\r\n和\n),返回false
if (delimiters.length != 2) {
return false;
}
// 取到前两个分隔符
ByteBuf a = delimiters[0];
ByteBuf b = delimiters[1];
if (a.capacity() < b.capacity()) {
a = delimiters[1];
b = delimiters[0];
}
// 判断是否是以\n && \r\n分割
return a.capacity() == 2 && b.capacity() == 1
&& a.getByte(0) == '\r' && a.getByte(1) == '\n'
&& b.getByte(0) == '\n';
}
说白就是:假设分隔符就单纯是两个换行符,那就可以直接丢给LineBasedFrameDecoder实例处理了,复用原则嘛~
decode实现
接下来就是核心的部分了,也就是对ByteToMessageDecoder累加器传过来的数据进行解码。
找到decode方法,直接看代码:
io.netty.handler.codec.DelimiterBasedFrameDecoder#decode(io.netty.channel.ChannelHandlerContext, io.netty.buffer.ByteBuf, java.util.List<java.lang.Object>)
@Override
protected final void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
Object decoded = decode(ctx, in);
if (decoded != null) {
// 解码出的对象添加到out中,交给ByteToMessageDecoder传播
out.add(decoded);
}
}
继续跟进decode方法,也是把代码做的事写到注释里面去了,下面还会画图分析:
io.netty.handler.codec.DelimiterBasedFrameDecoder#decode(io.netty.channel.ChannelHandlerContext, io.netty.buffer.ByteBuf)
protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception {
// 如果分隔符就是换行符(构造本解码器的时候就决定的)
if (lineBasedDecoder != null) {
// 直接丢给行处理器
return lineBasedDecoder.decode(ctx, buffer);
}
// Try all delimiters and choose the delimiter which yields the shortest frame.
// 用于记录最小分隔符的index
int minFrameLength = Integer.MAX_VALUE;
// 长度最小时的分割符
ByteBuf minDelim = null;
for (ByteBuf delim: delimiters) {
int frameLength = indexOf(buffer, delim);
// 计算哪个分隔符读出的数据长度最小
if (frameLength >= 0 && frameLength < minFrameLength) {
// 记录最小长度
minFrameLength = frameLength;
// 记录长度最小时的分割符
minDelim = delim;
}
}
// 以下开始解码逻辑
if (minDelim != null) {
// 返回分隔符占用的长度
int minDelimLength = minDelim.capacity();
ByteBuf frame;
// 如果属于丢弃模式
if (discardingTooLongFrame) {
// We've just finished discarding a very large frame.
// Go back to the initial state.
discardingTooLongFrame = false;
buffer.skipBytes(minFrameLength + minDelimLength);
int tooLongFrameLength = this.tooLongFrameLength;
this.tooLongFrameLength = 0;
if (!failFast) {
fail(tooLongFrameLength);
}
return null;
}
// 读到的数据包的长度 > 当前允许最大的数据包
if (minFrameLength > maxFrameLength) {
// Discard read frame.
// 读指针移动到分隔符之后
buffer.skipBytes(minFrameLength + minDelimLength);
// 抛出异常(不是异常传播)
fail(minFrameLength);
return null;
}
// 是否需要保留分隔符
if (stripDelimiter) {
// 不保留分隔符
frame = buffer.readRetainedSlice(minFrameLength);
buffer.skipBytes(minDelimLength);
} else {
// 保留分隔符
frame = buffer.readRetainedSlice(minFrameLength + minDelimLength);
}
// 返回读取到的数据包
return frame;
}
// 如果没读到分隔符
else {
// 非丢弃模式
if (!discardingTooLongFrame) {
// 数据包长度大于最大容量
if (buffer.readableBytes() > maxFrameLength) {
// Discard the content of the buffer until a delimiter is found.
tooLongFrameLength = buffer.readableBytes();
buffer.skipBytes(buffer.readableBytes());
// 开启丢弃模式
discardingTooLongFrame = true;
// 是否抛出异常(不是异常传播)
if (failFast) {
fail(tooLongFrameLength);
}
}
}
else {
// 丢弃模式
// Still discarding the buffer since a delimiter is not found.
tooLongFrameLength += buffer.readableBytes();
buffer.skipBytes(buffer.readableBytes());
}
return null;
}
}
在看整体流程之前,你可能会对上面的这个代码段感到一些困惑:
// 长度最小时的分割符
ByteBuf minDelim = null;
for (ByteBuf delim: delimiters) {
int frameLength = indexOf(buffer, delim);
// 计算哪个分隔符读出的数据长度最小
if (frameLength >= 0 && frameLength < minFrameLength) {
// 记录最小长度
minFrameLength = frameLength;
// 记录长度最小时的分割符
minDelim = delim;
}
}
下面画个图来解释下:
- 说白了就是获取记录readerIndex距离最短的分隔符,以及readerIndex到此分隔符的距离。
结合实验代码,上面的decode流程大致如下:
好了,decode方法解码实验代码测试数据的大致流程就如上图,相信看完之后上文的疑惑也都能解开了。关于丢弃模式等异常情况,有兴趣的可以用Netty提供的单元测试继续跟进,这里我就不再赘述了。