回顾


Netty version:4.1.6

前面三节中Netty已经完成创建、初始化Channel以及注册selector,那么它只差最后一步就能开始和客户端建立连接了,即本节的:端口绑定。


端口绑定

大致函数调用流程:
端口绑定大致流程.png

确认起点

本节起点位置:io.netty.bootstrap.AbstractBootstrap#doBind

如果不知道怎么进来的,可以去创建服务端Channel那一节重新看看,这里不再赘述。

开始源码追踪

现在,视角重新回到io.netty.bootstrap.AbstractBootstrap#doBind,找到doBind0方法:

AbstractBootstrapdoBind.png

进入doBind0方法:
io.netty.bootstrap.AbstractBootstrap#doBind0

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

然后再进入bind方法(实现选择AbstractChannel):
io.netty.channel.AbstractChannel#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)

    public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
        return pipeline.bind(localAddress, promise);
    }

之后一路bind(源码略):

  • 进入io.netty.channel.DefaultChannelPipeline#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)
  • 进入io.netty.channel.AbstractChannelHandlerContext#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)
  • 进入io.netty.channel.AbstractChannelHandlerContext#invokeBind
  • 进入io.netty.channel.DefaultChannelPipeline.HeadContext#bind
  • 进入io.netty.channel.AbstractChannel.AbstractUnsafe#bind

如果对源码怎么跳的不感兴趣,那就直接将视角切换到:io.netty.channel.AbstractChannel.AbstractUnsafe#bind,接下来都是围绕这里:

        public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
            assertEventLoop();

            if (!promise.setUncancellable() || !ensureOpen(promise)) {
                return;
            }

            // See: https://github.com/netty/netty/issues/576
            if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
                localAddress instanceof InetSocketAddress &&
                !((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
                !PlatformDependent.isWindows() && !PlatformDependent.isRoot()) {
                logger.warn(
                        "A non-root user can't receive a broadcast packet if the socket " +
                        "is not bound to a wildcard address; binding to a non-wildcard " +
                        "address (" + localAddress + ") anyway as requested.");
            }

            boolean wasActive = isActive();
            try {
                doBind(localAddress);
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                closeIfClosed();
                return;
            }

            if (!wasActive && isActive()) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireChannelActive();
                    }
                });
            }

            safeSetSuccess(promise);
        }

我们先进入上面doBind方法(实现类:NioServerSocketChannel): io.netty.channel.socket.nio.NioSocketChannel#doBind
    protected void doBind(SocketAddress localAddress) throws Exception {
        if (PlatformDependent.javaVersion() >= 7) {
            javaChannel().bind(localAddress, config.getBacklog());
        } else {
            javaChannel().socket().bind(localAddress, config.getBacklog());
        }
    }

上面的代码底层其实就是使用jdk的api绑定端口,我们可以再来看看它的参数大概是个什么东西:
断点看参数.png

可以看到,就是我们自定义的一些参数,包括ip、端口等,当这里的代码执行完后,就相当于绑定完成了,接下来就是要传播下事件,告诉Netty可以监听accept事件了。



然后,视角重新回到io.netty.channel.AbstractChannel.AbstractUnsafe#bind,找到下面这个方法:
找到fireChannelActive方法.png

图中的fireChannelActive方法其实就相当于一个通知,上一节也有讲到它对应的就是channelActive事件,我们在这里继续追一下源码:
io.netty.channel.DefaultChannelPipeline#fireChannelActive

    public final ChannelPipeline fireChannelActive() {
        AbstractChannelHandlerContext.invokeChannelActive(head);
        return this;
    }

之后再进入invokeChannelActive方法:
io.netty.channel.AbstractChannelHandlerContext#invokeChannelActive(io.netty.channel.AbstractChannelHandlerContext)

    static void invokeChannelActive(final AbstractChannelHandlerContext next) {
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeChannelActive();
        } else {
            executor.execute(new Runnable() {
                @Override
                public void run() {
                    next.invokeChannelActive();
                }
            });
        }
    }

之后再进入invokeChannelActive函数:
io.netty.channel.AbstractChannelHandlerContext#invokeChannelActive()

    private void invokeChannelActive() {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelActive(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            fireChannelActive();
        }
    }

之后再进入channelActive函数(实现类:DefaultChannelHandlerContext):
io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive

        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.fireChannelActive();

            readIfIsAutoRead();
        }

上面代码中的ctx.fireChannelActive();,最终会去到上一节ServerHandler.java中的channelActive方法。

我们再来关注下上面代码中的readIfIsAutoRead方法,追一下它:
io.netty.channel.DefaultChannelPipeline.HeadContext#readIfIsAutoRead

        private void readIfIsAutoRead() {
            if (channel.config().isAutoRead()) {
                channel.read();
            }
        }

进入上面代码中的read方法,之后也是一路入read方法(中间过程略),最终来到下面这个read方法:
io.netty.channel.AbstractChannelHandlerContext#read

    public ChannelHandlerContext read() {
        final AbstractChannelHandlerContext next = findContextOutbound();
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeRead();
        } else {
            Runnable task = next.invokeReadTask;
            if (task == null) {
                next.invokeReadTask = task = new Runnable() {
                    @Override
                    public void run() {
                        next.invokeRead();
                    }
                };
            }
            executor.execute(task);
        }

        return this;
    }

然后进入invokeRead方法:
io.netty.channel.AbstractChannelHandlerContext#invokeRead

    private void invokeRead() {
        if (invokeHandler()) {
            try {
                ((ChannelOutboundHandler) handler()).read(this);
            } catch (Throwable t) {
                notifyHandlerException(t);
            }
        } else {
            read();
        }
    }

然后再进入((ChannelOutboundHandler) handler()).read(this);的read方法:
io.netty.channel.DefaultChannelPipeline.HeadContext#read

        public void read(ChannelHandlerContext ctx) {
            unsafe.beginRead();
        }

之后再进入beginRead方法:
io.netty.channel.AbstractChannel.AbstractUnsafe#beginRead

        public final void beginRead() {
            assertEventLoop();

            if (!isActive()) {
                return;
            }

            try {
                doBeginRead();
            } catch (final Exception e) {
                invokeLater(new Runnable() {
                    @Override
                    public void run() {
                        pipeline.fireExceptionCaught(e);
                    }
                });
                close(voidPromise());
            }
        }

之后再进入doBeginRead方法(实现类AbstractNioChannel):
io.netty.channel.nio.AbstractNioChannel#doBeginRead

    protected void doBeginRead() throws Exception {
        // Channel.read() or ChannelHandlerContext.read() was called
        final SelectionKey selectionKey = this.selectionKey;
        if (!selectionKey.isValid()) {
            return;
        }

        readPending = true;

        final int interestOps = selectionKey.interestOps();
        if ((interestOps & readInterestOp) == 0) {
            selectionKey.interestOps(interestOps | readInterestOp);
        }
    }

先解释下上面代码的核心含义:

  • 如果先前的设置的ops不等于现在的readInterestOp,那么现在就会将原本的ops修改为readInterestOp的值,readInterestOp表示Netty要开始关注accept事件了。

下面再来讲解下这里面的一些参数:

上面代码其实就是事件传播的核心,而这里的selectionKey看着很眼熟,其它就是上一节【注册selector】中的某个步骤中设置的,忘记的话可以回去看看。

除此之外,readInterestOp这个参数也很眼熟,还记得同样是在上一节【注册selector】中,我们也设置了一个ops=0,表示不需要关心任何事件,而这里的interestOps是从selectionKey中取的,我们不难推测+验证,这个interestOps就是我们之前设置的ops,即0。

上面的结论想要验证的话,可以自行打下断点,因为比较简单,我就不浪费流量贴图了。

不仅如此,其实readInterestOp也是在前面就设置好的,这里就需要追踪下了readInterestOp是在哪里初始化的了:
io.netty.channel.nio.AbstractNioChannel#AbstractNioChannel

    protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
        super(parent);
        this.ch = ch;
        this.readInterestOp = readInterestOp;
        try {
            ch.configureBlocking(false);
        } catch (IOException e) {
            try {
                ch.close();
            } catch (IOException e2) {
                if (logger.isWarnEnabled()) {
                    logger.warn(
                            "Failed to close a partially initialized socket.", e2);
                }
            }

            throw new ChannelException("Failed to enter non-blocking mode.", e);
        }
    }

可以看见就是在当前类的构造函数被初始化的,那么这个构造函数是谁调用的呢?其实就在第一节【创建服务端Channel】中创建的,这里贴下代码:
io.netty.channel.socket.nio.NioServerSocketChannel#NioServerSocketChannel(java.nio.channels.ServerSocketChannel)

    public NioServerSocketChannel(ServerSocketChannel channel) {
        super(null, channel, SelectionKey.OP_ACCEPT);
        config = new NioServerSocketChannelConfig(this, javaChannel().socket());
    }

进入super方法,最终就会去到io.netty.channel.nio.AbstractNioChannel#AbstractNioChannel,初始化readInterestOp。

而且我们能看到readInterestOp参数是SelectionKey.OP_ACCEPT,当上面讲到的io.netty.channel.nio.AbstractNioChannel#doBeginRead执行了selectionKey.interestOps(interestOps | readInterestOp);后,其实就等于告诉Netty可以关注accept事件了,和socket编程非常相似。


小结

这节除了讲到端口绑定外,还讲到了端口绑定后的事件传播,主要围绕两个方法,下面截了张图:

io.netty.channel.AbstractChannel.AbstractUnsafe#bind

两个核心函数.png