回顾
如果忘记是怎么进入一下方法的,请回去看newChild章节
io.netty.channel.nio.NioEventLoop#run
...
// 检查I/O事件
select(wakenUp.getAndSet(false));
...
// 处理上面select查到的I/O事件
processSelectedKeys();
...
// 运行上面处理的事件集
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
...
前两节记录了select方法和processSelectedKeys方法大致的执行逻辑,但这两个方法跟本节的runAllTasks方法关系不大,倒是跟第一节【启动前概览】有那么一下关系,如果不记得最好去回忆下。
Netty Version:4.1.6
任务存在哪
普通任务
runAllTasks顾名思义就是处理任务队列中的任务,而在讲select方法那一节,又提到除了普通任务之外,还有定时任务。那么下面就先补全一些之前没讲的东西。
首先,视角切回io.netty.util.concurrent.SingleThreadEventExecutor#execute
,如果忘记怎么进来的,可以回去newChild那一节回顾:
public void execute(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
boolean inEventLoop = inEventLoop();
if (inEventLoop) {
addTask(task);
} else {
startThread();
addTask(task);
if (isShutdown() && removeTask(task)) {
reject();
}
}
if (!addTaskWakesUp && wakesUpForTask(task)) {
wakeup(inEventLoop);
}
}
进入addTask方法:
io.netty.util.concurrent.SingleThreadEventExecutor#addTask
protected void addTask(Runnable task) {
if (task == null) {
throw new NullPointerException("task");
}
if (!offerTask(task)) {
reject(task);
}
}
再进入offerTask方法:
final boolean offerTask(Runnable task) {
if (isShutdown()) {
reject();
}
return taskQueue.offer(task);
}
- 这里的taskQueue(mpscQueue)也在newChild那一节讲过。
- 而offer方法就是将任务添加到taskQueue中,taskQueue用于存储普通任务。
定时任务
Netty提供了一些添加定时任务的接口,它就是NioEventLoop的父类AbstractScheduledEventExecutor的schedule方法,挑一个来看看(其它重载底层队列都一样):
io.netty.util.concurrent.AbstractScheduledEventExecutor#schedule(java.util.concurrent.Callable
@Override
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
ObjectUtil.checkNotNull(callable, "callable");
ObjectUtil.checkNotNull(unit, "unit");
if (delay < 0) {
throw new IllegalArgumentException(
String.format("delay: %d (expected: >= 0)", delay));
}
return schedule(new ScheduledFutureTask<V>(
this, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
}
- ScheduledFutureTask就是定时任务队列中元素的类型,在前面讲select方法时讲到了,忘记的可以回去看看。
然后,再进入schedule方法:
io.netty.util.concurrent.AbstractScheduledEventExecutor#schedule(io.netty.util.concurrent.ScheduledFutureTask
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
if (inEventLoop()) {
scheduledTaskQueue().add(task);
} else {
execute(new Runnable() {
@Override
public void run() {
scheduledTaskQueue().add(task);
}
});
}
return task;
}
- inEventLoop()在第一节【启动前概览】中讲过。
- execute方法目的就是保证线程安全(如果不是NioEventLoop的线程),在【ThreadPerTaskExcutor】中讲过。
- 不管怎么样,最终任务被添加到
PriorityQueue<ScheduledFutureTask
中了。>()
以上,我们已经知道在runAllTasks之前,普通任务被存储在mpscQueue中,而定时任务则被存储在PriorityQueue中。下面让我们回到主线。
追踪runAllTasks方法
首先,将视角转移回io.netty.channel.nio.NioEventLoop#run
方法中(忘记看上面回顾),找到下面一段代码:
io.netty.channel.nio.NioEventLoop#run
final int ioRatio = this.ioRatio;
if (ioRatio == 100) {
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
runAllTasks();
}
} else {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
}
- ioRatio默认值为50,所以会进入else代码块。
- ioTime是执行processSelectedKeys();所花的时间。
- runAllTasks(ioTime * (100 - ioRatio) / ioRatio);代码含义:让处理I/O事件的时间和执行任务的时间为1:1。
- 这也是为什么前面select检测I/O事件时,如果发现有定时任务距离执行的时间<0.5ms就会变非阻塞方法。
进入runAllTasks方法,此处【坐标1】:
io.netty.util.concurrent.SingleThreadEventExecutor#runAllTasks(long)
protected boolean runAllTasks(long timeoutNanos) {
// 任务聚合
fetchFromScheduledTaskQueue();
// 取出一个任务
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (;;) {
safeExecute(task);
runTasks ++;
// Check timeout every 64 tasks because nanoTime() is relatively expensive.
// XXX: Hard-coded value - will make it configurable if it is really a problem.
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
进入fetchFromScheduledTaskQueue方法,这是一个聚合任务的方法:
io.netty.util.concurrent.SingleThreadEventExecutor#fetchFromScheduledTaskQueue
private boolean fetchFromScheduledTaskQueue() {
long nanoTime = AbstractScheduledEventExecutor.nanoTime();
Runnable scheduledTask = pollScheduledTask(nanoTime);
while (scheduledTask != null) {
if (!taskQueue.offer(scheduledTask)) {
// No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
return false;
}
scheduledTask = pollScheduledTask(nanoTime);
}
return true;
}
代码简单分析:
- nanoTime可以看做是一个截止日期时间。
- pollScheduledTask(nanoTime);就是在定时任务队列中取出一个离截止日期事件最近的定时任务,取出后从定时任务队列中删除,如果没有则返回null。(不要忘了定时任务队列是按照截止时间排序的,在select方法章节讲过。)
- taskQueue.offer(scheduledTask):尝试将取出的定时任务添加到普通任务队列taskQueue(mpscQueue)中,如果taskQueue空间足够,则添加成功返回true,否则添加失败返回fasle,返回前在if的代码块中将任务归还到定时任务队列。
经过了简单分析fetchFromScheduledTaskQueue方法后,我们知道当执行完这个方法后,所有待处理的普通任务、即将执行的定时任务,都会放到同一个队列中,即mpscQueue(TaskQueue)中。
现在将视角重新切回【坐标1】,继续往下看(下面不复读代码了,为了方便,请在编译器中打开代码):
io.netty.util.concurrent.SingleThreadEventExecutor#runAllTasks(long)
- deadline:计算出来的截止时间,即for循环任务只能执行到deadline时间。
- runTasks执行任务的计数器,每执行一个任务就自增1.
- safeExecute(task):串行(线程安全)执行任务。
- if ((runTasks & 0x3F) == 0):当执行了64次任务,则继续往下走进入if代码块。在if代码块中,就是判断是否到达了截止时间,如果当前时间>=deadline,则break结束for循环任务执行。
- afterRunningAllTasks():执行一些收尾性质的任务。
至此,runAllTasks方法的大致逻辑理完了。
小结
- runAllTasks方法执行时,首先会进行任务聚合。
- 任务聚合的方式其实就是:取即将要执行的定时任务,然后添加到普通任务的队列(mpscQueue)中。
- 在处理任务队列时,任务是逐个执行的。
- runAllTasks执行任务也是有时限的,runAllTasks方法执行任务的时间,默认情况下是和前面processSelectedKeys处理I/O事件的时间相等。
- runAllTasks运行期间,每执行64次任务才检查是否超过了截止时间,之所以这样做是因为检查时间操作比较耗时,64次这个次数是Netty根据经验硬编码的。