【Netty】处理新连接(三):初始化客户端channel(NioSocketChannel)

【Netty】处理新连接(三):初始化客户端channel(NioSocketChannel)

Scroll Down

回顾

上一节,简单分析了NioSocketChannel(客户端channel)的创建过程,代码逻辑上跟创建服务端channel差不多。

那么竟然服务端channel有创建、初始化的流程,那么客户端channel创建完后自然也需要初始化,本节就来看看初始化客户端channel的过程,包含分配NioEventLoop、注册selector等操作。

Netty Version:4.1.6


补充

之前博客的遗留问题

在之前【初始化服务端channel】博客中,讲到了一个添加连接器的东西,即如下代码:
io.netty.bootstrap.ServerBootstrap#init

...(略)
    ch.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
        pipeline.addLast(new ServerBootstrapAcceptor(
		currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
		}
	});
...(略)

如果感到比较陌生,建议回去看之前的博客。


现在让我们进入new ServerBootstrapAcceptor构造方法看看,传入的参数之前博客讲过,这里就不再赘述了:

        ServerBootstrapAcceptor(
                EventLoopGroup childGroup, ChannelHandler childHandler,
                Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
            this.childGroup = childGroup;
            this.childHandler = childHandler;
            this.childOptions = childOptions;
            this.childAttrs = childAttrs;
        }
  • 可见,new ServerBootstrapAcceptor后只是单纯的把传进的参数保存起来而已,那保存起来的东西什么时候用呢,就是在本节初始化服务端channel时用的。

实验代码

Server.java

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.AttributeKey;

/**
 * @author cwj
 */
public final class Server {

    public static void main(String[] args) throws Exception {
        // 两大线程
        // 对应Socket中Server的启动监听线程(负责接收新连接并抛给workerGroup)
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        // 对应Socket中Client中主函数的线程(负责处理新连接)
        EventLoopGroup workerGroup = new NioEventLoopGroup(1);

        try {
            // 辅助类
            ServerBootstrap server = new ServerBootstrap();
            // 装配线程
            server.group(bossGroup, workerGroup)
                    // 设置Channel的类型
                    .channel(NioServerSocketChannel.class)
                    // 给每个客户端的连接设置一些tcp的基本属性
//                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .option(ChannelOption.TCP_NODELAY, true)
                    // 每次创建客户端时,绑定一些基本的属性
                    .childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
                    .attr(AttributeKey.newInstance("Attr"), "AttrValue")
                    // 服务端启动时做的逻辑
                    .handler(new ServerHandler())
                    // 给pipeline配置handler,Channel发生某种变化时对应的处理逻辑
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) {
                            // todo
                        }
                    });

            // 辅助类,绑定端口,这里使用了同步,也就是说它原本是异步的:io.netty.bootstrap.AbstractBootstrap.doBind
            ChannelFuture future = server.bind(8888).sync();
            // 关闭
            future.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

ServerHandler.java

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;

public class ServerHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        System.out.println("channelActive");
    }

    @Override
    public void channelRegistered(ChannelHandlerContext ctx) {
        System.out.println("channelRegistered");
    }

    @Override
    public void handlerAdded(ChannelHandlerContext ctx) {
        System.out.println("handlerAdded");
    }

    @Override
    public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
        super.channelRead(ctx, msg);

        new Thread(new Runnable() {
            @Override
            public void run() {
                // 耗时的操作
                String result = loadFromDB();

                ctx.channel().writeAndFlush(result);
                ctx.executor().schedule(new Runnable() {
                    @Override
                    public void run() {
                        // ...do something
                    }
                }, 1, TimeUnit.SECONDS);

            }
        }).start();
    }

    private String loadFromDB() {
        return "hello world!";
    }
}

跟进源码

一些准备工作

首先,得找到本篇博客的入口io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read,如果忘记的话怎么进来的话,可以回顾检测+获取新连接这篇博客。

然后我们需要在fireChannelRead方法上打个断点,便于跳过前面繁琐流程:
image.png

之后,在io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(java.lang.Object)的channelRead方法上打个断点:
image.png


启动Server.java的main方法:
image.png

只有使用telnet连接,在下面命令回车瞬间,就能进入第一个断点了:

telnet 127.0.0.0.1 8888

开始追踪源码

首先会来到第一个断点:
io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
image.png

  • 来到这里之后我会选择放行,如果你有兴趣,也可以从这里慢慢跟到下个断点。

放行第一个断点,马上就能到第二个断点了:
io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(java.lang.Object)
image.png

到这里就有点麻烦了,因为invokeChannelRead这个方法会被来回调用3次(现在是第一次),直到第三次,handler()返回的才是返回ServerBootstrapAcceptor的实例,下面来跟根:

  • 第一次,即当前的handler()返回HeadContext:image.png
  • 第二次handler()返回的ServerHandler,是在上面Server.java中设置的:image.png
  • 第三次handler()终于返回了ServerBootstrapAcceptor:image.png

至于为什么是这么个调用顺序,就和pipeline的机制有关了,等我以后学到了,会再补充上来的。


第三次返回的ServerBootstrapAcceptor正是我们想要的。于是,从这一次开始跟进channelRead方法:
io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead

        @Override
        @SuppressWarnings("unchecked")
        public void channelRead(ChannelHandlerContext ctx, Object msg) {
            final Channel child = (Channel) msg;

            child.pipeline().addLast(childHandler);

            for (Entry<ChannelOption<?>, Object> e: childOptions) {
                try {
                    if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                        logger.warn("Unknown channel option: " + e);
                    }
                } catch (Throwable t) {
                    logger.warn("Failed to set a channel option: " + child, t);
                }
            }

            for (Entry<AttributeKey<?>, Object> e: childAttrs) {
                child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
            }

            try {
                childGroup.register(child).addListener(new ChannelFutureListener() {
                    @Override
                    public void operationComplete(ChannelFuture future) throws Exception {
                        if (!future.isSuccess()) {
                            forceClose(child, future.cause());
                        }
                    }
                });
            } catch (Throwable t) {
                forceClose(child, t);
            }
        }
  • 看到上面的代码,我至少是有80%是感到很熟悉的,因为跟【初始服务端channel】
    【注册selector】这两篇博客中遇到的逻辑神似。
  • 基本可以确定,这段代码就是负责初始化客户端channel属性、配置,以及注册selector的了,不信还可以看看断点图,能找到在Server.java中设置的属性:image.png

设置childAttrs、childOptions、childHandler等这些就不一一赘述了,逻辑和【初始服务端channel】几乎是一样的。


提示:下面的代码逻辑其实跟【服务端注册selector】是一样的

这里应该重点关注的是register方法,我们断点继续跟下去,来到如下方法,此处【坐标1】:
io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)

    @Override
    public ChannelFuture register(Channel channel) {
        return next().register(channel);
    }
  • 唉,这里的next方法貌似有点眼熟呢

断点跟进next方法:
io.netty.channel.MultithreadEventLoopGroup#next

    @Override
    public EventLoop next() {
        return (EventLoop) super.next();
    }

断点继续跟进next方法:
io.netty.util.concurrent.MultithreadEventExecutorGroup#next

    @Override
    public EventExecutor next() {
        return chooser.next();
    }
  • 唉,这个chooser也很眼熟,难道...

断点继续跟进next方法:
io.netty.util.concurrent.DefaultEventExecutorChooserFactory.PowerOfTowEventExecutorChooser#next

        @Override
        public EventExecutor next() {
            return executors[idx.getAndIncrement() & executors.length - 1];
        }

现在视角重新转回【坐标1】,经过以上追踪,我们明白了,next方法其实就是在分配一个NioEventLoop,并且这个NioEventLoop是属于Server.java代码中的workerGroup。


继续跟进register方法:
io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel)

    public ChannelFuture register(Channel channel) {
        return register(new DefaultChannelPromise(channel, this));
    }
  • new DefaultChannelPromise其实就绑定了客户端channel和分配给它的NioEventLoop。

继续跟进register方法:
io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.ChannelPromise)

    @Override
    public ChannelFuture register(final ChannelPromise promise) {
        ObjectUtil.checkNotNull(promise, "promise");
        promise.channel().unsafe().register(this, promise);
        return promise;
    }
  • 需要注意的是这里的unsafe返回的是NioSocketChannelUnsafe,即客户端unsafe:image.png
  • 忘记客户端、服务端unsafe的,可以参考前面的小总结

继续跟进register方法:
io.netty.channel.AbstractChannel.AbstractUnsafe#register

        public final void register(EventLoop eventLoop, final ChannelPromise promise) {
            if (eventLoop == null) {
                throw new NullPointerException("eventLoop");
            }
            if (isRegistered()) {
                promise.setFailure(new IllegalStateException("registered to an event loop already"));
                return;
            }
            if (!isCompatible(eventLoop)) {
                promise.setFailure(
                        new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
                return;
            }

            AbstractChannel.this.eventLoop = eventLoop;

            if (eventLoop.inEventLoop()) {
                register0(promise);
            } else {
                try {
                    eventLoop.execute(new Runnable() {
                        @Override
                        public void run() {
                            register0(promise);
                        }
                    });
                } catch (Throwable t) {
                    logger.warn(
                            "Force-closing a channel whose registration task was not accepted by an event loop: {}",
                            AbstractChannel.this, t);
                    closeForcibly();
                    closeFuture.setClosed();
                    safeSetFailure(promise, t);
                }
            }
        }
  • 保存(绑定)的eventLoop。
  • register0方法就是负责注册selector。

继续进入register0方法,此处【坐标2】:
io.netty.channel.AbstractChannel.AbstractUnsafe#register0

        private void register0(ChannelPromise promise) {
            try {
                if (!promise.setUncancellable() || !ensureOpen(promise)) {
                    return;
                }
                boolean firstRegistration = neverRegistered;
                // 注册selector核心方法
                doRegister();
                neverRegistered = false;
                registered = true;

                pipeline.invokeHandlerAddedIfNeeded();

                safeSetSuccess(promise);
                pipeline.fireChannelRegistered();
                // 服务端启动注册selector后因为还没绑定端口,所以这里为false。
                //但现在是客户端连接接入,很明显服务端已经完成初始化了,所以此处isActive()结果为true。
                if (isActive()) {
                    if (firstRegistration) {
                        // 传播事件的核心方法
                        pipeline.fireChannelActive();
                    } else if (config().isAutoRead()) {
                        beginRead();
                    }
                }
            } catch (Throwable t) {
                closeForcibly();
                closeFuture.setClosed();
                safeSetFailure(promise, t);
            }
        }

继续跟进doRegister方法:
io.netty.channel.nio.AbstractNioChannel#doRegister

    @Override
    protected void doRegister() throws Exception {
        boolean selected = false;
        for (;;) {
            try {
                selectionKey = javaChannel().register(eventLoop().selector, 0, this);
                return;
            } catch (CancelledKeyException e) {
                if (!selected) {
                    eventLoop().selectNow();
                    selected = true;
                } else {
                    throw e;
                }
            }
        }
    }
  • 这里就是调用jdk底层channel的register完成selector注册。
  • javaChannel()返回的就是上一节new NioSocketChannel时设置的channel。

执行完以上方法后,就完成selector的注册了,并且相关信息也存储到selectionKey中。

本节就先到这里,关于【坐标2】代码中的提到事件传播,留到下一节继续分析。



小结

  • 客户端channel初始化的逻辑其实和服务端channel初始化的逻辑大同小异,甚至复用了很多相同的代码,都是配置arrt、option、handler以及注册selector等。
  • 只是客户端channel初始化时,使用的是服务端channel初始化时保存的ServerBootstrapAcceptor。