回顾
在上一节,记录了NioEventLoop启动前做的一些事情,并最终找到一个方法run,如果不记得可以回上一节看看,因为这个run方法是本篇以及相关章节的入口。
这里再贴下run方法里面的三个核心方法:
io.netty.channel.nio.NioEventLoop#run
...
// 检查I/O事件
select(wakenUp.getAndSet(false));
...
// 处理上面select查到的I/O事件
processSelectedKeys();
...
// 运行上面处理的事件集
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
...
本节要追踪的就是select方法,除了正常逻辑以外,还可以看到Netty是如何解决jdk空轮询bug。
一些我的口头名词解释:select操作和检测I/O事件是同一个意思。
Netty Version
一些没讲过但是要先知道的前提
Netty中除了普通的任务之外,还会有一些定时任务,而这些定时任务,在执行之前实际上是存储在一个定时任务队列中,这个队列里的元素是按照截止时间排序的(本节会讲到这一点)。
另外,在本节之前的篇章中提到的任务队列都不是定时任务队列,在这一节会简单看一下这个任务队列。
开始追踪select方法
首先将视角切回io.netty.channel.nio.NioEventLoop#run
,这是起点,如果忘记怎么过来的请看回顾,下面直接开始。
首先找到这一段代码:
select(wakenUp.getAndSet(false));
- 含义:执行select方法,并在执行之前,都标识一个状态,表示当前要进行select操作且处于未唤醒状态。
进入select方法,由于代码很长,下面分段贴,此处【坐标1】:
io.netty.channel.nio.NioEventLoop#select
Selector selector = this.selector;
try {
int selectCnt = 0;
long currentTimeNanos = System.nanoTime();
long selectDeadLineNanos = currentTimeNanos + delayNanos(currentTimeNanos);
for (;;) {
long timeoutMillis = (selectDeadLineNanos - currentTimeNanos + 500000L) / 1000000L;
if (timeoutMillis <= 0) {
if (selectCnt == 0) {
selector.selectNow();
selectCnt = 1;
}
break;
}
...(略)
- selector就不多说了,在
- selectCnt大概是一个记录是否有进行select操作的计数器。
- currentTimeNanos是当前时间(单位纳秒)。
- delayNanos(currentTimeNanos)返回的就是当前时间距离下次定时任务的所剩时间(单位纳秒)。
- selectDeadLineNanos就是下次定时任务的开始时间(单位纳秒)。
- timeoutMillis(单位毫秒)的含义则需要联系下面的if代码块,只有当下个定时任务开始距离当前时间>=0.5ms,才能继续往下走,否则表示即将有定时任务要执行,之后会调用一个非阻塞的selectNow()
关于上面提到的非阻塞,其实看完源码后仍然我不太理解,因为我追进源码最终还是看到synchronized,也许是指这里的synchronized并未上升到重量级锁。
这里再贴下文档原话,我英语再塑料,也不至于翻译错非阻塞吧:This method performs a non-blocking selection operation。
这里先不往下看,进入delayNanos方法看看:
io.netty.util.concurrent.SingleThreadEventExecutor#delayNanos
protected long delayNanos(long currentTimeNanos) {
ScheduledFutureTask<?> scheduledTask = peekScheduledTask();
if (scheduledTask == null) {
return SCHEDULE_PURGE_INTERVAL;
}
return scheduledTask.delayNanos(currentTimeNanos);
}
- 该方法返回的是当前时间距离最近一个定时任务开始的所剩时间。
- peekScheduledTask()就是返回一个最近的定时任务。
再看看ScheduledFutureTask这个类,找到它的compareTo方法:
io.netty.util.concurrent.ScheduledFutureTask#compareTo
public int compareTo(Delayed o) {
if (this == o) {
return 0;
}
ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
long d = deadlineNanos() - that.deadlineNanos();
if (d < 0) {
return -1;
} else if (d > 0) {
return 1;
} else if (id < that.id) {
return -1;
} else if (id == that.id) {
throw new Error();
} else {
return 1;
}
}
- 可以看到,首先按截止时间排序,当截止时间相同时,再根据任务id排序。
解读了一些时间参数的含义、验证了定时任务是按照截止时间排序后,再次将视角转回io.netty.channel.nio.NioEventLoop#select
,继续来看看【坐标1】还没贴出的代码:
io.netty.channel.nio.NioEventLoop#select
...(略)
// If a task was submitted when wakenUp value was true, the task didn't get a chance to call
// Selector#wakeup. So we need to check task queue again before executing select operation.
// If we don't, the task might be pended until select operation was timed out.
// It might be pended until idle timeout if IdleStateHandler existed in pipeline.
if (hasTasks() && wakenUp.compareAndSet(false, true)) {
selector.selectNow();
selectCnt = 1;
break;
}
int selectedKeys = selector.select(timeoutMillis);
selectCnt ++;
...(略)
- 第一个if先判断任务队列中是否有任务、将线程设置为唤醒状态是否成功,如果队列中有任务且线程状态更新成功,则调用非阻塞的selectNow,否则继续往下走。
- selector.select(timeoutMillis);则是以阻塞方式执行检测I/O事件的操作。
继续往下看代码:
io.netty.channel.nio.NioEventLoop#select
...(略)
if (selectedKeys != 0 || oldWakenUp || wakenUp.get() || hasTasks() || hasScheduledTasks()) {
// - Selected something,
// - waken up by user, or
// - the task queue has a pending task.
// - a scheduled task is ready for processing
break;
}
...(略)
- 含义:如果前面执行过一次同步select 或 oldWakenUp为true 或 线程是唤醒状态 或 普通任务队列里有任务 或 定时任务队列里有任务,都会结束这次for循环。因为满足了前面任务一个条件,都说明已经进行过一次检测I/O事件了,无需再进行。
中间还有个判断线程中断的就跳过了,基本没啥需要解释的,下面来看看一段比较关键的,也正是这段解决了jdk空轮询的bug:
io.netty.channel.nio.NioEventLoop#select
...(略)
long time = System.nanoTime();
if (time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos) {
// timeoutMillis elapsed without anything selected.
selectCnt = 1;
} else if (SELECTOR_AUTO_REBUILD_THRESHOLD > 0 &&
selectCnt >= SELECTOR_AUTO_REBUILD_THRESHOLD) {
// The selector returned prematurely many times in a row.
// Rebuild the selector to work around the problem.
logger.warn(
"Selector.select() returned prematurely {} times in a row; rebuilding Selector {}.",
selectCnt, selector);
rebuildSelector();
selector = this.selector;
// Select again to populate selectedKeys.
selector.selectNow();
selectCnt = 1;
break;
}
currentTimeNanos = time;
}//for循环结束
...(略)
- time是执行完select后的时间,而currentTimeNanos则是select之前的时间。
- time - TimeUnit.MILLISECONDS.toNanos(timeoutMillis) >= currentTimeNanos这段代码含义:
- select后时间(纳秒)-换算成纳秒的timeoutMillis>=select开始前时间(纳秒),则说明已经执行过一次阻塞式select了,计数器=1,这里没有break但之后有, 因为下次再进入for是时间值和计数器都”不符合if”,最终break。
**如果执行代码时,到达了上面ifelse的前一行,但却没有进入if或else if,就说明发生了空轮询。**只是空轮询次数低于SELECTOR_AUTO_REBUILD_THRESHOLD(默认512)时在不断重试。
解决空轮询bug
再分析一下Netty是如何判断空轮询的:
- 其实就是将上面if的时间公式反过来想:select操作后时间-timeoutMillis < select操作前时间。
- 当满足上面这个条件,就说明:selector.select(timeoutMillis);这个阻塞方法并没有阻塞就直接返回了,即发生了空轮询。
而Netty则是靠rebuildSelector();这个方法去解决空轮询bug的,不妨跟进去看看(代码很长,但逻辑还是很简单):
io.netty.channel.nio.NioEventLoop#rebuildSelector
public void rebuildSelector() {
if (!inEventLoop()) {
// 保证线程安全
execute(new Runnable() {
@Override
public void run() {
rebuildSelector();
}
});
return;
}
final Selector oldSelector = selector;
final Selector newSelector;
if (oldSelector == null) {
return;
}
try {
newSelector = openSelector();
} catch (Exception e) {
logger.warn("Failed to create a new Selector.", e);
return;
}
// Register all channels to the new Selector.
int nChannels = 0;
for (;;) {
try {
for (SelectionKey key: oldSelector.keys()) {
// 其实就是Channel
Object a = key.attachment();
try {
if (!key.isValid() || key.channel().keyFor(newSelector) != null) {
continue;
}
int interestOps = key.interestOps();
key.cancel();
SelectionKey newKey = key.channel().register(newSelector, interestOps, a);
if (a instanceof AbstractNioChannel) {
// Update SelectionKey
((AbstractNioChannel) a).selectionKey = newKey;
}
nChannels ++;
} catch (Exception e) {
logger.warn("Failed to re-register a Channel to the new Selector.", e);
if (a instanceof AbstractNioChannel) {
AbstractNioChannel ch = (AbstractNioChannel) a;
ch.unsafe().close(ch.unsafe().voidPromise());
} else {
@SuppressWarnings("unchecked")
NioTask<SelectableChannel> task = (NioTask<SelectableChannel>) a;
invokeChannelUnregistered(task, key, e);
}
}
}
} catch (ConcurrentModificationException e) {
// Probably due to concurrent modification of the key set.
continue;
}
break;
}
selector = newSelector;
try {
// time to close the old selector as everything else is registered to the new one
oldSelector.close();
} catch (Throwable t) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to close the old Selector.", t);
}
}
logger.info("Migrated " + nChannels + " channel(s) to the new Selector.");
}
Netty解决空轮询bug的手法看上去也很"暴力",就是重建一个新的selector,并把旧selector上的selectedKeys全部复制到新的selector上,再用新的selector替换旧的selector。之后再尝试select操作就很可能不会再发生空轮询bug了。
关于selectedKey,在这篇有提到过。
关于attchment取出的Object就是Chaneel这个说法,不记得的可以看这篇。
小结
- select方法在检测I/O事件时,有两种策略,分别是阻塞式和非阻塞式。
- 在进行select操作时,有可能出现空轮询bug,而当空轮询达到一个阈值时(默认512),就会重建seletor。
- 重建selector其实就是创建新的selector,并将旧selector上的selectedKeys复制到新的selector上,最后用新selector替换旧selector,重新尝试select操作就可能解决空轮询bug了。