前言
在前一段时间,我学习了NioEventLoop(Group)创建的大致流程,并将一些笔记记录成我的博客。
那么这次,就继承前面所学的Netty知识,来继续学习NioEventLoop的启动大致流程。
Netty Version:4.1.6
打算拆分的章节:
- 启动前的一些准备(本篇)
- select()
- processSelectedKeysOptimized()
- runAllTask()
大致流程
大致解释一下:
- 入口把任务交给execute方法执行。
- execute时,会判断当前线程是不是NioEventLoop的线程,如果不是就会尝试创建线程。
- 线程的创建是由TreadPerTaskExecutor的execute方法创建的,即FastThreadLocalThread(前面一篇中讲过TreadPerTaskExecutor)
- 保存线程,如果线程不是NioEventLoop的,则会分配到taskQueue(mpscQueue)中执行,保证线程安全。(mpscQueue也在前面一篇中讲过)
- 启动的核心函数,后面分小节记录。
这篇博客的一些"常识",在我之前的博客中也有记录,为了以防万一不记得,我会贴下传送门,方便回忆。
源码跟进
在跟进源码之前,我得先声明下起点:io.netty.bootstrap.AbstractBootstrap#doBind0
,如果忘记怎么进入这个方法的,可以看端口绑定这一节。
凭借着一点仅存的肌肉记忆,我点进了起点,下面贴下代码:
io.netty.bootstrap.AbstractBootstrap#doBind0
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
进入execute方法(实现类:SingleThreadEventExcutor),此处【坐标1】:
io.netty.util.concurrent.SingleThreadEventExecutor#execute
@Override
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
- inEventLoop判断添加任务操作是否是当前NioEventLoop的线程发起的,如果不是,为了保证线程安全性会进行一些额外操作。
- 关于addTask方法,后面会再解析。
进入inEventLoop方法看看(实现类:SingleThreadEventExcutor):
@Override
public boolean inEventLoop() {
return inEventLoop(Thread.currentThread());
}
- 传入参数为当前线程。
继续进入inEventLoop方法(实现类:SingleThreadEventExcutor):
io.netty.util.concurrent.SingleThreadEventExecutor#inEventLoop
@Override
public boolean inEventLoop(Thread thread) {
return thread == this.thread;
}
- 说明inEventLoop()方法最终就是在判断当前线程是否是NioEventLoop的线程(类型为:FastThreadLocalThread)
不记得FastThreadLocalThread的,可以去这篇看看,有简单提到。
视角切回上面【坐标1】的代码,可以看出,如果执行execute进行添加任务的线程是来自当前NioEventLoop线程的,则会直接执行addTask方法添加任务,如果不是,则会先执行startThread方法。
下面就先进入这个startThread方法看看:
io.netty.util.concurrent.SingleThreadEventExecutor#startThread
private void startThread() {
if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
doStartThread();
}
}
}
- 第一个if是在判断当前线程是否ST_NOT_STARTED状态,如果是ST_NOT_STARTED状态则继续往下
- 第二个if是一个CAS操作,将当前线程的ST_NOT_STARTED状态修改为ST_STARTED状态,成功则继续玩下
- 最后来到一个doStartThread方法
进入doStartThread方法,代码比较长,仅贴出其中比较重要的:
io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread
private void doStartThread() {
assert thread == null;
executor.execute(new Runnable() {
@Override
public void run() {
thread = Thread.currentThread();
if (interrupted) {
thread.interrupt();
}
boolean success = false;
updateLastExecutionTime();
try {
SingleThreadEventExecutor.this.run();
...(略)
- 这里的executor和它的execute方法就是在前面ThreadPerTaskExcutor中讲到的,忘记可以回去看看。
- 这段代码保存了当前的线程,即NioEventLoop的线程。
- 同时,在上面这段代码,也看到了后面要拆开讲的核心代码:SingleThreadEventExecutor.this.run()
对于上面SingleThreadEventExecutor.this.run()的run方法,这篇虽然不会详细讲,但大致看看还是可以的,下面先给出一个大致流程留个印象。
SingThreadEventExecutor.this.run的大致流程:
- 无限for循环。
- select方法检测I/O事件
- 处理select轮询出来的I/O事件
- 处理异步任务
进入run方法
话不多说,我们进入run方法(实现类:NioEventLoop),由于代码过长,我暂时先贴"三主角":
io.netty.channel.nio.NioEventLoop#run
...
// 检查I/O事件
select(wakenUp.getAndSet(false));
...
// 处理上面select查到的I/O事件
processSelectedKeys();
...
// 运行上面处理的事件集
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
...
上面三个方法就是后面三个小节各自介绍的内容。
小结
本小节的内容不多,只是追了比较简单的代码,但是跟前面学到的知识点有不少联系,希望能把一些之前的一些知识点串联起来再继续前进。