前言

在前面的博客中,主要学习的是Netty处理服务端相关逻辑的原码追踪,还未讲到对客户端的处理是怎么样的。

那么这节,就来看看Netty大致上是如何处理客户端新连接的。

Netty Version:4.1.6

另外,本篇内容以【select方法-检查I/O事件】【processSelectedKeys执行逻辑】为前置知识的,如果遇到这里还会简单描述下,详细忘记的还请回去看。

在写此篇博客之前,写了一篇小总结,里面记录了如NioByteUnsafe和NioMessageUnsafe的区别等。

大致流程

检测新连接的大致流程如下:

  • processSelectedKey(selectionkey, channel)
  • NioMessageUnsafe.read()
  • doReadMessage()
  • javaChannel().accept()

从流程中不难看出,其实就是去监听accept事件。

实验代码

Server.java

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.AttributeKey;

/**
 * @author cwj
 */
public final class Server {

    public static void main(String[] args) throws Exception {
        // 两大线程
        // 对应Socket中Server的启动监听线程(负责接收新连接并抛给workerGroup)
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // 对应Socket中Client中主函数的线程(负责处理新连接)
        EventLoopGroup workerGroup = new NioEventLoopGroup(1);

        try {
            // 辅助类
            ServerBootstrap server = new ServerBootstrap();
            // 装配线程
            server.group(bossGroup, workerGroup)
                    // 设置Channel的类型
                    .channel(NioServerSocketChannel.class)
                    // 给每个客户端的连接设置一些tcp的基本属性
//                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.TCP_NODELAY, true)
                    // 服务端启动时做的逻辑
                    .handler(new ServerHandler())
                    // 给pipeline配置handler,Channel发生某种变化时对应的处理逻辑
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) {
                            //ch.pipeline().addLast(new AuthHandler());
                        }
                    });

            // 辅助类,绑定端口,这里使用了同步,也就是说它原本是异步的:io.netty.bootstrap.AbstractBootstrap.doBind
            ChannelFuture future = server.bind(8888).sync();
            // 关闭
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

跟进源码

在跟进源码之前,先找到起点:
io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel),如果忘记怎么进入这个方法的,可以回去看processSelectedKeys执行逻辑那一篇。

在processSelectedKey这个方法上打个断点:

在processSelectedKey方法上打个断点.png

在Server.java的new ServerBootstrap方法打个断点:

ServerBootStrap的断点.png

然后运行Server.java的main方法,首先进入第一个断点,即ServerBootstrap处的断点,记下bossGroup和workerGroup中NioEventLoop的@编号:

bossGroup和workerGroup中NioEventLoop的地址.png

  • bossGroup的是1129
  • workerGroup的是1130

然后放开上一个断点,用telnet进行连接,之后就会进入下一个断点:

telnet 127.0.0.1 8888

进入断点效果如下:

进入断点.png


理一理当前参数

对参数没兴趣就跳到下面的【继续跟进】。

在让断点继续玩下走之前,先理一理当前的一些参数、变量是什么,首先看看最外层:

  • this是NioEventLoop@1129,即bossGroup中的NioEventLoop,验证了之前"parentGroup主要是处理新连接"的说法。
  • 方法传入的AbstractNioChannel其实就是服务端channel,即NioServerSocketChannel。

再来看看传进来的SelectionKey和NioServerSocketChannel里面是什么东西,这里先看NioServerSocketChannel:

NioServerSocketChannel中参数一览.png

  • NioServerSocketChannel传进来的主要作用,其实主要就是为了获取它的eventLoop和unsafe。
  • 请记住这里的NioEventLoop@1129。
  • 还记得上面的eventLoop是来自哪里的吗?不记得就看注册selector那篇。

下面再来看看传进来的selectionKey里面有什么:

SelectionKey断点截图1.png

  • 关于selectionKey最原始的产生地,请看注册selector那一节。
  • SelectionKey在processSelectedKey方法中主要发挥的作用就是:它保存了事件集。

有趣的事情来了,当我眼睛睁大点后,发现了如下片段:

  • SelectionKey中的attachment实例居然和传进来的NioServerSocketChannel是同一个实例,虽然我在前面的博客中提到过attachment可以看做channel,但也没想到这里是同一个。

之后再打开attachment看看里面有什么:

image.png

  • 找到核心的eventLoop,发现它根上面NioServerSocketChannel中的eventLoop是同一个。
  • 这下更加确认attachment就等于processSelectedKey方法传进来的NioServerSocketChannel。

那按照上面的推论,processSelectedKey方法岂不是只要一个SelectionKey作为参数不就可以了?需要NioServerSocketChannel的时候调用attachment()方法获取不就可以了,为什么还需要再额外传NioServerSocketChannel?

  • 第一问回答:我做了简单测试,确实可以。
    • 由于在processSelectedKey方法中,NioServerSocketChannel只调用了eventLoop()和unsafe(),所以我就只测试了这两方法。(结果见下图)
    • image.png
    • 从上图可以看出,最终返回的都是同一实例,所以理论上processSelectedKey方法应该只传SelectionKey就可以了。
  • 第二问回到:我也不知道hhh。

到此,参数就暂时理这么多了,下面继续跟进源码。



继续跟进

在上面SelectionKey参数的截图中,我们看到当前需要处理的事件readyOps=16,即OP_ACCEPT。

所以,我们上面的断点,就达到if判断OP_ACCEPT的代码块中,即如下代码:
io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)

   if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
       unsafe.read();
       if (!ch.isOpen()) {
           // Connection already closed - no need to handle write.
           return;
       }
   }


断点跟进read方法,看到如下关键代码,此处【坐标1】: io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read

            assert eventLoop().inEventLoop();
            final ChannelConfig config = config();
            final ChannelPipeline pipeline = pipeline();
            // 控制当前获取连接数的东西
            final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
            allocHandle.reset(config);

            ...(略)
                    // 出了这个do...while循环,就算获取完这一次轮询的新连接了
                    do {
                        // 新连接accept
                        int localRead = doReadMessages(readBuf);
                        if (localRead == 0) {
                            break;
                        }
                        if (localRead < 0) {
                            closed = true;
                            break;
                        }
                        // 计数器+1
                        allocHandle.incMessagesRead(localRead);
                        // 下面的continueReading就是判断当前读的新连接数量是否超过上限了(默认一次轮询16个)
                    } while (allocHandle.continueReading());
            ...(略)
                for (int i = 0; i < size; i ++) {
                    readPending = false;
                    // 这个方法将在下下篇博客讲【新连接分配NioEventLoop、注册selector】时核心讲解
                    pipeline.fireChannelRead(readBuf.get(i));
                }
            ...(略)
  • 请看代码上的注释。

断点继续跟进doReadMessages方法:
io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages

    @Override
    protected int doReadMessages(List<Object> buf) throws Exception {
        SocketChannel ch = javaChannel().accept();

        try {
            if (ch != null) {
                buf.add(new NioSocketChannel(this, ch));
                return 1;
            }
        } catch (Throwable t) {
            logger.warn("Failed to create a new channel from an accepted socket.", t);

            try {
                ch.close();
            } catch (Throwable t2) {
                logger.warn("Failed to close a socket.", t2);
            }
        }

        return 0;
    }
  • 可以看到代码第一行,就是通过jdk底层的ServerSocketChannelImpl调用accept方法获取新连接的。关于javaChannel()获取到什么,可以看创建服务端channel这篇。
  • 获取到的SocketChannel和this(NioServerSocketChannel)会被封装成NioSocketChannel,关于NioSocketChannel下一节就分析。
  • 最终,新连接添加到buf成功后,就返回。

返回之后的事情,在【坐标1】的代码注释中也有解释了,这里就不再赘述。到此,检测+获取新连接的操作也就完成了。



小结

  • 服务端检测新连接其实就是靠NioEventLoop中的run方法轮询实现的,只不过在端口绑定完成之前,就算轮询到accept事件,processSelectionKeys方法也不会去处理而已。
  • 获取到新连接的大致流程就是:
    • NioEventLoop的run方法一直轮询。
    • select方法检测出I/O事件。
    • processSelectionKeys决定要处理哪些事件,如OP_ACCEPT等,要处理的事件可由事件传播改变。
    • 最终调用jdk底层的channel的accept方法得到新连接(之后还会对新连接做各种绑定、注册等处理)。

如果对NioEventLoop的select、processSelectionKeys等方法感到陌生,建议回去看【NioEventLoop的启动】的相关博客。