回顾

如果忘记是怎么进入一下方法的,请回去看newChild章节

io.netty.channel.nio.NioEventLoop#run

...
// 检查I/O事件
select(wakenUp.getAndSet(false));
...
// 处理上面select查到的I/O事件
processSelectedKeys();
...
// 运行上面处理的事件集
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
...

前两节记录了select方法和processSelectedKeys方法大致的执行逻辑,但这两个方法跟本节的runAllTasks方法关系不大,倒是跟第一节【启动前概览】有那么一下关系,如果不记得最好去回忆下。

Netty Version:4.1.6


任务存在哪

普通任务

runAllTasks顾名思义就是处理任务队列中的任务,而在讲select方法那一节,又提到除了普通任务之外,还有定时任务。那么下面就先补全一些之前没讲的东西。

首先,视角切回io.netty.util.concurrent.SingleThreadEventExecutor#execute,如果忘记怎么进来的,可以回去newChild那一节回顾:

    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }

进入addTask方法:
io.netty.util.concurrent.SingleThreadEventExecutor#addTask

    protected void addTask(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (!offerTask(task)) {
            reject(task);
        }
    }

再进入offerTask方法:

    final boolean offerTask(Runnable task) {
        if (isShutdown()) {
            reject();
        }
        return taskQueue.offer(task);
    }
  • 这里的taskQueue(mpscQueue)也在newChild那一节讲过。
  • 而offer方法就是将任务添加到taskQueue中,taskQueue用于存储普通任务

定时任务

Netty提供了一些添加定时任务的接口,它就是NioEventLoop的父类AbstractScheduledEventExecutor的schedule方法,挑一个来看看(其它重载底层队列都一样):
io.netty.util.concurrent.AbstractScheduledEventExecutor#schedule(java.util.concurrent.Callable, long, java.util.concurrent.TimeUnit)

    @Override
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
        ObjectUtil.checkNotNull(callable, "callable");
        ObjectUtil.checkNotNull(unit, "unit");
        if (delay < 0) {
            throw new IllegalArgumentException(
                    String.format("delay: %d (expected: >= 0)", delay));
        }
        return schedule(new ScheduledFutureTask<V>(
                this, callable, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
    }
  • ScheduledFutureTask就是定时任务队列中元素的类型,在前面讲select方法时讲到了,忘记的可以回去看看。

然后,再进入schedule方法:
io.netty.util.concurrent.AbstractScheduledEventExecutor#schedule(io.netty.util.concurrent.ScheduledFutureTask)

    <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
        if (inEventLoop()) {
            scheduledTaskQueue().add(task);
        } else {
            execute(new Runnable() {
                @Override
                public void run() {
                    scheduledTaskQueue().add(task);
                }
            });
        }

        return task;
    }
  • inEventLoop()在第一节【启动前概览】中讲过。
  • execute方法目的就是保证线程安全(如果不是NioEventLoop的线程),在【ThreadPerTaskExcutor】中讲过。
  • 不管怎么样,最终任务被添加到PriorityQueue<ScheduledFutureTask>()中了。

以上,我们已经知道在runAllTasks之前,普通任务被存储在mpscQueue中,而定时任务则被存储在PriorityQueue中。下面让我们回到主线。


追踪runAllTasks方法

首先,将视角转移回io.netty.channel.nio.NioEventLoop#run方法中(忘记看上面回顾),找到下面一段代码:
io.netty.channel.nio.NioEventLoop#run

                final int ioRatio = this.ioRatio;
                if (ioRatio == 100) {
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        runAllTasks();
                    }
                } else {
                    final long ioStartTime = System.nanoTime();
                    try {
                        processSelectedKeys();
                    } finally {
                        // Ensure we always run tasks.
                        final long ioTime = System.nanoTime() - ioStartTime;
                        runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
                    }
                }
  • ioRatio默认值为50,所以会进入else代码块。
  • ioTime是执行processSelectedKeys();所花的时间。
  • runAllTasks(ioTime * (100 - ioRatio) / ioRatio);代码含义:让处理I/O事件的时间和执行任务的时间为1:1。
  • 这也是为什么前面select检测I/O事件时,如果发现有定时任务距离执行的时间<0.5ms就会变非阻塞方法。

进入runAllTasks方法,此处【坐标1】:
io.netty.util.concurrent.SingleThreadEventExecutor#runAllTasks(long)

    protected boolean runAllTasks(long timeoutNanos) {
        // 任务聚合
        fetchFromScheduledTaskQueue();
        // 取出一个任务
        Runnable task = pollTask();
        if (task == null) {
            afterRunningAllTasks();
            return false;
        }

        final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
        long runTasks = 0;
        long lastExecutionTime;
        for (;;) {
            safeExecute(task);

            runTasks ++;

            // Check timeout every 64 tasks because nanoTime() is relatively expensive.
            // XXX: Hard-coded value - will make it configurable if it is really a problem.
            if ((runTasks & 0x3F) == 0) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                if (lastExecutionTime >= deadline) {
                    break;
                }
            }

            task = pollTask();
            if (task == null) {
                lastExecutionTime = ScheduledFutureTask.nanoTime();
                break;
            }
        }

        afterRunningAllTasks();
        this.lastExecutionTime = lastExecutionTime;
        return true;
    }

进入fetchFromScheduledTaskQueue方法,这是一个聚合任务的方法
io.netty.util.concurrent.SingleThreadEventExecutor#fetchFromScheduledTaskQueue

    private boolean fetchFromScheduledTaskQueue() {
        long nanoTime = AbstractScheduledEventExecutor.nanoTime();
        Runnable scheduledTask  = pollScheduledTask(nanoTime);
        while (scheduledTask != null) {
            if (!taskQueue.offer(scheduledTask)) {
                // No space left in the task queue add it back to the scheduledTaskQueue so we pick it up again.
                scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
                return false;
            }
            scheduledTask  = pollScheduledTask(nanoTime);
        }
        return true;
    }

代码简单分析:

  • nanoTime可以看做是一个截止日期时间。
  • pollScheduledTask(nanoTime);就是在定时任务队列中取出一个离截止日期事件最近的定时任务,取出后从定时任务队列中删除,如果没有则返回null。(不要忘了定时任务队列是按照截止时间排序的,在select方法章节讲过。)
  • taskQueue.offer(scheduledTask):尝试将取出的定时任务添加到普通任务队列taskQueue(mpscQueue)中,如果taskQueue空间足够,则添加成功返回true,否则添加失败返回fasle,返回前在if的代码块中将任务归还到定时任务队列。

经过了简单分析fetchFromScheduledTaskQueue方法后,我们知道当执行完这个方法后,所有待处理的普通任务、即将执行的定时任务,都会放到同一个队列中,即mpscQueue(TaskQueue)中。

现在将视角重新切回【坐标1】,继续往下看(下面不复读代码了,为了方便,请在编译器中打开代码):
io.netty.util.concurrent.SingleThreadEventExecutor#runAllTasks(long)

  • deadline:计算出来的截止时间,即for循环任务只能执行到deadline时间。
  • runTasks执行任务的计数器,每执行一个任务就自增1.
  • safeExecute(task):串行(线程安全)执行任务。
  • if ((runTasks & 0x3F) == 0):当执行了64次任务,则继续往下走进入if代码块。在if代码块中,就是判断是否到达了截止时间,如果当前时间>=deadline,则break结束for循环任务执行。
  • afterRunningAllTasks():执行一些收尾性质的任务。

至此,runAllTasks方法的大致逻辑理完了。


小结

  • runAllTasks方法执行时,首先会进行任务聚合
  • 任务聚合的方式其实就是:取即将要执行的定时任务,然后添加到普通任务的队列(mpscQueue)中。
  • 在处理任务队列时,任务是逐个执行的。
  • runAllTasks执行任务也是有时限的,runAllTasks方法执行任务的时间,默认情况下是和前面processSelectedKeys处理I/O事件的时间相等。
  • runAllTasks运行期间,每执行64次任务才检查是否超过了截止时间,之所以这样做是因为检查时间操作比较耗时,64次这个次数是Netty根据经验硬编码的。