# 前言
这一节的很多理论知识基础都**严重依赖**[【上一节】](https://wenjie.store/archives/about-pipeline-1)讲到的**ChannelIn/OutboundInvoker和ChannelIn/OutboundHandler方法意义区别**。如果对这四个接口的区别没什么印象,建议回去重新看一遍。
本节主要来讲讲添加ChannelHandler的源码逻辑,顺便会提点之前服务端、客户端初始化遇到过的添加handler逻辑。
<blockquote><p>
Netty Version:4.1.6
</p></blockquote>
<br/>
# 大致流程
这里先不分服务端、客户端,列一下源码的大致流程:
- 判断handler是否重复添加。
- 创建结点:给handler起名字、绑定NioEventLoop、保存handler。
- 将节点添加至双向链表中,也就是HeadContext和TailContext中间。
- 发起添加事件传播(回调)。
<br/>
# 实验代码
<code>Server.java</code>
```java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.AttributeKey;
/**
* @author cwj
*/
public final class Server {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap server = new ServerBootstrap();
server.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, true)
.handler(new ServerHandler())
.childHandler(new MyServerInitialzer());
ChannelFuture future = server.bind(8888).sync();
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
```
<code>ServerHandler</code>
```java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;
public class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("channelActive");
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
System.out.println("channelRegistered");
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
System.out.println("handlerAdded");
}
}
```
<code>MyServerInitialzer.java</code>
```java
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.socket.SocketChannel;
public class MyServerInitialzer extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("initChannel");
ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
ch.pipeline().addLast(new ChannelOutboundHandlerAdapter());
}
}
```
<br/>
# 跟进源码
### 服务端添加ChannelHandler
先从添加服务端ChannelHandler开始跟进,找到在[【初始化服务端channel】](https://wenjie.store/archives/netty-boot-do-something-2)一节的init方法,服务端添加handler的核心代码如下,**<span id="tag1">此处【坐标1】</span>**:
<code>io.netty.bootstrap.ServerBootstrap#init</code>
```java
...(略)
p.addLast(new ChannelInitializer<Channel>() {
// addLast后回调的方法,下面的addLast才是真正添加我们自定义的ServerHandler
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 添加自定义的ServerHandler
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
```
- addLast方法其实就是添加ChannelHandler的核心方法,由ChannelPipe接口抽象。
- 代码段的第一个handler并不是添加我们自定义的ServerHandler,而是先添加ChannelInitializer,之后ChannelInitializer添加完毕后会回调HandlerAdded方法 & 删除自己,而ChannelInitializer的HandlerAdded方法最终又会回调到上面代码块中的initChannel方法,在这个initChannel方法中的addLast才是真正添加自定义的ServerHandler。(现在不懂没关系,下面追源码就是这段逻辑)
<br/>
在跟进源码之前,我们不妨再来看看当前addLast方法的抽象接口ChannelPipeline的关系图:

- 可见它间接实现了两Invoker接口,说明它的方法极有可能拥有发起事件传播(回调)的能力。
<blockquote><p>
关于两Invoker和两Handler接口如果感到陌生,请看<a href="https://wenjie.store/archives/about-pipeline-1">上一节</a> 。
</p></blockquote>
<br/>
好了,现在视角重新转回上面[【坐标1】](#tag1)中的代码,来追一追p.addLast,进入这个addLast方法:
<code>io.netty.channel.DefaultChannelPipeline#addLast(io.netty.channel.ChannelHandler...)</code>
```java
@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
```
- 注意NioEventLoop被设置为null。
继续跟进addLast方法
<code>io.netty.channel.DefaultChannelPipeline#addLast(io.netty.util.concurrent.EventExecutorGroup, io.netty.channel.ChannelHandler...)</code>
```java
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
addLast(executor, null, h);
}
return this;
}
```
- handler的name也设置为null,到后面创建节点才真正设置。
继续跟进addLast,能看到总体逻辑,**<span id="tag2">此处【坐标2】</span>**:
<code>io.netty.channel.DefaultChannelPipeline#addLast(io.netty.util.concurrent.EventExecutorGroup, java.lang.String, io.netty.channel.ChannelHandler)</code>
```java
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
// 检查此handler是否共享的、添加过。
checkMultiplicity(handler);
// 创建新节点,这一步会分配NioEventLoop、检查name是否重复、命名、保存handler等。
newCtx = newContext(group, filterName(name, handler), handler);
// 添加节点至pipeline的双向链表。
addLast0(newCtx);
// 如果是添加客户端ChannelHandler都会走这一段,添加服务端ChannelHandler则不会进入这里。
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
// 下面代码就是发起handlerAdded事件传播(回调)
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
```
- 代码有注释的就是接下来要分析的方法。
<br/>
### 校验handler
先进入checkMultiplicity方法看看:
<code>io.netty.channel.DefaultChannelPipeline#checkMultiplicity</code>
```java
private static void checkMultiplicity(ChannelHandler handler) {
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
if (!h.isSharable() && h.added) {
throw new ChannelPipelineException(
h.getClass().getName() +
" is not a @Sharable handler, so can't be added or removed multiple times.");
}
h.added = true;
}
}
```
- 这个方法就是通过added标记判断handler是否被添加过,还有通过isSharable()方法判断handler是否有@Share注解,即判断能否被共享。
- 另外注意这里的added是boolean类型,但却没有用volatile关键字修饰,原因是后面还会有其它校验,不用担心这里会因为重排序而导致致命错误。
<br/>
### 创建节点
视角再转回[【坐标2】](#tag2)的代码,找到这一段代码,**<span id="tag3">此处【坐标3】</span>**:
```java
newCtx = newContext(group, filterName(name, handler), handler);
```
跟进filterName方法看看:
<code>io.netty.channel.DefaultChannelPipeline#filterName</code>
```java
private String filterName(String name, ChannelHandler handler) {
if (name == null) {
return generateName(handler);
}
checkDuplicateName(name);
return name;
}
```
- 如果没有名字就起一个,如果有则校验一下名字有没有重复。
起名字的方法我这里的不跟了,来看看校验名字的方法,即checkDuplicateName,最终跟到如下代码:
<code>io.netty.channel.DefaultChannelPipeline#context0</code>
```java
private AbstractChannelHandlerContext context0(String name) {
AbstractChannelHandlerContext context = head.next;
while (context != tail) {
if (context.name().equals(name)) {
return context;
}
context = context.next;
}
return null;
}
```
- 这里其实就是遍历在[上一节](https://wenjie.store/archives/about-pipeline-1)提到的双向链表,因为这里是双向链表第一次用上了,所以特地跟进来看看。
<br/>
给handler取完名后,视角转回[【坐标3】](#tag3)的代码,跟进newContext,看看创建节点的过程:
<code>io.netty.channel.DefaultChannelPipeline#newContext</code>
```java
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
```
- 可以看见节点的实际类型是DefaultChannelHandlerContext。
- 这里的childExecutor方法并不是分配NioEventLoop的,因为只要传入的参数为null,它就返回null,所以最终传进DefaultChannelHandlerContext的还是null。
跟进DefaultChannelHandlerContext构造方法:
```java
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
```
- 这里的super构造方法其实就是调用[【上一节】](https://wenjie.store/archives/about-pipeline-1)提到的AbstractChannelHandlerContext构造方法,这里就不再赘述了,主要是节点中保存了handler。
<br/>
### 将节点添加至双向链表
现在节点已经创建好了,接下来就是要把节点添加到[初始化pipeline](https://wenjie.store/archives/about-pipeline-1)时创建的head和tail之间了。
视角转回[【坐标2】](#tag2)的代码,进入到addLast0方法:
<code>io.netty.channel.DefaultChannelPipeline#addLast0</code>
```java
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
```
- 将当前节点添加到tail节点的前面。
<br/>
### **发起事件传播(回调)**
节点添加已经完成了,接下来就是要发起一下事件传播(回调)了。
视角重新拉回[【坐标2】](#tag2)的代码,跟进callHandlerAdded0方法:
<code>io.netty.channel.DefaultChannelPipeline#callHandlerAdded0</code>
```java
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
ctx.handler().handlerAdded(ctx);
ctx.setAddComplete();
} catch (Throwable t) {
...(略)
}
```
- 其中ctx.handler().handlerAdded(ctx)就是真正的回调方法,因为[【坐标1】](#tag1)中首先添加的是ChannelInitializer,所以这里handler返回的其实是ChannelInitializer的实例。
- 需要注意的是,打断点是看见handler()返回如下:
- 
- 这是因为我们在[【坐标1】](#tag1)的代码中,使用的是匿名内部类,实际上这个实例直属类就是ChannelInitializer。
另外,ChannelInitializer简介实现了ChannelInboundHandler接口,忘记这个接口的请看[【上一节】](https://wenjie.store/archives/about-pipeline-1),这个类的方法的主体功能相信不用我再赘述了。
跟进handlerAdded方法:
<code>io.netty.channel.ChannelInitializer#handlerAdded</code>
```java
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
initChannel(ctx);
}
}
```
跟进initChannel方法:
<code>io.netty.channel.ChannelInitializer#initChannel(io.netty.channel.ChannelHandlerContext)</code>
```java
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
// We do so to prevent multiple calls to initChannel(...).
exceptionCaught(ctx, cause);
} finally {
remove(ctx);
}
return true;
}
return false;
}
```
继续跟进initChannel方法,会发现这是个抽象实现:
```java
protected abstract void initChannel(C ch) throws Exception;
```
<br/>
那么这个抽象实现实际上是回调到谁呢?用断点跟进去瞧瞧:

- 最终就是调用到匿名内部类中的实现。
---
<br/>
可以看到**一开始callHandlerAdded0是在回调ChannelInitializer的handlerAdded方法,而ChannelInitializer的handlerAdded方法,最终又回调到[【坐标1】](#tag1)中匿名内部类实现的initChannel方法,然后在这个initChannel方法里面的addLast,才是真正添加我们自定义的ServerHandler(看下图)以及其他逻辑**。

<br/>
至于这里addLast添加ServerHandler的过程,其实跟上面讲的逻辑大致一样,只是最后回调到的方法可能是我们在ServerHandler中覆写的逻辑。
---
<br/>
## 客户端接入添加handler
当有新连接接入时,自然会为客户端channel中的pipeline添加childhandler链,而我们在实验代码中准备了MyServerInitialzer.java,并在Server.java中设置为它为客户端channel的handler。
新连接接入时,其实添加handler的大体逻辑和服务端启动时添加是差不多的,只是同样逻辑下代码执行的“路线”可能比服务端添加要长,下面我们就来一起看看。
<br/>
首先,要回顾[【初始化channel】](https://wenjie.store/archives/netty-handle-new-connections-3)中的channelRead方法,新连接接入时,就是在这段代码添加handler的:
<code>io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead</code>
```java
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
// 初始化时添加handler
child.pipeline().addLast(childHandler);
for (Entry<ChannelOption<?>, Object> e: childOptions) {
try {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + child, t);
}
}
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
```
- 这里的childHandler和服务端handler同样可以是多个。
<br/>
持续跟进addLast方法,又来到熟悉的地方了,也就是[【坐标2】](#tag2)的代码,但这次的执行逻辑就如同[【坐标2】](#tag2)代码的注释一样,有点不一样了:
<code>io.netty.channel.DefaultChannelPipeline#addLast(io.netty.util.concurrent.EventExecutorGroup, java.lang.String, io.netty.channel.ChannelHandler)</code>
```java
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
// 这次则是会进入这里的if代码块,然后返回。
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
...(略)
```
<br/>
Q:嗯?就在这里返回了?那岂不是不会调用到callHandlerAdded0了吗?那事件还怎么传播呢?MyServerInitialzer实现的initChannel方法岂不是没人回调了吗?
A:不用担心,会调用callHandlerAdded0的,只不过不是现在,继续跟下去就知道了。
<br/>
上面的代码既然返回了,我就继续追下去,之后返回到了[【坐标2】](#tag2)的channelRead方法再继续执行下去:
<code>io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead</code>
```java
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
```
- 这里的register就是关键。
<br/>
这里持续跟进register方法,最终来到了register0方法,跟进过程请参考[【初始化客户端channel】](https://wenjie.store/archives/netty-handle-new-connections-3),这里不再赘述:
<code>io.netty.channel.AbstractChannel.AbstractUnsafe#register0</code>
```java
// 省略绝大部分代码
private void register0(ChannelPromise promise) {
...(略)
pipeline.invokeHandlerAddedIfNeeded();
...(略)
}
```
- 看到这个方法名,我就知道"它"要lei了~
<br/>
继续跟进invokeHandlerAddedIfNeeded方法:
<code>io.netty.channel.DefaultChannelPipeline#invokeHandlerAddedIfNeeded</code>
```java
final void invokeHandlerAddedIfNeeded() {
assert channel.eventLoop().inEventLoop();
if (firstRegistration) {
firstRegistration = false;
callHandlerAddedForAllHandlers();
}
}
```
继续跟进callHandlerAddedForAllHandlers:
<code>io.netty.channel.DefaultChannelPipeline#callHandlerAddedForAllHandlers</code>
```java
private void callHandlerAddedForAllHandlers() {
final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {
...(略)
PendingHandlerCallback task = pendingHandlerCallbackHead;
while (task != null) {
task.execute();
task = task.next;
}
}
```
跟进这个execute方法:
<code>io.netty.channel.DefaultChannelPipeline.PendingHandlerAddedTask#execute</code>
```java
@Override
void execute() {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
callHandlerAdded0(ctx);
} else {
try {
executor.execute(this);
} catch (RejectedExecutionException e) {
if (logger.isWarnEnabled()) {
logger.warn(
"Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
executor, ctx.name(), e);
}
remove0(ctx);
ctx.setRemoved();
}
}
}
```
好了,终于找到前面讲过callHandlerAdded0方法,这个方法最终会回调handlerAdded方法。可由于我们的MyServerInitialzer继承了ChannelInitializer但没有覆写handlerAdded方法,所以最终还是调用了ChannelInitializer的handlerAdded实现,即最终调用到MyServerInitialzer的initChannel。
<br/>
好了,无论是服务端初始化时添加handler,还是新连接接入添加handler,都过了一遍,本节也该告一段落了。
---
<br/>
# **小结**
- 在服务端初始化channel、客户端初始化channel时,都会添加各自的handler。
- 新连接接入时,添加完客户端handler后,不会像添加完服务端马上进行handlerAdded事件回调,而是会延后至注册时。
- 无论是服务端初始化还是客户端初始化,插入handler的位置都是tail前,节点结构都是双向链表。

【Netty】Pipeline相关(二):添加ChannelHandler