前言
这一节主要通过跟踪write方法来看看outbound事件传播的顺序、流程。如果对事件传播节点的双向链表不熟悉,或对事件添加的顺序不熟悉,建议复习下我博客的相关篇章。
Netty Version:4.1.6
实验代码
Server.java
import com.imooc.netty.ch3.ServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.AttributeKey;
public final class Server {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(1);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, true)
.handler(new ServerHandler())
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new OutBoundHandlerA());
ch.pipeline().addLast(new OutBoundHandlerB());
ch.pipeline().addLast(new OutBoundHandlerC());
}
});
ChannelFuture f = b.bind(8888).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
OutBoundHandlerB.java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import java.util.concurrent.TimeUnit;
public class OutBoundHandlerB extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("OutBoundHandlerB: " + msg);
ctx.write(msg, promise);
}
@Override
public void handlerAdded(final ChannelHandlerContext ctx) {
// 定时任务
ctx.executor().schedule(() -> {
ctx.channel().write("hello world");
}, 3, TimeUnit.SECONDS);
}
}
OutBoundHandlerA.java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
public class OutBoundHandlerA extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("OutBoundHandlerA: " + msg);
ctx.write(msg, promise);
}
}
OutBoundHandlerC
同OutBoundHandlerA
,最后启动Server.java的main方法。
跟进源码
再跟进之前,你可能会问ctx.executor().schedule()是个啥玩意儿?其实这是Netty提供的添加定时任务的接口,在【runAllTasks】这篇有简单跟踪过,忘记/有兴趣的可以去瞧一瞧。
启动Server.java后,用telnet连接,断点就可以进入到OutBoundHandlerB的handlerAdded方法了,至于怎么进入handlerAdded的,可以参考【添加handler】。
现在,就开始追ctx.channel().write的write方法了,先跟进去瞧瞧:
io.netty.channel.AbstractChannel#write(java.lang.Object)
@Override
public ChannelFuture write(Object msg) {
return pipeline.write(msg);
}
- 上一节的fireChannelRead是直接通过pipeline调用的,而在这里我是通过channel().write()简介调用的pipeline的,因为这种方式更为推荐、常用,相信用netty写过聊天应用的应该很眼熟种调用方式。
继续跟进write方法:
io.netty.channel.DefaultChannelPipeline#write(java.lang.Object)
@Override
public final ChannelFuture write(Object msg) {
return tail.write(msg);
}
- 就如【inbound和outbound区别】所记录的一样,outbound将从tail节点开始传播。
继续跟进write方法:
io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object)
@Override
public ChannelFuture write(Object msg) {
return write(msg, newPromise());
}
- newPromise()方法其实就是在获取当前绑定channel和NioEventLoop。
继续跟进write方法,此处【坐标1】:
io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, io.netty.channel.ChannelPromise)
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
}
try {
if (!validatePromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return promise;
}
} catch (RuntimeException e) {
ReferenceCountUtil.release(msg);
throw e;
}
write(msg, false, promise);
return promise;
}
- 如果看过【上一节】,相信会觉得这个代码结构眼熟。
- 传进去的false表示不自动flush。
继续跟进write方法:
io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, boolean, io.netty.channel.ChannelPromise)
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}
- else代码块的逻辑主要是为了保证线程安全。其最终逻辑也是invokeWrite。
先跟进findContextOutbound方法:
io.netty.channel.AbstractChannelHandlerContext#findContextOutbound
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
- 取tail的前一个outbound节点返回。可见outbound事件传播顺序就是从tail传到head,恰好和inbound事件相反。
返回后跟进invokeWrite方法:
io.netty.channel.AbstractChannelHandlerContext#invokeWrite
private void invokeWrite(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
} else {
write(msg, promise);
}
}
跟进invokeWrite0方法:
io.netty.channel.AbstractChannelHandlerContext#invokeWrite0
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
- 这里handler()的返回值就是OutBoundHandlerC:
- 所以接下来就会调用OutBoundHandlerC中覆写的write方法。
继续跟进write方法:
如果继续跟进这个ctx.write,就会回到【坐标1】的代码逻辑,后面会继续取前一节点传播上去,经过OutBoundHandlerB、OutBoundHandlerA的write方法后,最终传到HeadContext的write方法。
因为后续传播只是调用上面讲过的重复代码,下面略过OutBoundHandlerB、OutBoundHandlerA的传播过程,直接来到HeadContext的write方法:
io.netty.channel.DefaultChannelPipeline.HeadContext#write
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
继续跟进write方法:
io.netty.channel.AbstractChannel.AbstractUnsafe#write
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
// If the outboundBuffer is null we know the channel was closed and so
// need to fail the future right away. If it is not null the handling of the rest
// will be done in flush0()
// See https://github.com/netty/netty/issues/2362
safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise);
}
- 到最后就是做一些收尾性质的工作。
至此,write事件的传播就告一段落了。
小结
- 本节主要通过write方法的传播来间接了解outbound事件的传播,发现outbound事件的传播是从tail传播到head的,也就是刚好和用户添加handler的顺序相反,个人认为这个顺序还是需要引起注意的。