前言
在前面的博客中,主要学习的是Netty处理服务端相关逻辑的原码追踪,还未讲到对客户端的处理是怎么样的。
那么这节,就来看看Netty大致上是如何处理客户端新连接的。
Netty Version:4.1.6
另外,本篇内容以【select方法-检查I/O事件】和【processSelectedKeys执行逻辑】为前置知识的,如果遇到这里还会简单描述下,详细忘记的还请回去看。
在写此篇博客之前,写了一篇小总结,里面记录了如NioByteUnsafe和NioMessageUnsafe的区别等。
大致流程
检测新连接的大致流程如下:
- processSelectedKey(selectionkey, channel)
- NioMessageUnsafe.read()
- doReadMessage()
- javaChannel().accept()
从流程中不难看出,其实就是去监听accept事件。
实验代码
Server.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.AttributeKey;
/**
* @author cwj
*/
public final class Server {
public static void main(String[] args) throws Exception {
// 两大线程
// 对应Socket中Server的启动监听线程(负责接收新连接并抛给workerGroup)
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 对应Socket中Client中主函数的线程(负责处理新连接)
EventLoopGroup workerGroup = new NioEventLoopGroup(1);
try {
// 辅助类
ServerBootstrap server = new ServerBootstrap();
// 装配线程
server.group(bossGroup, workerGroup)
// 设置Channel的类型
.channel(NioServerSocketChannel.class)
// 给每个客户端的连接设置一些tcp的基本属性
// .childOption(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.TCP_NODELAY, true)
// 服务端启动时做的逻辑
.handler(new ServerHandler())
// 给pipeline配置handler,Channel发生某种变化时对应的处理逻辑
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
//ch.pipeline().addLast(new AuthHandler());
}
});
// 辅助类,绑定端口,这里使用了同步,也就是说它原本是异步的:io.netty.bootstrap.AbstractBootstrap.doBind
ChannelFuture future = server.bind(8888).sync();
// 关闭
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
跟进源码
在跟进源码之前,先找到起点:
io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)
,如果忘记怎么进入这个方法的,可以回去看processSelectedKeys执行逻辑那一篇。
在processSelectedKey这个方法上打个断点:
在Server.java的new ServerBootstrap方法打个断点:
然后运行Server.java的main方法,首先进入第一个断点,即ServerBootstrap处的断点,记下bossGroup和workerGroup中NioEventLoop的@编号:
- bossGroup的是1129
- workerGroup的是1130
然后放开上一个断点,用telnet进行连接,之后就会进入下一个断点:
telnet 127.0.0.1 8888
进入断点效果如下:
理一理当前参数
对参数没兴趣就跳到下面的【继续跟进】。
在让断点继续玩下走之前,先理一理当前的一些参数、变量是什么,首先看看最外层:
- this是NioEventLoop@1129,即bossGroup中的NioEventLoop,验证了之前"parentGroup主要是处理新连接"的说法。
- 方法传入的AbstractNioChannel其实就是服务端channel,即NioServerSocketChannel。
再来看看传进来的SelectionKey和NioServerSocketChannel里面是什么东西,这里先看NioServerSocketChannel:
- NioServerSocketChannel传进来的主要作用,其实主要就是为了获取它的eventLoop和unsafe。
- 请记住这里的NioEventLoop@1129。
- 还记得上面的eventLoop是来自哪里的吗?不记得就看注册selector那篇。
下面再来看看传进来的selectionKey里面有什么:
- 关于selectionKey最原始的产生地,请看注册selector那一节。
- SelectionKey在processSelectedKey方法中主要发挥的作用就是:它保存了事件集。
有趣的事情来了,当我眼睛睁大点后,发现了如下片段:
- SelectionKey中的attachment实例居然和传进来的NioServerSocketChannel是同一个实例,虽然我在前面的博客中提到过attachment可以看做channel,但也没想到这里是同一个。
之后再打开attachment看看里面有什么:
- 找到核心的eventLoop,发现它根上面NioServerSocketChannel中的eventLoop是同一个。
- 这下更加确认attachment就等于processSelectedKey方法传进来的NioServerSocketChannel。
那按照上面的推论,processSelectedKey方法岂不是只要一个SelectionKey作为参数不就可以了?需要NioServerSocketChannel的时候调用attachment()方法获取不就可以了,为什么还需要再额外传NioServerSocketChannel?
- 第一问回答:我做了简单测试,确实可以。
- 由于在processSelectedKey方法中,NioServerSocketChannel只调用了eventLoop()和unsafe(),所以我就只测试了这两方法。(结果见下图)
- 从上图可以看出,最终返回的都是同一实例,所以理论上processSelectedKey方法应该只传SelectionKey就可以了。
- 第二问回到:我也不知道hhh。
到此,参数就暂时理这么多了,下面继续跟进源码。
继续跟进
在上面SelectionKey参数的截图中,我们看到当前需要处理的事件readyOps=16,即OP_ACCEPT。
所以,我们上面的断点,就达到if判断OP_ACCEPT的代码块中,即如下代码:
io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
unsafe.read();
if (!ch.isOpen()) {
// Connection already closed - no need to handle write.
return;
}
}
断点跟进read方法,看到如下关键代码,此处【坐标1】:
io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
assert eventLoop().inEventLoop();
final ChannelConfig config = config();
final ChannelPipeline pipeline = pipeline();
// 控制当前获取连接数的东西
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
...(略)
// 出了这个do...while循环,就算获取完这一次轮询的新连接了
do {
// 新连接accept
int localRead = doReadMessages(readBuf);
if (localRead == 0) {
break;
}
if (localRead < 0) {
closed = true;
break;
}
// 计数器+1
allocHandle.incMessagesRead(localRead);
// 下面的continueReading就是判断当前读的新连接数量是否超过上限了(默认一次轮询16个)
} while (allocHandle.continueReading());
...(略)
for (int i = 0; i < size; i ++) {
readPending = false;
// 这个方法将在下下篇博客讲【新连接分配NioEventLoop、注册selector】时核心讲解
pipeline.fireChannelRead(readBuf.get(i));
}
...(略)
- 请看代码上的注释。
断点继续跟进doReadMessages方法:
io.netty.channel.socket.nio.NioServerSocketChannel#doReadMessages
@Override
protected int doReadMessages(List<Object> buf) throws Exception {
SocketChannel ch = javaChannel().accept();
try {
if (ch != null) {
buf.add(new NioSocketChannel(this, ch));
return 1;
}
} catch (Throwable t) {
logger.warn("Failed to create a new channel from an accepted socket.", t);
try {
ch.close();
} catch (Throwable t2) {
logger.warn("Failed to close a socket.", t2);
}
}
return 0;
}
- 可以看到代码第一行,就是通过jdk底层的ServerSocketChannelImpl调用accept方法获取新连接的。关于javaChannel()获取到什么,可以看创建服务端channel这篇。
- 获取到的SocketChannel和this(NioServerSocketChannel)会被封装成NioSocketChannel,关于NioSocketChannel下一节就分析。
- 最终,新连接添加到buf成功后,就返回。
返回之后的事情,在【坐标1】的代码注释中也有解释了,这里就不再赘述。到此,检测+获取新连接的操作也就完成了。
小结
- 服务端检测新连接其实就是靠NioEventLoop中的run方法轮询实现的,只不过在端口绑定完成之前,就算轮询到accept事件,processSelectionKeys方法也不会去处理而已。
- 获取到新连接的大致流程就是:
- NioEventLoop的run方法一直轮询。
- select方法检测出I/O事件。
- processSelectionKeys决定要处理哪些事件,如OP_ACCEPT等,要处理的事件可由事件传播改变。
- 最终调用jdk底层的channel的accept方法得到新连接(之后还会对新连接做各种绑定、注册等处理)。
如果对NioEventLoop的select、processSelectionKeys等方法感到陌生,建议回去看【NioEventLoop的启动】的相关博客。