回顾

上一节,简单分析了NioSocketChannel(客户端channel)的创建过程,代码逻辑上跟创建服务端channel差不多。

那么竟然服务端channel有创建、初始化的流程,那么客户端channel创建完后自然也需要初始化,本节就来看看初始化客户端channel的过程,包含分配NioEventLoop、注册selector等操作。

Netty Version:4.1.6


补充

之前博客的遗留问题

在之前【初始化服务端channel】博客中,讲到了一个添加连接器的东西,即如下代码:
io.netty.bootstrap.ServerBootstrap#init

java
  • 01
  • 02
  • 03
  • 04
  • 05
  • 06
  • 07
  • 08
  • 09
...(略) ch.eventLoop().execute(new Runnable() { @Override public void run() { pipeline.addLast(new ServerBootstrapAcceptor( currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)); } }); ...(略)

如果感到比较陌生,建议回去看之前的博客。


现在让我们进入new ServerBootstrapAcceptor构造方法看看,传入的参数之前博客讲过,这里就不再赘述了:

java
  • 01
  • 02
  • 03
  • 04
  • 05
  • 06
  • 07
  • 08
ServerBootstrapAcceptor( EventLoopGroup childGroup, ChannelHandler childHandler, Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) { this.childGroup = childGroup; this.childHandler = childHandler; this.childOptions = childOptions; this.childAttrs = childAttrs; }
  • 可见,new ServerBootstrapAcceptor后只是单纯的把传进的参数保存起来而已,那保存起来的东西什么时候用呢,就是在本节初始化服务端channel时用的。

实验代码

Server.java

java
  • 01
  • 02
  • 03
  • 04
  • 05
  • 06
  • 07
  • 08
  • 09
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.*; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.util.AttributeKey; /** * @author cwj */ public final class Server { public static void main(String[] args) throws Exception { // 两大线程 // 对应Socket中Server的启动监听线程(负责接收新连接并抛给workerGroup) EventLoopGroup bossGroup = new NioEventLoopGroup(1); // 对应Socket中Client中主函数的线程(负责处理新连接) EventLoopGroup workerGroup = new NioEventLoopGroup(1); try { // 辅助类 ServerBootstrap server = new ServerBootstrap(); // 装配线程 server.group(bossGroup, workerGroup) // 设置Channel的类型 .channel(NioServerSocketChannel.class) // 给每个客户端的连接设置一些tcp的基本属性 // .childOption(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.TCP_NODELAY, true) // 每次创建客户端时,绑定一些基本的属性 .childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue") .attr(AttributeKey.newInstance("Attr"), "AttrValue") // 服务端启动时做的逻辑 .handler(new ServerHandler()) // 给pipeline配置handler,Channel发生某种变化时对应的处理逻辑 .childHandler(new ChannelInitializer<SocketChannel>() { @Override public void initChannel(SocketChannel ch) { // todo } }); // 辅助类,绑定端口,这里使用了同步,也就是说它原本是异步的:io.netty.bootstrap.AbstractBootstrap.doBind ChannelFuture future = server.bind(8888).sync(); // 关闭 future.channel().closeFuture().sync(); } finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } } }

ServerHandler.java

java
  • 01
  • 02
  • 03
  • 04
  • 05
  • 06
  • 07
  • 08
  • 09
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelInboundHandlerAdapter; import java.time.LocalDateTime; import java.util.concurrent.TimeUnit; public class ServerHandler extends ChannelInboundHandlerAdapter { @Override public void channelActive(ChannelHandlerContext ctx) { System.out.println("channelActive"); } @Override public void channelRegistered(ChannelHandlerContext ctx) { System.out.println("channelRegistered"); } @Override public void handlerAdded(ChannelHandlerContext ctx) { System.out.println("handlerAdded"); } @Override public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception { super.channelRead(ctx, msg); new Thread(new Runnable() { @Override public void run() { // 耗时的操作 String result = loadFromDB(); ctx.channel().writeAndFlush(result); ctx.executor().schedule(new Runnable() { @Override public void run() { // ...do something } }, 1, TimeUnit.SECONDS); } }).start(); } private String loadFromDB() { return "hello world!"; } }

跟进源码

一些准备工作

首先,得找到本篇博客的入口io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read,如果忘记的话怎么进来的话,可以回顾检测+获取新连接这篇博客。

然后我们需要在fireChannelRead方法上打个断点,便于跳过前面繁琐流程:

之后,在io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(java.lang.Object)的channelRead方法上打个断点:


启动Server.java的main方法:

只有使用telnet连接,在下面命令回车瞬间,就能进入第一个断点了:

shell
  • 01
telnet 127.0.0.0.1 8888

开始追踪源码

首先会来到第一个断点:
io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read

  • 来到这里之后我会选择放行,如果你有兴趣,也可以从这里慢慢跟到下个断点。

放行第一个断点,马上就能到第二个断点了:
io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(java.lang.Object)

到这里就有点麻烦了,因为invokeChannelRead这个方法会被来回调用3次(现在是第一次),直到第三次,handler()返回的才是返回ServerBootstrapAcceptor的实例,下面来跟根:

  • 第一次,即当前的handler()返回HeadContext:
  • 第二次handler()返回的ServerHandler,是在上面Server.java中设置的:
  • 第三次handler()终于返回了ServerBootstrapAcceptor:

至于为什么是这么个调用顺序,就和pipeline的机制有关了,等我以后学到了,会再补充上来的。


第三次返回的ServerBootstrapAcceptor正是我们想要的。于是,从这一次开始跟进channelRead方法:
io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead

java
  • 01
  • 02
  • 03
  • 04
  • 05
  • 06
  • 07
  • 08
  • 09
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
@Override @SuppressWarnings("unchecked") public void channelRead(ChannelHandlerContext ctx, Object msg) { final Channel child = (Channel) msg; child.pipeline().addLast(childHandler); for (Entry<ChannelOption<?>, Object> e: childOptions) { try { if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) { logger.warn("Unknown channel option: " + e); } } catch (Throwable t) { logger.warn("Failed to set a channel option: " + child, t); } } for (Entry<AttributeKey<?>, Object> e: childAttrs) { child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue()); } try { childGroup.register(child).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { forceClose(child, future.cause()); } } }); } catch (Throwable t) { forceClose(child, t); } }
  • 看到上面的代码,我至少是有80%是感到很熟悉的,因为跟【初始服务端channel】
    【注册selector】这两篇博客中遇到的逻辑神似。
  • 基本可以确定,这段代码就是负责初始化客户端channel属性、配置,以及注册selector的了,不信还可以看看断点图,能找到在Server.java中设置的属性:

设置childAttrs、childOptions、childHandler等这些就不一一赘述了,逻辑和【初始服务端channel】几乎是一样的。


提示:下面的代码逻辑其实跟【服务端注册selector】是一样的

这里应该重点关注的是register方法,我们断点继续跟下去,来到如下方法,此处【坐标1】:
io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)

java
  • 01
  • 02
  • 03
  • 04
@Override public ChannelFuture register(Channel channel) { return next().register(channel); }
  • 唉,这里的next方法貌似有点眼熟呢

断点跟进next方法:
io.netty.channel.MultithreadEventLoopGroup#next

java
  • 01
  • 02
  • 03
  • 04
@Override public EventLoop next() { return (EventLoop) super.next(); }

断点继续跟进next方法:
io.netty.util.concurrent.MultithreadEventExecutorGroup#next

java
  • 01
  • 02
  • 03
  • 04
@Override public EventExecutor next() { return chooser.next(); }
  • 唉,这个chooser也很眼熟,难道...

断点继续跟进next方法:
io.netty.util.concurrent.DefaultEventExecutorChooserFactory.PowerOfTowEventExecutorChooser#next

java
  • 01
  • 02
  • 03
  • 04
@Override public EventExecutor next() { return executors[idx.getAndIncrement() & executors.length - 1]; }

现在视角重新转回【坐标1】,经过以上追踪,我们明白了,next方法其实就是在分配一个NioEventLoop,并且这个NioEventLoop是属于Server.java代码中的workerGroup。


继续跟进register方法:
io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel)

java
  • 01
  • 02
  • 03
public ChannelFuture register(Channel channel) { return register(new DefaultChannelPromise(channel, this)); }
  • new DefaultChannelPromise其实就绑定了客户端channel和分配给它的NioEventLoop。

继续跟进register方法:
io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.ChannelPromise)

java
  • 01
  • 02
  • 03
  • 04
  • 05
  • 06
@Override public ChannelFuture register(final ChannelPromise promise) { ObjectUtil.checkNotNull(promise, "promise"); promise.channel().unsafe().register(this, promise); return promise; }
  • 需要注意的是这里的unsafe返回的是NioSocketChannelUnsafe,即客户端unsafe:
  • 忘记客户端、服务端unsafe的,可以参考前面的小总结

继续跟进register方法:
io.netty.channel.AbstractChannel.AbstractUnsafe#register

java
  • 01
  • 02
  • 03
  • 04
  • 05
  • 06
  • 07
  • 08
  • 09
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
public final void register(EventLoop eventLoop, final ChannelPromise promise) { if (eventLoop == null) { throw new NullPointerException("eventLoop"); } if (isRegistered()) { promise.setFailure(new IllegalStateException("registered to an event loop already")); return; } if (!isCompatible(eventLoop)) { promise.setFailure( new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName())); return; } AbstractChannel.this.eventLoop = eventLoop; if (eventLoop.inEventLoop()) { register0(promise); } else { try { eventLoop.execute(new Runnable() { @Override public void run() { register0(promise); } }); } catch (Throwable t) { logger.warn( "Force-closing a channel whose registration task was not accepted by an event loop: {}", AbstractChannel.this, t); closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } } }
  • 保存(绑定)的eventLoop。
  • register0方法就是负责注册selector。

继续进入register0方法,此处【坐标2】:
io.netty.channel.AbstractChannel.AbstractUnsafe#register0

java
  • 01
  • 02
  • 03
  • 04
  • 05
  • 06
  • 07
  • 08
  • 09
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
private void register0(ChannelPromise promise) { try { if (!promise.setUncancellable() || !ensureOpen(promise)) { return; } boolean firstRegistration = neverRegistered; // 注册selector核心方法 doRegister(); neverRegistered = false; registered = true; pipeline.invokeHandlerAddedIfNeeded(); safeSetSuccess(promise); pipeline.fireChannelRegistered(); // 服务端启动注册selector后因为还没绑定端口,所以这里为false。 //但现在是客户端连接接入,很明显服务端已经完成初始化了,所以此处isActive()结果为true。 if (isActive()) { if (firstRegistration) { // 传播事件的核心方法 pipeline.fireChannelActive(); } else if (config().isAutoRead()) { beginRead(); } } } catch (Throwable t) { closeForcibly(); closeFuture.setClosed(); safeSetFailure(promise, t); } }

继续跟进doRegister方法:
io.netty.channel.nio.AbstractNioChannel#doRegister

java
  • 01
  • 02
  • 03
  • 04
  • 05
  • 06
  • 07
  • 08
  • 09
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
@Override protected void doRegister() throws Exception { boolean selected = false; for (;;) { try { selectionKey = javaChannel().register(eventLoop().selector, 0, this); return; } catch (CancelledKeyException e) { if (!selected) { eventLoop().selectNow(); selected = true; } else { throw e; } } } }
  • 这里就是调用jdk底层channel的register完成selector注册。
  • javaChannel()返回的就是上一节new NioSocketChannel时设置的channel。

执行完以上方法后,就完成selector的注册了,并且相关信息也存储到selectionKey中。

本节就先到这里,关于【坐标2】代码中的提到事件传播,留到下一节继续分析。



小结

  • 客户端channel初始化的逻辑其实和服务端channel初始化的逻辑大同小异,甚至复用了很多相同的代码,都是配置arrt、option、handler以及注册selector等。
  • 只是客户端channel初始化时,使用的是服务端channel初始化时保存的ServerBootstrapAcceptor。