【Netty】Pipeline相关(五):InBound事件传播(channelRead)

【Netty】Pipeline相关(五):InBound事件传播(channelRead)

Scroll Down

前言

在看本节之前,最好看看inbound和outbound事件的区别这一篇,先对inbound和outbound事件的区别有一个理解再看这篇会更好。并且,知道添加后的ChannelHandler的顺序也是前提,可以参考【添加handler】一节。

本节就是挑一个inbound事件传播来追踪源码,最终选定了channelRead这个方法,如果对这个方法感到陌生也没关系,因为本节主要关注的是事件传播的流程、顺序。

Netty Version:4.1.6


实验代码

Server.java

import com.imooc.netty.ch3.ServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.AttributeKey;

public final class Server {

    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(1);

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .handler(new ServerHandler())
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new InBoundHandlerA());
                            ch.pipeline().addLast(new InBoundHandlerB());
                            ch.pipeline().addLast(new InBoundHandlerC());
                        }
                    });

            ChannelFuture f = b.bind(8888).sync();

            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

InBoundHandlerA

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class InBoundHandlerA extends ChannelInboundHandlerAdapter {

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("InBoundHandlerA: " + msg);
        ctx.fireChannelRead(msg);
    }

InBoundHandlerB

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

/**
 * @author
 */
public class InBoundHandlerB extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("InBoundHandlerB: " + msg);
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        // 本节入口
        ctx.channel().pipeline().fireChannelRead("hello world");
    }
}

InBoundHandlerCInBoundHandlerA


跟进源码

入口就是InBoundHandlerB的channelActive方法,此处【坐标1】

    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        ctx.channel().pipeline().fireChannelRead("hello world");
    }

最好在上述方法打个断点,启动Server.java后,再用telnet连接即可进入这段代码。至于是怎么进入这里的channelActive方法的,可以参考端口绑定这一篇,客户端处理逻辑也一样,这里就不再赘述。


跟进fireChannelRead方法,实现类是DefaultChannelPipeline,在pipeline初始化这节讲过:
io.netty.channel.DefaultChannelPipeline#fireChannelRead

    @Override
    public final ChannelPipeline fireChannelRead(Object msg) {
        AbstractChannelHandlerContext.invokeChannelRead(head, msg);
        return this;
    }
  • 需要注意这里传了head节点进去,这一步大概率说明事件是从head开始向tail传播的。

跟进invokeChannelRead方法,此处【坐标0.5】
io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(AbstractChannelHandlerContext, Object)

    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }

继续跟进invokeChannelRead方法:
io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(java.lang.Object)

    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }
  • handler()当前返回的就是head节点:image.png
  • 所以我们不难推测接下来就是调用HeadContext的channelRead方法。

跟进channelRead方法,此处【坐标2】
io.netty.channel.DefaultChannelPipeline.HeadContext#channelRead

        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ctx.fireChannelRead(msg);
        }
  • 请注意这里的fireChannelRead和【坐标1】处fireChannelRead的区别。
  • 【坐标1】的fireChannelRead实现是DefaultChannelPipeline,而这里的fireChannelRead实现是AbstractChannelHandlerContext。两种实现是不一样的。前者是传播到head,后者是传播到下一个节点,接下来就能看到了。

跟进fireChannelRead方法:
io.netty.channel.AbstractChannelHandlerContext#fireChannelRead

    @Override
    public ChannelHandlerContext fireChannelRead(final Object msg) {
        invokeChannelRead(findContextInbound(), msg);
        return this;
    }

跟进findContextInbound方法:
io.netty.channel.AbstractChannelHandlerContext#findContextInbound

    private AbstractChannelHandlerContext findContextInbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.next;
        } while (!ctx.inbound);
        return ctx;
    }
  • 可以发现就是取当前节点的下一个inbound节点,这意味着可能会将事件传播到下一个节点。

返回到fireChannelRead,跟进invokeChannelRead方法:
io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(bstractChannelHandlerContext, Object)

    static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
        final Object m = next.pipeline.touch(ObjectUtil.checkNotNull(msg, "msg"), next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelRead(m);
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelRead(m);
                }
            });
        }
    }
  • 发现这个其实就是【坐标0.5】中的代码,也就是绕了一圈又回来了,区别在于当前节点的handler变了:断点值.png

再继续跟一遍invokeChannelRead方法:
io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(java.lang.Object)

    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }
  • 断点值.png
  • 很明显,接下来就是去到我们InBoundHandlerA覆写的channelRead方法了。

继续跟channelRead方法:
断点跟进.png

  • 果不其然。

至于到这一步的ctx.fireChannelRead(msg),其实跟【坐标2】的后序流程是一样的。

也就是接下来会将事件传播到InBoundHandlerB的channelRead,然后InBoundHandlerB的channelRead也会调用fireChannelRead发起事件传播,然后事件传播到InBoundHandlerC,InBoundHandlerC也会继续往下传播,最终传到TailContext进行收尾工作。

后序的传播追踪我就不一一列举了,相信搞懂了上面的循环逻辑,即便不追断点,脑海里也很清楚整个传播流程了。



小结

  • 本小节主要是通过跟踪channelRead的事件传播来间接的分析Inbound事件传播,验证了inbound事件传播的顺序,其实就如上一小节所说,是按照我们添加inboundhandler的顺序进行传播的。