【Netty】Pipeline相关(二):添加ChannelHandler

【Netty】Pipeline相关(二):添加ChannelHandler

Scroll Down

前言

这一节的很多理论知识基础都严重依赖【上一节】讲到的ChannelIn/OutboundInvoker和ChannelIn/OutboundHandler方法意义区别。如果对这四个接口的区别没什么印象,建议回去重新看一遍。

本节主要来讲讲添加ChannelHandler的源码逻辑,顺便会提点之前服务端、客户端初始化遇到过的添加handler逻辑。

Netty Version:4.1.6


大致流程

这里先不分服务端、客户端,列一下源码的大致流程:

  • 判断handler是否重复添加。
  • 创建结点:给handler起名字、绑定NioEventLoop、保存handler。
  • 将节点添加至双向链表中,也就是HeadContext和TailContext中间。
  • 发起添加事件传播(回调)。

实验代码

Server.java

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.AttributeKey;


/**
 * @author cwj
 */
public final class Server {

    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap server = new ServerBootstrap();

            server.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .handler(new ServerHandler())
                    .childHandler(new MyServerInitialzer());

            ChannelFuture future = server.bind(8888).sync();
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

ServerHandler

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;

public class ServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("channelActive");
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) {
        System.out.println("channelRegistered");
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        System.out.println("handlerAdded");
    }
}

MyServerInitialzer.java

import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.socket.SocketChannel;


public class MyServerInitialzer extends ChannelInitializer<SocketChannel>{

	@Override
	protected void initChannel(SocketChannel ch) throws Exception {
        System.out.println("initChannel");
		ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
		ch.pipeline().addLast(new ChannelOutboundHandlerAdapter());
	}
}


跟进源码

服务端添加ChannelHandler

先从添加服务端ChannelHandler开始跟进,找到在【初始化服务端channel】一节的init方法,服务端添加handler的核心代码如下,此处【坐标1】
io.netty.bootstrap.ServerBootstrap#init

        ...(略)
        p.addLast(new ChannelInitializer<Channel>() {
            // addLast后回调的方法,下面的addLast才是真正添加我们自定义的ServerHandler
            @Override
            public void initChannel(Channel ch) throws Exception {
                final ChannelPipeline pipeline = ch.pipeline();
                ChannelHandler handler = config.handler();
                if (handler != null) {
                    pipeline.addLast(handler);
                }
                ch.eventLoop().execute(new Runnable() {
                    @Override
                    public void run() {
                        // 添加自定义的ServerHandler
                        pipeline.addLast(new ServerBootstrapAcceptor(
                                currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
                    }
                });
            }
        });
  • addLast方法其实就是添加ChannelHandler的核心方法,由ChannelPipe接口抽象。
  • 代码段的第一个handler并不是添加我们自定义的ServerHandler,而是先添加ChannelInitializer,之后ChannelInitializer添加完毕后会回调HandlerAdded方法 & 删除自己,而ChannelInitializer的HandlerAdded方法最终又会回调到上面代码块中的initChannel方法,在这个initChannel方法中的addLast才是真正添加自定义的ServerHandler。(现在不懂没关系,下面追源码就是这段逻辑)

在跟进源码之前,我们不妨再来看看当前addLast方法的抽象接口ChannelPipeline的关系图:
image.png

  • 可见它间接实现了两Invoker接口,说明它的方法极有可能拥有发起事件传播(回调)的能力。

关于两Invoker和两Handler接口如果感到陌生,请看上一节


好了,现在视角重新转回上面【坐标1】中的代码,来追一追p.addLast,进入这个addLast方法:
io.netty.channel.DefaultChannelPipeline#addLast(io.netty.channel.ChannelHandler...)

    @Override
    public final ChannelPipeline addLast(ChannelHandler... handlers) {
        return addLast(null, handlers);
    }
  • 注意NioEventLoop被设置为null。

继续跟进addLast方法
io.netty.channel.DefaultChannelPipeline#addLast(io.netty.util.concurrent.EventExecutorGroup, io.netty.channel.ChannelHandler...)

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        if (handlers == null) {
            throw new NullPointerException("handlers");
        }

        for (ChannelHandler h: handlers) {
            if (h == null) {
                break;
            }
            addLast(executor, null, h);
        }

        return this;
    }
  • handler的name也设置为null,到后面创建节点才真正设置。

继续跟进addLast,能看到总体逻辑,此处【坐标2】
io.netty.channel.DefaultChannelPipeline#addLast(io.netty.util.concurrent.EventExecutorGroup, java.lang.String, io.netty.channel.ChannelHandler)

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            // 检查此handler是否共享的、添加过。
            checkMultiplicity(handler);
            // 创建新节点,这一步会分配NioEventLoop、检查name是否重复、命名、保存handler等。
            newCtx = newContext(group, filterName(name, handler), handler);
            // 添加节点至pipeline的双向链表。
            addLast0(newCtx);

            // 如果是添加客户端ChannelHandler都会走这一段,添加服务端ChannelHandler则不会进入这里。
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }
            
            // 下面代码就是发起handlerAdded事件传播(回调)
            EventExecutor executor = newCtx.executor();
            if (!executor.inEventLoop()) {
                newCtx.setAddPending();
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        callHandlerAdded0(newCtx);
                    }
                });
                return this;
            }
        }
        callHandlerAdded0(newCtx);
        return this;
    }
  • 代码有注释的就是接下来要分析的方法。

校验handler

先进入checkMultiplicity方法看看:
io.netty.channel.DefaultChannelPipeline#checkMultiplicity

    private static void checkMultiplicity(ChannelHandler handler) {
        if (handler instanceof ChannelHandlerAdapter) {
            ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
            if (!h.isSharable() && h.added) {
                throw new ChannelPipelineException(
                        h.getClass().getName() +
                        " is not a @Sharable handler, so can't be added or removed multiple times.");
            }
            h.added = true;
        }
    }
  • 这个方法就是通过added标记判断handler是否被添加过,还有通过isSharable()方法判断handler是否有@Share注解,即判断能否被共享。
  • 另外注意这里的added是boolean类型,但却没有用volatile关键字修饰,原因是后面还会有其它校验,不用担心这里会因为重排序而导致致命错误。

创建节点

视角再转回【坐标2】的代码,找到这一段代码,此处【坐标3】

newCtx = newContext(group, filterName(name, handler), handler);

跟进filterName方法看看:
io.netty.channel.DefaultChannelPipeline#filterName

    private String filterName(String name, ChannelHandler handler) {
        if (name == null) {
            return generateName(handler);
        }
        checkDuplicateName(name);
        return name;
    }
  • 如果没有名字就起一个,如果有则校验一下名字有没有重复。

起名字的方法我这里的不跟了,来看看校验名字的方法,即checkDuplicateName,最终跟到如下代码:
io.netty.channel.DefaultChannelPipeline#context0

    private AbstractChannelHandlerContext context0(String name) {
        AbstractChannelHandlerContext context = head.next;
        while (context != tail) {
            if (context.name().equals(name)) {
                return context;
            }
            context = context.next;
        }
        return null;
    }
  • 这里其实就是遍历在上一节提到的双向链表,因为这里是双向链表第一次用上了,所以特地跟进来看看。

给handler取完名后,视角转回【坐标3】的代码,跟进newContext,看看创建节点的过程:
io.netty.channel.DefaultChannelPipeline#newContext

    private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
        return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
    }
  • 可以看见节点的实际类型是DefaultChannelHandlerContext。
  • 这里的childExecutor方法并不是分配NioEventLoop的,因为只要传入的参数为null,它就返回null,所以最终传进DefaultChannelHandlerContext的还是null。

跟进DefaultChannelHandlerContext构造方法:

    DefaultChannelHandlerContext(
            DefaultChannelPipeline pipeline, EventExecutor executor, String name, ChannelHandler handler) {
        super(pipeline, executor, name, isInbound(handler), isOutbound(handler));
        if (handler == null) {
            throw new NullPointerException("handler");
        }
        this.handler = handler;
    }
  • 这里的super构造方法其实就是调用【上一节】提到的AbstractChannelHandlerContext构造方法,这里就不再赘述了,主要是节点中保存了handler。

将节点添加至双向链表

现在节点已经创建好了,接下来就是要把节点添加到初始化pipeline时创建的head和tail之间了。

视角转回【坐标2】的代码,进入到addLast0方法:
io.netty.channel.DefaultChannelPipeline#addLast0

    private void addLast0(AbstractChannelHandlerContext newCtx) {
        AbstractChannelHandlerContext prev = tail.prev;
        newCtx.prev = prev;
        newCtx.next = tail;
        prev.next = newCtx;
        tail.prev = newCtx;
    }
  • 将当前节点添加到tail节点的前面。

发起事件传播(回调)

节点添加已经完成了,接下来就是要发起一下事件传播(回调)了。

视角重新拉回【坐标2】的代码,跟进callHandlerAdded0方法:
io.netty.channel.DefaultChannelPipeline#callHandlerAdded0

    private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        try {
            ctx.handler().handlerAdded(ctx);
            ctx.setAddComplete();
        } catch (Throwable t) {
            ...(略)
        }
  • 其中ctx.handler().handlerAdded(ctx)就是真正的回调方法,因为【坐标1】中首先添加的是ChannelInitializer,所以这里handler返回的其实是ChannelInitializer的实例。
  • 需要注意的是,打断点是看见handler()返回如下:
    • image.png
    • 这是因为我们在【坐标1】的代码中,使用的是匿名内部类,实际上这个实例直属类就是ChannelInitializer。

另外,ChannelInitializer简介实现了ChannelInboundHandler接口,忘记这个接口的请看【上一节】,这个类的方法的主体功能相信不用我再赘述了。

跟进handlerAdded方法:
io.netty.channel.ChannelInitializer#handlerAdded

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
        if (ctx.channel().isRegistered()) {
            initChannel(ctx);
        }
    }

跟进initChannel方法:
io.netty.channel.ChannelInitializer#initChannel(io.netty.channel.ChannelHandlerContext)

    private boolean initChannel(ChannelHandlerContext ctx) throws Exception {
        if (initMap.putIfAbsent(ctx, Boolean.TRUE) == null) { // Guard against re-entrance.
            try {
                initChannel((C) ctx.channel());
            } catch (Throwable cause) {
                // Explicitly call exceptionCaught(...) as we removed the handler before calling initChannel(...).
                // We do so to prevent multiple calls to initChannel(...).
                exceptionCaught(ctx, cause);
            } finally {
                remove(ctx);
            }
            return true;
        }
        return false;
    }

继续跟进initChannel方法,会发现这是个抽象实现:

    protected abstract void initChannel(C ch) throws Exception;

那么这个抽象实现实际上是回调到谁呢?用断点跟进去瞧瞧:

image.png

  • 最终就是调用到匿名内部类中的实现。


可以看到一开始callHandlerAdded0是在回调ChannelInitializer的handlerAdded方法,而ChannelInitializer的handlerAdded方法,最终又回调到【坐标1】中匿名内部类实现的initChannel方法,然后在这个initChannel方法里面的addLast,才是真正添加我们自定义的ServerHandler(看下图)以及其他逻辑

添加ServerHandler


至于这里addLast添加ServerHandler的过程,其实跟上面讲的逻辑大致一样,只是最后回调到的方法可能是我们在ServerHandler中覆写的逻辑。



客户端接入添加handler

当有新连接接入时,自然会为客户端channel中的pipeline添加childhandler链,而我们在实验代码中准备了MyServerInitialzer.java,并在Server.java中设置为它为客户端channel的handler。

新连接接入时,其实添加handler的大体逻辑和服务端启动时添加是差不多的,只是同样逻辑下代码执行的“路线”可能比服务端添加要长,下面我们就来一起看看。


首先,要回顾【初始化channel】中的channelRead方法,新连接接入时,就是在这段代码添加handler的:
io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead

        @Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;
            // 初始化时添加handler
            child.pipeline().addLast(childHandler);

            for (Entry<ChannelOption<?>, Object> e: childOptions) {
                try {
                    if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                        logger.warn("Unknown channel option: " + e);
                    }
                } catch (Throwable t) {
                    logger.warn("Failed to set a channel option: " + child, t);
                }
            }

            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }

            try {
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }
  • 这里的childHandler和服务端handler同样可以是多个。

持续跟进addLast方法,又来到熟悉的地方了,也就是【坐标2】的代码,但这次的执行逻辑就如同【坐标2】代码的注释一样,有点不一样了:
io.netty.channel.DefaultChannelPipeline#addLast(io.netty.util.concurrent.EventExecutorGroup, java.lang.String, io.netty.channel.ChannelHandler)

    @Override
    public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
        final AbstractChannelHandlerContext newCtx;
        synchronized (this) {
            checkMultiplicity(handler);

            newCtx = newContext(group, filterName(name, handler), handler);

            addLast0(newCtx);
            
            // 这次则是会进入这里的if代码块,然后返回。
            if (!registered) {
                newCtx.setAddPending();
                callHandlerCallbackLater(newCtx, true);
                return this;
            }
            ...(略)

Q:嗯?就在这里返回了?那岂不是不会调用到callHandlerAdded0了吗?那事件还怎么传播呢?MyServerInitialzer实现的initChannel方法岂不是没人回调了吗?

A:不用担心,会调用callHandlerAdded0的,只不过不是现在,继续跟下去就知道了。


上面的代码既然返回了,我就继续追下去,之后返回到了【坐标2】的channelRead方法再继续执行下去:
io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead

    childGroup.register(child).addListener(new ChannelFutureListener() {
        @Override
        public void operationComplete(ChannelFuture future) throws Exception {
            if (!future.isSuccess()) {
                forceClose(child, future.cause());
            }
        }
    });
  • 这里的register就是关键。

这里持续跟进register方法,最终来到了register0方法,跟进过程请参考【初始化客户端channel】,这里不再赘述:
io.netty.channel.AbstractChannel.AbstractUnsafe#register0

// 省略绝大部分代码
private void register0(ChannelPromise promise) {
    ...(略)
    pipeline.invokeHandlerAddedIfNeeded();
    ...(略)
}
  • 看到这个方法名,我就知道"它"要lei了~

继续跟进invokeHandlerAddedIfNeeded方法:
io.netty.channel.DefaultChannelPipeline#invokeHandlerAddedIfNeeded

    final void invokeHandlerAddedIfNeeded() {
        assert channel.eventLoop().inEventLoop();
        if (firstRegistration) {
            firstRegistration = false;
            callHandlerAddedForAllHandlers();
        }
    }

继续跟进callHandlerAddedForAllHandlers:
io.netty.channel.DefaultChannelPipeline#callHandlerAddedForAllHandlers

    private void callHandlerAddedForAllHandlers() {
        final PendingHandlerCallback pendingHandlerCallbackHead;
        synchronized (this) {
        ...(略)
        PendingHandlerCallback task = pendingHandlerCallbackHead;
        while (task != null) {
            task.execute();
            task = task.next;
        }
    }

跟进这个execute方法:
io.netty.channel.DefaultChannelPipeline.PendingHandlerAddedTask#execute

        @Override
        void execute() {
            EventExecutor executor = ctx.executor();
            if (executor.inEventLoop()) {
                callHandlerAdded0(ctx);
            } else {
                try {
                    executor.execute(this);
                } catch (RejectedExecutionException e) {
                    if (logger.isWarnEnabled()) {
                        logger.warn(
                                "Can't invoke handlerAdded() as the EventExecutor {} rejected it, removing handler {}.",
                                executor, ctx.name(), e);
                    }
                    remove0(ctx);
                    ctx.setRemoved();
                }
            }
        }

好了,终于找到前面讲过callHandlerAdded0方法,这个方法最终会回调handlerAdded方法。可由于我们的MyServerInitialzer继承了ChannelInitializer但没有覆写handlerAdded方法,所以最终还是调用了ChannelInitializer的handlerAdded实现,即最终调用到MyServerInitialzer的initChannel。


好了,无论是服务端初始化时添加handler,还是新连接接入添加handler,都过了一遍,本节也该告一段落了。



小结

  • 在服务端初始化channel、客户端初始化channel时,都会添加各自的handler。
  • 新连接接入时,添加完客户端handler后,不会像添加完服务端马上进行handlerAdded事件回调,而是会延后至注册时。
  • 无论是服务端初始化还是客户端初始化,插入handler的位置都是tail前,节点结构都是双向链表。