# 前言
这一节主要通过跟踪write方法来看看outbound事件传播的顺序、流程。如果对事件传播节点的双向链表不熟悉,或对事件添加的顺序不熟悉,建议复习下我博客的相关篇章。
> Netty Version:4.1.6
<br/>
# 实验代码
<code>Server.java</code>
```java
import com.imooc.netty.ch3.ServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.AttributeKey;
public final class Server {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup(1);
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, true)
.handler(new ServerHandler())
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new OutBoundHandlerA());
ch.pipeline().addLast(new OutBoundHandlerB());
ch.pipeline().addLast(new OutBoundHandlerC());
}
});
ChannelFuture f = b.bind(8888).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
```
<code>OutBoundHandlerB.java</code>
```java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
import java.util.concurrent.TimeUnit;
public class OutBoundHandlerB extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("OutBoundHandlerB: " + msg);
ctx.write(msg, promise);
}
@Override
public void handlerAdded(final ChannelHandlerContext ctx) {
// 定时任务
ctx.executor().schedule(() -> {
ctx.channel().write("hello world");
}, 3, TimeUnit.SECONDS);
}
}
```
<code>OutBoundHandlerA.java</code>
```java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;
public class OutBoundHandlerA extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("OutBoundHandlerA: " + msg);
ctx.write(msg, promise);
}
}
```
<code>OutBoundHandlerC</code>同<code>OutBoundHandlerA</code>,最后启动Server.java的main方法。
<br/>
# 跟进源码
再跟进之前,你可能会问ctx.executor().schedule()是个啥玩意儿?其实这是Netty提供的添加定时任务的接口,在[【runAllTasks】](https://wenjie.store/archives/netty-nioeventloop-boot-4)这篇有简单跟踪过,忘记/有兴趣的可以去瞧一瞧。
启动Server.java后,用telnet连接,断点就可以进入到OutBoundHandlerB的handlerAdded方法了,至于怎么进入handlerAdded的,可以参考[【添加handler】](https://wenjie.store/archives/about-pipeline-2)。
现在,就开始追ctx.channel().write的write方法了,先跟进去瞧瞧:
<code>io.netty.channel.AbstractChannel#write(java.lang.Object)</code>
```java
@Override
public ChannelFuture write(Object msg) {
return pipeline.write(msg);
}
```
- [上一节](https://wenjie.store/archives/about-pipeline-5)的fireChannelRead是直接通过pipeline调用的,而在这里我是通过channel().write()简介调用的pipeline的,因为这种方式更为推荐、常用,相信用netty写过聊天应用的应该很眼熟种调用方式。
<br/>
继续跟进write方法:
<code>io.netty.channel.DefaultChannelPipeline#write(java.lang.Object)</code>
```java
@Override
public final ChannelFuture write(Object msg) {
return tail.write(msg);
}
```
- 就如[【inbound和outbound区别】](https://wenjie.store/archives/about-pipeline-4)所记录的一样,outbound将从tail节点开始传播。
继续跟进write方法:
<code>io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object)</code>
```java
@Override
public ChannelFuture write(Object msg) {
return write(msg, newPromise());
}
```
- newPromise()方法其实就是在获取当前绑定channel和NioEventLoop。
继续跟进write方法,此处<span id="tag1">【坐标1】</span>:
<code>io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, io.netty.channel.ChannelPromise)</code>
```java
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
}
try {
if (!validatePromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return promise;
}
} catch (RuntimeException e) {
ReferenceCountUtil.release(msg);
throw e;
}
write(msg, false, promise);
return promise;
}
```
- 如果看过[【上一节】](https://wenjie.store/archives/about-pipeline-5),相信会觉得这个代码结构眼熟。
- 传进去的false表示不自动flush。
<br/>
继续跟进write方法:
<code>io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, boolean, io.netty.channel.ChannelPromise)</code>
```java
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}
```
- else代码块的逻辑主要是为了保证线程安全。其最终逻辑也是invokeWrite。
先跟进findContextOutbound方法:
<code>io.netty.channel.AbstractChannelHandlerContext#findContextOutbound</code>
```java
private AbstractChannelHandlerContext findContextOutbound() {
AbstractChannelHandlerContext ctx = this;
do {
ctx = ctx.prev;
} while (!ctx.outbound);
return ctx;
}
```
- 取tail的**前一个outbound节点**返回。可见outbound事件传播顺序就是从tail传到head,恰好和inbound事件相反。
返回后跟进invokeWrite方法:
<code>io.netty.channel.AbstractChannelHandlerContext#invokeWrite</code>
```java
private void invokeWrite(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
invokeWrite0(msg, promise);
} else {
write(msg, promise);
}
}
```
跟进invokeWrite0方法:
<code>io.netty.channel.AbstractChannelHandlerContext#invokeWrite0</code>
```java
private void invokeWrite0(Object msg, ChannelPromise promise) {
try {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
} catch (Throwable t) {
notifyOutboundHandlerException(t, promise);
}
}
```
- 这里handler()的返回值就是OutBoundHandlerC:
- 所以接下来就会调用OutBoundHandlerC中覆写的write方法。
<br/>
继续跟进write方法:

<br/>
如果继续跟进这个ctx.write,就会回到[【坐标1】](#tag1)的代码逻辑,后面会继续取前一节点传播上去,经过OutBoundHandlerB、OutBoundHandlerA的write方法后,最终传到HeadContext的write方法。
<br/>
因为后续传播只是调用上面讲过的重复代码,下面略过OutBoundHandlerB、OutBoundHandlerA的传播过程,直接来到HeadContext的write方法:
<code>io.netty.channel.DefaultChannelPipeline.HeadContext#write</code>
```java
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
```
继续跟进write方法:
<code>io.netty.channel.AbstractChannel.AbstractUnsafe#write</code>
```java
@Override
public final void write(Object msg, ChannelPromise promise) {
assertEventLoop();
ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
if (outboundBuffer == null) {
// If the outboundBuffer is null we know the channel was closed and so
// need to fail the future right away. If it is not null the handling of the rest
// will be done in flush0()
// See https://github.com/netty/netty/issues/2362
safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
// release message now to prevent resource-leak
ReferenceCountUtil.release(msg);
return;
}
int size;
try {
msg = filterOutboundMessage(msg);
size = pipeline.estimatorHandle().size(msg);
if (size < 0) {
size = 0;
}
} catch (Throwable t) {
safeSetFailure(promise, t);
ReferenceCountUtil.release(msg);
return;
}
outboundBuffer.addMessage(msg, size, promise);
}
```
- 到最后就是做一些收尾性质的工作。
<br/>
至此,write事件的传播就告一段落了。
---
<br/>
# 小结
- 本节主要通过write方法的传播来间接了解outbound事件的传播,发现outbound事件的传播是从tail传播到head的,也就是刚好**和用户添加handler的顺序相反**,个人认为这个顺序还是需要引起注意的。

【Netty】Pipeline相关(六):OutBound事件传播(write)