前言
这一节的很多理论知识基础都严重依赖【上一节】讲到的ChannelIn/OutboundInvoker和ChannelIn/OutboundHandler方法意义区别。如果对这四个接口的区别没什么印象,建议回去重新看一遍。
本节主要来讲讲添加ChannelHandler的源码逻辑,顺便会提点之前服务端、客户端初始化遇到过的添加handler逻辑。
Netty Version:4.1.6
大致流程
这里先不分服务端、客户端,列一下源码的大致流程:
- 判断handler是否重复添加。
- 创建结点:给handler起名字、绑定NioEventLoop、保存handler。
- 将节点添加至双向链表中,也就是HeadContext和TailContext中间。
- 发起添加事件传播(回调)。
实验代码
Server.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.AttributeKey;
/**
* @author cwj
*/
public final class Server {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap server = new ServerBootstrap();
server.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, true)
.handler(new ServerHandler())
.childHandler(new MyServerInitialzer());
ChannelFuture future = server.bind(8888).sync();
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
ServerHandler
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;
public class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("channelActive");
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
System.out.println("channelRegistered");
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
System.out.println("handlerAdded");
}
}
MyServerInitialzer.java
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.socket.SocketChannel;
public class MyServerInitialzer extends ChannelInitializer<SocketChannel>{
@Override
protected void initChannel(SocketChannel ch) throws Exception {
System.out.println("initChannel");
ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
ch.pipeline().addLast(new ChannelOutboundHandlerAdapter());
}
}
跟进源码
服务端添加ChannelHandler
先从添加服务端ChannelHandler开始跟进,找到在【初始化服务端channel】一节的init方法,服务端添加handler的核心代码如下,此处【坐标1】:
io.netty.bootstrap.ServerBootstrap#init
...(略)
p.addLast(new ChannelInitializer<Channel>() {
// addLast后回调的方法,下面的addLast才是真正添加我们自定义的ServerHandler
@Override
public void initChannel(Channel ch) throws Exception {
final ChannelPipeline pipeline = ch.pipeline();
ChannelHandler handler = config.handler();
if (handler != null) {
pipeline.addLast(handler);
}
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
// 添加自定义的ServerHandler
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
}
});
- addLast方法其实就是添加ChannelHandler的核心方法,由ChannelPipe接口抽象。
- 代码段的第一个handler并不是添加我们自定义的ServerHandler,而是先添加ChannelInitializer,之后ChannelInitializer添加完毕后会回调HandlerAdded方法 & 删除自己,而ChannelInitializer的HandlerAdded方法最终又会回调到上面代码块中的initChannel方法,在这个initChannel方法中的addLast才是真正添加自定义的ServerHandler。(现在不懂没关系,下面追源码就是这段逻辑)
在跟进源码之前,我们不妨再来看看当前addLast方法的抽象接口ChannelPipeline的关系图:
- 可见它间接实现了两Invoker接口,说明它的方法极有可能拥有发起事件传播(回调)的能力。
关于两Invoker和两Handler接口如果感到陌生,请看上一节 。
好了,现在视角重新转回上面【坐标1】中的代码,来追一追p.addLast,进入这个addLast方法:
io.netty.channel.DefaultChannelPipeline#addLast(io.netty.channel.ChannelHandler...)
@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
- 注意NioEventLoop被设置为null。
继续跟进addLast方法
io.netty.channel.DefaultChannelPipeline#addLast(io.netty.util.concurrent.EventExecutorGroup, io.netty.channel.ChannelHandler...)
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
if (handlers == null) {
throw new NullPointerException("handlers");
}
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
addLast(executor, null, h);
}
return this;
}
- handler的name也设置为null,到后面创建节点才真正设置。
继续跟进addLast,能看到总体逻辑,此处【坐标2】:
io.netty.channel.DefaultChannelPipeline#addLast(io.netty.util.concurrent.EventExecutorGroup, java.lang.String, io.netty.channel.ChannelHandler)
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
// 检查此handler是否共享的、添加过。
checkMultiplicity(handler);
// 创建新节点,这一步会分配NioEventLoop、检查name是否重复、命名、保存handler等。
newCtx = newContext(group, filterName(name, handler), handler);
// 添加节点至pipeline的双向链表。
addLast0(newCtx);
// 如果是添加客户端ChannelHandler都会走这一段,添加服务端ChannelHandler则不会进入这里。
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
// 下面代码就是发起handlerAdded事件传播(回调)
EventExecutor executor = newCtx.executor();
if (!executor.inEventLoop()) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
return this;
}
}
callHandlerAdded0(newCtx);
return this;
}
- 代码有注释的就是接下来要分析的方法。
校验handler
先进入checkMultiplicity方法看看:
io.netty.channel.DefaultChannelPipeline#checkMultiplicity
private static void checkMultiplicity(ChannelHandler handler) {
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
if (!h.isSharable() && h.added) {
throw new ChannelPipelineException(
h.getClass().getName() +
" is not a @Sharable handler, so can't be added or removed multiple times.");
}
h.added = true;
}
}
- 这个方法就是通过added标记判断handler是否被添加过,还有通过isSharable()方法判断handler是否有@Share注解,即判断能否被共享。
- 另外注意这里的added是boolean类型,但却没有用volatile关键字修饰,原因是后面还会有其它校验,不用担心这里会因为重排序而导致致命错误。
创建节点
视角再转回【坐标2】的代码,找到这一段代码,此处【坐标3】:
newCtx = newContext(group, filterName(name, handler), handler);
跟进filterName方法看看:
io.netty.channel.DefaultChannelPipeline#filterName
private String filterName(String name, ChannelHandler handler) {
if (name == null) {
return generateName(handler);
}
checkDuplicateName(name);
return name;
}
- 如果没有名字就起一个,如果有则校验一下名字有没有重复。
起名字的方法我这里的不跟了,来看看校验名字的方法,即checkDuplicateName,最终跟到如下代码:
io.netty.channel.DefaultChannelPipeline#context0
private AbstractChannelHandlerContext context0(String name) {
AbstractChannelHandlerContext context = head.next;
while (context != tail) {
if (context.name().equals(name)) {
return context;
}
context = context.next;
}
return null;
}
- 这里其实就是遍历在上一节提到的双向链表,因为这里是双向链表第一次用上了,所以特地跟进来看看。
给handler取完名后,视角转回【坐标3】的代码,跟进newContext,看看创建节点的过程:
io.netty.channel.DefaultChannelPipeline#newContext
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
- 可以看见节点的实际类型是DefaultChannelHandlerContext。
- 这里的childExecutor方法并不是分配NioEventLoop的,因为只要传入的参数为null,它就返回null,所以最终传进DefaultChannelHandlerContext的还是null。
跟进DefaultChannelHandlerContext构造方法:
DefaultChannelHandlerContext(
DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
if (handler == null) {
throw new NullPointerException("handler");
}
this.handler = handler;
}
- 这里的super构造方法其实就是调用【上一节】提到的AbstractChannelHandlerContext构造方法,这里就不再赘述了,主要是节点中保存了handler。
将节点添加至双向链表
现在节点已经创建好了,接下来就是要把节点添加到初始化pipeline时创建的head和tail之间了。
视角转回【坐标2】的代码,进入到addLast0方法:
io.netty.channel.DefaultChannelPipeline#addLast0
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
- 将当前节点添加到tail节点的前面。
发起事件传播(回调)
节点添加已经完成了,接下来就是要发起一下事件传播(回调)了。
视角重新拉回【坐标2】的代码,跟进callHandlerAdded0方法:
io.netty.channel.DefaultChannelPipeline#callHandlerAdded0
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
ctx.handler().handlerAdded(ctx);
ctx.setAddComplete();
} catch (Throwable t) {
...(略)
}
- 其中ctx.handler().handlerAdded(ctx)就是真正的回调方法,因为【坐标1】中首先添加的是ChannelInitializer,所以这里handler返回的其实是ChannelInitializer的实例。
- 需要注意的是,打断点是看见handler()返回如下:
- 这是因为我们在【坐标1】的代码中,使用的是匿名内部类,实际上这个实例直属类就是ChannelInitializer。
另外,ChannelInitializer简介实现了ChannelInboundHandler接口,忘记这个接口的请看【上一节】,这个类的方法的主体功能相信不用我再赘述了。
跟进handlerAdded方法:
io.netty.channel.ChannelInitializer#handlerAdded
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
if (ctx.channel().isRegistered()) {
initChannel(ctx);
}
}
跟进initChannel方法:
io.netty.channel.ChannelInitializer#initChannel(io.netty.channel.ChannelHandlerContext)
private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
try {
initChannel((C) ctx.channel());
} catch (Throwable cause) {
// Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
// We do so to prevent multiple calls to initChannel(...).
exceptionCaught(ctx, cause);
} finally {
remove(ctx);
}
return true;
}
return false;
}
继续跟进initChannel方法,会发现这是个抽象实现:
protected abstract void initChannel(C ch) throws Exception;
那么这个抽象实现实际上是回调到谁呢?用断点跟进去瞧瞧:
- 最终就是调用到匿名内部类中的实现。
可以看到一开始callHandlerAdded0是在回调ChannelInitializer的handlerAdded方法,而ChannelInitializer的handlerAdded方法,最终又回调到【坐标1】中匿名内部类实现的initChannel方法,然后在这个initChannel方法里面的addLast,才是真正添加我们自定义的ServerHandler(看下图)以及其他逻辑。
至于这里addLast添加ServerHandler的过程,其实跟上面讲的逻辑大致一样,只是最后回调到的方法可能是我们在ServerHandler中覆写的逻辑。
客户端接入添加handler
当有新连接接入时,自然会为客户端channel中的pipeline添加childhandler链,而我们在实验代码中准备了MyServerInitialzer.java,并在Server.java中设置为它为客户端channel的handler。
新连接接入时,其实添加handler的大体逻辑和服务端启动时添加是差不多的,只是同样逻辑下代码执行的“路线”可能比服务端添加要长,下面我们就来一起看看。
首先,要回顾【初始化channel】中的channelRead方法,新连接接入时,就是在这段代码添加handler的:
io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
// 初始化时添加handler
child.pipeline().addLast(childHandler);
for (Entry<ChannelOption<?>, Object> e: childOptions) {
try {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + child, t);
}
}
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
- 这里的childHandler和服务端handler同样可以是多个。
持续跟进addLast方法,又来到熟悉的地方了,也就是【坐标2】的代码,但这次的执行逻辑就如同【坐标2】代码的注释一样,有点不一样了:
io.netty.channel.DefaultChannelPipeline#addLast(io.netty.util.concurrent.EventExecutorGroup, java.lang.String, io.netty.channel.ChannelHandler)
@Override
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
checkMultiplicity(handler);
newCtx = newContext(group, filterName(name, handler), handler);
addLast0(newCtx);
// 这次则是会进入这里的if代码块,然后返回。
if (!registered) {
newCtx.setAddPending();
callHandlerCallbackLater(newCtx, true);
return this;
}
...(略)
Q:嗯?就在这里返回了?那岂不是不会调用到callHandlerAdded0了吗?那事件还怎么传播呢?MyServerInitialzer实现的initChannel方法岂不是没人回调了吗?
A:不用担心,会调用callHandlerAdded0的,只不过不是现在,继续跟下去就知道了。
上面的代码既然返回了,我就继续追下去,之后返回到了【坐标2】的channelRead方法再继续执行下去:
io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
- 这里的register就是关键。
这里持续跟进register方法,最终来到了register0方法,跟进过程请参考【初始化客户端channel】,这里不再赘述:
io.netty.channel.AbstractChannel.AbstractUnsafe#register0
// 省略绝大部分代码
private void register0(ChannelPromise promise) {
...(略)
pipeline.invokeHandlerAddedIfNeeded();
...(略)
}
- 看到这个方法名,我就知道"它"要lei了~
继续跟进invokeHandlerAddedIfNeeded方法:
io.netty.channel.DefaultChannelPipeline#invokeHandlerAddedIfNeeded
final void invokeHandlerAddedIfNeeded() {
assert channel.eventLoop().inEventLoop();
if (firstRegistration) {
firstRegistration = false;
callHandlerAddedForAllHandlers();
}
}
继续跟进callHandlerAddedForAllHandlers:
io.netty.channel.DefaultChannelPipeline#callHandlerAddedForAllHandlers
private void callHandlerAddedForAllHandlers() {
final PendingHandlerCallback pendingHandlerCallbackHead;
synchronized (this) {
...(略)
PendingHandlerCallback task = pendingHandlerCallbackHead;
while (task != null) {
task.execute();
task = task.next;
}
}
跟进这个execute方法:
io.netty.channel.DefaultChannelPipeline.PendingHandlerAddedTask#execute
@Override
void execute() {
EventExecutor executor = ctx.executor();
if (executor.inEventLoop()) {
callHandlerAdded0(ctx);
} else {
try {
executor.execute(this);
} catch (RejectedExecutionException e) {
if (logger.isWarnEnabled()) {
logger.warn(
"Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
executor, ctx.name(), e);
}
remove0(ctx);
ctx.setRemoved();
}
}
}
好了,终于找到前面讲过callHandlerAdded0方法,这个方法最终会回调handlerAdded方法。可由于我们的MyServerInitialzer继承了ChannelInitializer但没有覆写handlerAdded方法,所以最终还是调用了ChannelInitializer的handlerAdded实现,即最终调用到MyServerInitialzer的initChannel。
好了,无论是服务端初始化时添加handler,还是新连接接入添加handler,都过了一遍,本节也该告一段落了。
小结
- 在服务端初始化channel、客户端初始化channel时,都会添加各自的handler。
- 新连接接入时,添加完客户端handler后,不会像添加完服务端马上进行handlerAdded事件回调,而是会延后至注册时。
- 无论是服务端初始化还是客户端初始化,插入handler的位置都是tail前,节点结构都是双向链表。