【Netty】处理新连接(四):客户端channel事件传播&read事件注册

【Netty】处理新连接(四):客户端channel事件传播&read事件注册

Scroll Down

回顾

上一节记录了客户端channel(NioSocketChannel)初始化的大致流程。

经过前面服务端channel的学习,我们不难推测,客户端channel初始化之后也是需要进行事件传播的。而这一节就跟进它的事件传播,源码逻辑则大部分复用【端口绑定】这一节的逻辑。

Netty Version:4.1.6


跟进源码

在追踪源码之前,得要先定位起点,起点就是上一节接近文末处的【坐标2】的io.netty.channel.AbstractChannel.AbstractUnsafe#register0方法。至于启动Server.java、打断点等这里就不再赘述了。

下面再贴一次代码:

        private void register0(ChannelPromise promise) {
            try {
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                // 注册selector核心方法
                doRegister();
                neverRegistered = false;
                registered = true;

                pipeline.invokeHandlerAddedIfNeeded();

                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();
                // 服务端启动注册selector后因为还没绑定端口,所以这里为false。
                //但现在是客户端连接接入,很明显服务端已经完成初始化了,所以此处isActive()结果为true。
                if (isActive()) {
                    if (firstRegistration) {
                        // 传播事件的核心方法
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

关于fireChannelActive方法,其实在【端口绑定】这一节也追踪过,这里就当复习一下吧。想看结尾就跳【坐标2】。



这里就从fireChannelActive方法开始追踪,现在进入fireChannelActive方法(也可自己打断点跟进):
io.netty.channel.DefaultChannelPipeline#fireChannelActive

    @Override
    public final ChannelPipeline fireChannelActive() {
        AbstractChannelHandlerContext.invokeChannelActive(head);
        return this;
    }

继续跟进invokeChannelActive方法:
io.netty.channel.AbstractChannelHandlerContext#invokeChannelActive(io.netty.channel.AbstractChannelHandlerContext)

    static void invokeChannelActive(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelActive();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelActive();
                }
            });
        }
    }

继续跟进invokeChannelActive方法:
io.netty.channel.AbstractChannelHandlerContext#invokeChannelActive()

    private void invokeChannelActive() {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelActive(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelActive();
        }
    }

继续跟进channelActive方法:
io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelActive();

            readIfIsAutoRead();
        }
  • 这里的fireChannelActive最终会执行到上一节ServerHandler.java的channelActive方法。

继续追readIfIsAutoRead方法:
io.netty.channel.DefaultChannelPipeline.HeadContext#readIfIsAutoRead

        private void readIfIsAutoRead() {
            if (channel.config().isAutoRead()) {
                channel.read();
            }
        }

继续进入read方法:
io.netty.channel.AbstractChannel#read

    @Override
    public Channel read() {
        pipeline.read();
        return this;
    }

继续跟进read方法:
io.netty.channel.DefaultChannelPipeline#read

    @Override
    public final ChannelPipeline read() {
        tail.read();
        return this;
    }

继续跟进read方法:
io.netty.channel.AbstractChannelHandlerContext#read

    @Override
    public ChannelHandlerContext read() {
        final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeRead();
        } else {
            Runnable task = next.invokeReadTask;
            if (task == null) {
                next.invokeReadTask = task = new Runnable() {
                    @Override
                    public void run() {
                        next.invokeRead();
                    }
                };
            }
            executor.execute(task);
        }

        return this;
    }

继续跟进invokeRead方法:
io.netty.channel.AbstractChannelHandlerContext#invokeRead

    private void invokeRead() {
        if (invokeHandler()) {
            try {
                ((ChannelOutboundHandler) handler()).read(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            read();
        }
    }

继续跟进read方法:
io.netty.channel.DefaultChannelPipeline.HeadContext#read

        @Override
        public void read(ChannelHandlerContext ctx) {
            unsafe.beginRead();
        }

继续跟进这个beginRead方法:
io.netty.channel.AbstractChannel.AbstractUnsafe#beginRead

        @Override
        public final void beginRead() {
            assertEventLoop();

            if (!isActive()) {
                return;
            }

            try {
                doBeginRead();
            } catch (final Exception e) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireExceptionCaught(e);
                    }
                });
                close(voidPromise());
            }
        }


继续进入doBeginRead方法,终于来到熟悉的老地方了,此处【坐标2】
io.netty.channel.nio.AbstractNioChannel#doBeginRead

    @Override
    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

到这一步,和服务端channel事件传播的区别就是:当前客户端channel传播的是OP_READ事件,不信可以来看看下面的断点图:
image.png

readInterestOp=1,即=OP_READ,这个值是在【创建客户端channel】这篇博客中设置的。

最终,当改变了ops的值后,就完成了read事件注册,NioEventLoop中run方法的processSelectedKeys方法就会开始关注OP_READ事件,详细可参考【processSelectedKeys执行逻辑】这一篇。

因为代码逻辑和前面【端口绑定】,这里就不需要小结了。