前言

在前一段时间,我学习了NioEventLoop(Group)创建的大致流程,并将一些笔记记录成我的博客

那么这次,就继承前面所学的Netty知识,来继续学习NioEventLoop的启动大致流程。

Netty Version:4.1.6


打算拆分的章节:
  • 启动前的一些准备(本篇)
  • select()
  • processSelectedKeysOptimized()
  • runAllTask()

大致流程

启动的大致流程.png

大致解释一下:

  • 入口把任务交给execute方法执行。
  • execute时,会判断当前线程是不是NioEventLoop的线程,如果不是就会尝试创建线程。
  • 线程的创建是由TreadPerTaskExecutor的execute方法创建的,即FastThreadLocalThread(前面一篇中讲过TreadPerTaskExecutor)
  • 保存线程,如果线程不是NioEventLoop的,则会分配到taskQueue(mpscQueue)中执行,保证线程安全。(mpscQueue也在前面一篇中讲过)
  • 启动的核心函数,后面分小节记录。

这篇博客的一些"常识",在我之前的博客中也有记录,为了以防万一不记得,我会贴下传送门,方便回忆。

源码跟进

在跟进源码之前,我得先声明下起点:io.netty.bootstrap.AbstractBootstrap#doBind0,如果忘记怎么进入这个方法的,可以看端口绑定这一节

凭借着一点仅存的肌肉记忆,我点进了起点,下面贴下代码:
io.netty.bootstrap.AbstractBootstrap#doBind0

    private static void doBind0(
            final ChannelFuture regFuture, final Channel channel,
            final SocketAddress localAddress, final ChannelPromise promise) {

        // This method is invoked before channelRegistered() is triggered.  Give user handlers a chance to set up
        // the pipeline in its channelRegistered() implementation.
        channel.eventLoop().execute(new Runnable() {
            @Override
            public void run() {
                if (regFuture.isSuccess()) {
                    channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
                } else {
                    promise.setFailure(regFuture.cause());
                }
            }
        });
    }

进入execute方法(实现类:SingleThreadEventExcutor),此处【坐标1】:
io.netty.util.concurrent.SingleThreadEventExecutor#execute

    @Override
    public void execute(Runnable task) {
        if (task == null) {
            throw new NullPointerException("task");
        }

        boolean inEventLoop = inEventLoop();
        if (inEventLoop) {
            addTask(task);
        } else {
            startThread();
            addTask(task);
            if (isShutdown() && removeTask(task)) {
                reject();
            }
        }

        if (!addTaskWakesUp && wakesUpForTask(task)) {
            wakeup(inEventLoop);
        }
    }
  • inEventLoop判断添加任务操作是否是当前NioEventLoop的线程发起的,如果不是,为了保证线程安全性会进行一些额外操作。
  • 关于addTask方法,后面会再解析。

进入inEventLoop方法看看(实现类:SingleThreadEventExcutor):

    @Override
    public boolean inEventLoop() {
        return inEventLoop(Thread.currentThread());
    }
  • 传入参数为当前线程。

继续进入inEventLoop方法(实现类:SingleThreadEventExcutor):
io.netty.util.concurrent.SingleThreadEventExecutor#inEventLoop

    @Override
    public boolean inEventLoop(Thread thread) {
        return thread == this.thread;
    }
  • 说明inEventLoop()方法最终就是在判断当前线程是否是NioEventLoop的线程(类型为:FastThreadLocalThread)

不记得FastThreadLocalThread的,可以去这篇看看,有简单提到。


视角切回上面【坐标1】的代码,可以看出,如果执行execute进行添加任务的线程是来自当前NioEventLoop线程的,则会直接执行addTask方法添加任务,如果不是,则会先执行startThread方法。

下面就先进入这个startThread方法看看:
io.netty.util.concurrent.SingleThreadEventExecutor#startThread

    private void startThread() {
        if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
            if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
                doStartThread();
            }
        }
    }
  • 第一个if是在判断当前线程是否ST_NOT_STARTED状态,如果是ST_NOT_STARTED状态则继续往下
  • 第二个if是一个CAS操作,将当前线程的ST_NOT_STARTED状态修改为ST_STARTED状态,成功则继续玩下
  • 最后来到一个doStartThread方法

进入doStartThread方法,代码比较长,仅贴出其中比较重要的:
io.netty.util.concurrent.SingleThreadEventExecutor#doStartThread

    private void doStartThread() {
        assert thread == null;
        executor.execute(new Runnable() {
            @Override
            public void run() {
                thread = Thread.currentThread();
                if (interrupted) {
                    thread.interrupt();
                }

                boolean success = false;
                updateLastExecutionTime();
                try {
                    SingleThreadEventExecutor.this.run();
...(略)
  • 这里的executor和它的execute方法就是在前面ThreadPerTaskExcutor中讲到的,忘记可以回去看看。
  • 这段代码保存了当前的线程,即NioEventLoop的线程。
  • 同时,在上面这段代码,也看到了后面要拆开讲的核心代码:SingleThreadEventExecutor.this.run()

对于上面SingleThreadEventExecutor.this.run()的run方法,这篇虽然不会详细讲,但大致看看还是可以的,下面先给出一个大致流程留个印象。

SingThreadEventExecutor.this.run的大致流程:

SingThreadEventExecutor的run方法.png

  • 无限for循环。
  • select方法检测I/O事件
  • 处理select轮询出来的I/O事件
  • 处理异步任务

进入run方法

话不多说,我们进入run方法(实现类:NioEventLoop),由于代码过长,我暂时先贴"三主角":
io.netty.channel.nio.NioEventLoop#run

...
// 检查I/O事件
select(wakenUp.getAndSet(false));
...
// 处理上面select查到的I/O事件
processSelectedKeys();
...
// 运行上面处理的事件集
runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
...

上面三个方法就是后面三个小节各自介绍的内容。



小结

本小节的内容不多,只是追了比较简单的代码,但是跟前面学到的知识点有不少联系,希望能把一些之前的一些知识点串联起来再继续前进。