# **回顾**
如果忘记是怎么进入一下方法的,请回去看[newChild](https://wenjie.store/archives/netty-nioeventloop-build-3)章节
<code>io.netty.channel.nio.NioEventLoop#run</code>
```java
...
// 检查I/O事件
select(wakenUp.getAndSet(false));
...
// 处理上面select查到的I/O事件
processSelectedKeys();
...
// 运行上面处理的事件集
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
...
```
前两节记录了select方法和processSelectedKeys方法大致的执行逻辑,但这两个方法跟本节的runAllTasks方法关系不大,倒是跟第一节【[启动前概览](https://wenjie.store/archives/netty-nioeventloop-boot-1)】有那么一下关系,如果不记得最好去回忆下。
<blockquote><p>
Netty Version:4.1.6
</p></blockquote>
<br/>
# **任务存在哪**
### **普通任务**
runAllTasks顾名思义就是处理任务队列中的任务,而在讲[select方法](https://wenjie.store/archives/netty-nioeventloop-boot-2)那一节,又提到除了普通任务之外,还有定时任务。那么下面就先补全一些之前没讲的东西。
首先,视角切回<code>io.netty.util.concurrent.SingleThreadEventExecutor#execute</code>,如果忘记怎么进来的,可以回去newChild那一节回顾:
```java
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
```
进入addTask方法:
<code>io.netty.util.concurrent.SingleThreadEventExecutor#addTask</code>
```java
protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
if (!offerTask(task)) {
reject(task);
}
}
```
再进入offerTask方法:
```java
final boolean offerTask(Runnable task) {
if (isShutdown()) {
reject();
}
return taskQueue.offer(task);
}
```
- 这里的taskQueue(mpscQueue)也在[newChild](https://wenjie.store/archives/netty-nioeventloop-build-3)那一节讲过。
- 而offer方法就是将任务添加到taskQueue中,**taskQueue用于存储普通任务**。
<br/>
### **定时任务**
Netty提供了一些添加定时任务的接口,它就是NioEventLoop的父类AbstractScheduledEventExecutor的schedule方法,挑一个来看看(其它重载底层队列都一样):
<code>io.netty.util.concurrent.AbstractScheduledEventExecutor#schedule(java.util.concurrent.Callable<V>, long, java.util.concurrent.TimeUnit)</code>
```java
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
ObjectUtil.checkNotNull(callable, "callable");
ObjectUtil.checkNotNull(unit, "unit");
if (delay < 0) {
throw new IllegalArgumentException(
String.format("delay: %d (expected: >= 0)", delay));
}
return schedule(new ScheduledFutureTask<V>(
this, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
}
```
- ScheduledFutureTask就是定时任务队列中元素的类型,在前面讲[select方法](https://wenjie.store/archives/netty-nioeventloop-boot-2)时讲到了,忘记的可以回去看看。
然后,再进入schedule方法:
<code>io.netty.util.concurrent.AbstractScheduledEventExecutor#schedule(io.netty.util.concurrent.ScheduledFutureTask<V>)</code>
```java
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (inEventLoop()) {
scheduledTaskQueue().add(task);
} else {
execute(new Runnable() {
@Override
public void run() {
scheduledTaskQueue().add(task);
}
});
}
return task;
}
```
- inEventLoop()在第一节【[启动前概览](https://wenjie.store/archives/netty-nioeventloop-boot-1)】中讲过。
- execute方法目的就是保证线程安全(如果不是NioEventLoop的线程),在【[ThreadPerTaskExcutor](https://wenjie.store/archives/netty-nioeventloop-build-2)】中讲过。
- 不管怎么样,最终任务被添加到<code>PriorityQueue<ScheduledFutureTask<V>>()</code>中了。
<br/>
以上,我们已经知道在runAllTasks之前,普通任务被存储在mpscQueue中,而定时任务则被存储在PriorityQueue中。下面让我们回到主线。
<br/>
# **追踪runAllTasks方法**
首先,将视角转移回<code>io.netty.channel.nio.NioEventLoop#run</code>方法中(忘记看上面回顾),找到下面一段代码:
<code>io.netty.channel.nio.NioEventLoop#run</code>
```java
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
```
- ioRatio默认值为50,所以会进入else代码块。
- ioTime是执行processSelectedKeys();所花的时间。
- runAllTasks(ioTime * (100 - ioRatio) / ioRatio);代码含义:让处理I/O事件的时间和执行任务的时间为1:1。
- 这也是为什么[前面](https://wenjie.store/archives/netty-nioeventloop-boot-2)select检测I/O事件时,如果发现有定时任务距离执行的时间<0.5ms就会变非阻塞方法。
进入runAllTasks方法,此处【坐标1】:
<code>io.netty.util.concurrent.SingleThreadEventExecutor#runAllTasks(long)</code>
```java
protected boolean runAllTasks(long timeoutNanos) {
// 任务聚合
fetchFromScheduledTaskQueue();
// 取出一个任务
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
safeExecute(task);
runTasks ++;
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
```
进入fetchFromScheduledTaskQueue方法,这是一个**聚合任务的方法**:
<code>io.netty.util.concurrent.SingleThreadEventExecutor#fetchFromScheduledTaskQueue</code>
```java
private boolean fetchFromScheduledTaskQueue() {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
Runnable scheduledTask = pollScheduledTask(nanoTime);
while (scheduledTask != null) {
if (!taskQueue.offer(scheduledTask)) {
// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
scheduledTask = pollScheduledTask(nanoTime);
}
return true;
}
```
代码简单分析:
- nanoTime可以看做是一个截止日期时间。
- pollScheduledTask(nanoTime);就是在定时任务队列中取出一个离截止日期事件最近的定时任务,取出后从定时任务队列中删除,如果没有则返回null。(不要忘了定时任务队列是按照截止时间排序的,在[select方法章节](https://wenjie.store/archives/netty-nioeventloop-boot-2)讲过。)
- taskQueue.offer(scheduledTask):**尝试将取出的定时任务添加到普通任务队列taskQueue(mpscQueue)中**,如果taskQueue空间足够,则添加成功返回true,否则添加失败返回fasle,返回前在if的代码块中将任务归还到定时任务队列。
经过了简单分析fetchFromScheduledTaskQueue方法后,我们知道**当执行完这个方法后,所有待处理的普通任务、即将执行的定时任务,都会放到同一个队列中,即mpscQueue(TaskQueue)中。**
现在将视角重新切回【坐标1】,继续往下看(下面不复读代码了,为了方便,请在编译器中打开代码):
<code>io.netty.util.concurrent.SingleThreadEventExecutor#runAllTasks(long)</code>
- deadline:计算出来的截止时间,即for循环任务只能执行到deadline时间。
- runTasks执行任务的计数器,每执行一个任务就自增1.
- safeExecute(task):串行(线程安全)执行任务。
- if ((runTasks & 0x3F) == 0):当执行了64次任务,则继续往下走进入if代码块。在if代码块中,就是判断是否到达了截止时间,如果当前时间>=deadline,则break结束for循环任务执行。
- afterRunningAllTasks():执行一些收尾性质的任务。
<br/>
至此,runAllTasks方法的大致逻辑理完了。
---
# **小结**
- runAllTasks方法执行时,首先会进行**任务聚合**。
- 任务聚合的方式其实就是:取即将要执行的定时任务,然后添加到普通任务的队列(mpscQueue)中。
- 在处理任务队列时,任务是逐个执行的。
- runAllTasks执行任务也是有时限的,runAllTasks方法执行任务的时间,默认情况下是和前面processSelectedKeys处理I/O事件的时间相等。
- runAllTasks运行期间,每执行64次任务才检查是否超过了截止时间,之所以这样做是因为检查时间操作比较耗时,64次这个次数是Netty根据经验硬编码的。

【Netty】NioEventLoop的启动(四):runAllTasks