前言
在看本节之前,最好看看inbound和outbound事件的区别这一篇,先对inbound和outbound事件的区别有一个理解再看这篇会更好。并且,知道添加后的ChannelHandler的顺序也是前提,可以参考【添加handler】一节。
本节就是挑一个inbound事件传播来追踪源码,最终选定了channelRead这个方法,如果对这个方法感到陌生也没关系,因为本节主要关注的是事件传播的流程、顺序。
Netty Version:4.1.6
实验代码
Server.java
import com.imooc.netty.ch3.ServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.AttributeKey;
public final class Server {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(1);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, true)
.handler(new ServerHandler())
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new InBoundHandlerA());
ch.pipeline().addLast(new InBoundHandlerB());
ch.pipeline().addLast(new InBoundHandlerC());
}
});
ChannelFuture f = b.bind(8888).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
InBoundHandlerA
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
public class InBoundHandlerA extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InBoundHandlerA: " + msg);
ctx.fireChannelRead(msg);
}
InBoundHandlerB
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
/**
* @author
*/
public class InBoundHandlerB extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
System.out.println("InBoundHandlerB: " + msg);
ctx.fireChannelRead(msg);
}
@Override
public void channelActive(ChannelHandlerContext ctx) {
// 本节入口
ctx.channel().pipeline().fireChannelRead("hello world");
}
}
InBoundHandlerC
同InBoundHandlerA
跟进源码
入口就是InBoundHandlerB的channelActive方法,此处【坐标1】:
@Override
public void channelActive(ChannelHandlerContext ctx) {
ctx.channel().pipeline().fireChannelRead("hello world");
}
最好在上述方法打个断点,启动Server.java后,再用telnet连接即可进入这段代码。至于是怎么进入这里的channelActive方法的,可以参考端口绑定这一篇,客户端处理逻辑也一样,这里就不再赘述。
跟进fireChannelRead方法,实现类是DefaultChannelPipeline,在pipeline初始化这节讲过:
io.netty.channel.DefaultChannelPipeline#fireChannelRead
@Override
public final ChannelPipeline fireChannelRead(Object msg) {
AbstractChannelHandlerContext.invokeChannelRead(head, msg);
return this;
}
- 需要注意这里传了head节点进去,这一步大概率说明事件是从head开始向tail传播的。
跟进invokeChannelRead方法,此处【坐标0.5】:
io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(AbstractChannelHandlerContext, Object)
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
继续跟进invokeChannelRead方法:
io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(java.lang.Object)
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
- handler()当前返回的就是head节点:
- 所以我们不难推测接下来就是调用HeadContext的channelRead方法。
跟进channelRead方法,此处【坐标2】:
io.netty.channel.DefaultChannelPipeline.HeadContext#channelRead
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
ctx.fireChannelRead(msg);
}
- 请注意这里的fireChannelRead和【坐标1】处fireChannelRead的区别。
- 【坐标1】的fireChannelRead实现是DefaultChannelPipeline,而这里的fireChannelRead实现是AbstractChannelHandlerContext。两种实现是不一样的。前者是传播到head,后者是传播到下一个节点,接下来就能看到了。
跟进fireChannelRead方法:
io.netty.channel.AbstractChannelHandlerContext#fireChannelRead
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
invokeChannelRead(findContextInbound(), msg);
return this;
}
跟进findContextInbound方法:
io.netty.channel.AbstractChannelHandlerContext#findContextInbound
private AbstractChannelHandlerContext findContextInbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.next;
} while (!ctx.inbound);
return ctx;
}
- 可以发现就是取当前节点的下一个inbound节点,这意味着可能会将事件传播到下一个节点。
返回到fireChannelRead,跟进invokeChannelRead方法:
io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(bstractChannelHandlerContext, Object)
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelRead(m);
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelRead(m);
}
});
}
}
- 发现这个其实就是【坐标0.5】中的代码,也就是绕了一圈又回来了,区别在于当前节点的handler变了:
再继续跟一遍invokeChannelRead方法:
io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(java.lang.Object)
private void invokeChannelRead(Object msg) {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelRead(this, msg);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelRead(msg);
}
}
- 很明显,接下来就是去到我们InBoundHandlerA覆写的channelRead方法了。
继续跟channelRead方法:
- 果不其然。
至于到这一步的ctx.fireChannelRead(msg),其实跟【坐标2】的后序流程是一样的。
也就是接下来会将事件传播到InBoundHandlerB的channelRead,然后InBoundHandlerB的channelRead也会调用fireChannelRead发起事件传播,然后事件传播到InBoundHandlerC,InBoundHandlerC也会继续往下传播,最终传到TailContext进行收尾工作。
后序的传播追踪我就不一一列举了,相信搞懂了上面的循环逻辑,即便不追断点,脑海里也很清楚整个传播流程了。
小结
- 本小节主要是通过跟踪channelRead的事件传播来间接的分析Inbound事件传播,验证了inbound事件传播的顺序,其实就如上一小节所说,是按照我们添加inboundhandler的顺序进行传播的。