回顾
在上一节,简单分析了NioSocketChannel(客户端channel)的创建过程,代码逻辑上跟创建服务端channel差不多。
那么竟然服务端channel有创建、初始化的流程,那么客户端channel创建完后自然也需要初始化,本节就来看看初始化客户端channel的过程,包含分配NioEventLoop、注册selector等操作。
Netty Version:4.1.6
补充
之前博客的遗留问题
在之前【初始化服务端channel】博客中,讲到了一个添加连接器的东西,即如下代码:
io.netty.bootstrap.ServerBootstrap#init
...(略)
ch.eventLoop().execute(new Runnable() {
@Override
public void run() {
pipeline.addLast(new ServerBootstrapAcceptor(
currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs));
}
});
...(略)
如果感到比较陌生,建议回去看之前的博客。
现在让我们进入new ServerBootstrapAcceptor构造方法看看,传入的参数之前博客讲过,这里就不再赘述了:
ServerBootstrapAcceptor(
EventLoopGroup childGroup, ChannelHandler childHandler,
Entry<ChannelOption<?>, Object>[] childOptions, Entry<AttributeKey<?>, Object>[] childAttrs) {
this.childGroup = childGroup;
this.childHandler = childHandler;
this.childOptions = childOptions;
this.childAttrs = childAttrs;
}
- 可见,new ServerBootstrapAcceptor后只是单纯的把传进的参数保存起来而已,那保存起来的东西什么时候用呢,就是在本节初始化服务端channel时用的。
实验代码
Server.java
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.AttributeKey;
/**
* @author cwj
*/
public final class Server {
public static void main(String[] args) throws Exception {
// 两大线程
// 对应Socket中Server的启动监听线程(负责接收新连接并抛给workerGroup)
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
// 对应Socket中Client中主函数的线程(负责处理新连接)
EventLoopGroup workerGroup = new NioEventLoopGroup(1);
try {
// 辅助类
ServerBootstrap server = new ServerBootstrap();
// 装配线程
server.group(bossGroup, workerGroup)
// 设置Channel的类型
.channel(NioServerSocketChannel.class)
// 给每个客户端的连接设置一些tcp的基本属性
// .childOption(ChannelOption.TCP_NODELAY, true)
.option(ChannelOption.TCP_NODELAY, true)
// 每次创建客户端时,绑定一些基本的属性
.childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
.attr(AttributeKey.newInstance("Attr"), "AttrValue")
// 服务端启动时做的逻辑
.handler(new ServerHandler())
// 给pipeline配置handler,Channel发生某种变化时对应的处理逻辑
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
// todo
}
});
// 辅助类,绑定端口,这里使用了同步,也就是说它原本是异步的:io.netty.bootstrap.AbstractBootstrap.doBind
ChannelFuture future = server.bind(8888).sync();
// 关闭
future.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
ServerHandler.java
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import java.time.LocalDateTime;
import java.util.concurrent.TimeUnit;
public class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelActive(ChannelHandlerContext ctx) {
System.out.println("channelActive");
}
@Override
public void channelRegistered(ChannelHandlerContext ctx) {
System.out.println("channelRegistered");
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) {
System.out.println("handlerAdded");
}
@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) throws Exception {
super.channelRead(ctx, msg);
new Thread(new Runnable() {
@Override
public void run() {
// 耗时的操作
String result = loadFromDB();
ctx.channel().writeAndFlush(result);
ctx.executor().schedule(new Runnable() {
@Override
public void run() {
// ...do something
}
}, 1, TimeUnit.SECONDS);
}
}).start();
}
private String loadFromDB() {
return "hello world!";
}
}
跟进源码
一些准备工作
首先,得找到本篇博客的入口io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
,如果忘记的话怎么进来的话,可以回顾检测+获取新连接这篇博客。
然后我们需要在fireChannelRead方法上打个断点,便于跳过前面繁琐流程:
之后,在io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(java.lang.Object)
的channelRead方法上打个断点:
启动Server.java的main方法:
只有使用telnet连接,在下面命令回车瞬间,就能进入第一个断点了:
telnet 127.0.0.0.1 8888
开始追踪源码
首先会来到第一个断点:
io.netty.channel.nio.AbstractNioMessageChannel.NioMessageUnsafe#read
- 来到这里之后我会选择放行,如果你有兴趣,也可以从这里慢慢跟到下个断点。
放行第一个断点,马上就能到第二个断点了:
io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(java.lang.Object)
到这里就有点麻烦了,因为invokeChannelRead这个方法会被来回调用3次(现在是第一次),直到第三次,handler()返回的才是返回ServerBootstrapAcceptor的实例,下面来跟根:
- 第一次,即当前的handler()返回HeadContext:
- 第二次handler()返回的ServerHandler,是在上面
Server.java
中设置的: - 第三次handler()终于返回了ServerBootstrapAcceptor:
至于为什么是这么个调用顺序,就和pipeline的机制有关了,等我以后学到了,会再补充上来的。
第三次返回的ServerBootstrapAcceptor正是我们想要的。于是,从这一次开始跟进channelRead方法:
io.netty.bootstrap.ServerBootstrap.ServerBootstrapAcceptor#channelRead
@Override
@SuppressWarnings("unchecked")
public void channelRead(ChannelHandlerContext ctx, Object msg) {
final Channel child = (Channel) msg;
child.pipeline().addLast(childHandler);
for (Entry<ChannelOption<?>, Object> e: childOptions) {
try {
if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
logger.warn("Unknown channel option: " + e);
}
} catch (Throwable t) {
logger.warn("Failed to set a channel option: " + child, t);
}
}
for (Entry<AttributeKey<?>, Object> e: childAttrs) {
child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
}
try {
childGroup.register(child).addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (!future.isSuccess()) {
forceClose(child, future.cause());
}
}
});
} catch (Throwable t) {
forceClose(child, t);
}
}
- 看到上面的代码,我至少是有80%是感到很熟悉的,因为跟【初始服务端channel】
和【注册selector】这两篇博客中遇到的逻辑神似。 - 基本可以确定,这段代码就是负责初始化客户端channel属性、配置,以及注册selector的了,不信还可以看看断点图,能找到在Server.java中设置的属性:
设置childAttrs、childOptions、childHandler等这些就不一一赘述了,逻辑和【初始服务端channel】几乎是一样的。
提示:下面的代码逻辑其实跟【服务端注册selector】是一样的
这里应该重点关注的是register方法,我们断点继续跟下去,来到如下方法,此处【坐标1】:
io.netty.channel.MultithreadEventLoopGroup#register(io.netty.channel.Channel)
@Override
public ChannelFuture register(Channel channel) {
return next().register(channel);
}
- 唉,这里的next方法貌似有点眼熟呢
断点跟进next方法:
io.netty.channel.MultithreadEventLoopGroup#next
@Override
public EventLoop next() {
return (EventLoop) super.next();
}
断点继续跟进next方法:
io.netty.util.concurrent.MultithreadEventExecutorGroup#next
@Override
public EventExecutor next() {
return chooser.next();
}
- 唉,这个chooser也很眼熟,难道...
断点继续跟进next方法:
io.netty.util.concurrent.DefaultEventExecutorChooserFactory.PowerOfTowEventExecutorChooser#next
@Override
public EventExecutor next() {
return executors[idx.getAndIncrement() & executors.length - 1];
}
- 呼,豁然开朗了,这不就是前面创建chooser博客中的选择器吗。
现在视角重新转回【坐标1】,经过以上追踪,我们明白了,next方法其实就是在分配一个NioEventLoop,并且这个NioEventLoop是属于Server.java
代码中的workerGroup。
继续跟进register方法:
io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.Channel)
public ChannelFuture register(Channel channel) {
return register(new DefaultChannelPromise(channel, this));
}
- new DefaultChannelPromise其实就绑定了客户端channel和分配给它的NioEventLoop。
继续跟进register方法:
io.netty.channel.SingleThreadEventLoop#register(io.netty.channel.ChannelPromise)
@Override
public ChannelFuture register(final ChannelPromise promise) {
ObjectUtil.checkNotNull(promise, "promise");
promise.channel().unsafe().register(this, promise);
return promise;
}
- 需要注意的是这里的unsafe返回的是NioSocketChannelUnsafe,即客户端unsafe:
- 忘记客户端、服务端unsafe的,可以参考前面的小总结。
继续跟进register方法:
io.netty.channel.AbstractChannel.AbstractUnsafe#register
public final void register(EventLoop eventLoop, final ChannelPromise promise) {
if (eventLoop == null) {
throw new NullPointerException("eventLoop");
}
if (isRegistered()) {
promise.setFailure(new IllegalStateException("registered to an event loop already"));
return;
}
if (!isCompatible(eventLoop)) {
promise.setFailure(
new IllegalStateException("incompatible event loop type: " + eventLoop.getClass().getName()));
return;
}
AbstractChannel.this.eventLoop = eventLoop;
if (eventLoop.inEventLoop()) {
register0(promise);
} else {
try {
eventLoop.execute(new Runnable() {
@Override
public void run() {
register0(promise);
}
});
} catch (Throwable t) {
logger.warn(
"Force-closing a channel whose registration task was not accepted by an event loop: {}",
AbstractChannel.this, t);
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
}
- 保存(绑定)的eventLoop。
- register0方法就是负责注册selector。
继续进入register0方法,此处【坐标2】:
io.netty.channel.AbstractChannel.AbstractUnsafe#register0
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// 注册selector核心方法
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// 服务端启动注册selector后因为还没绑定端口,所以这里为false。
//但现在是客户端连接接入,很明显服务端已经完成初始化了,所以此处isActive()结果为true。
if (isActive()) {
if (firstRegistration) {
// 传播事件的核心方法
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
继续跟进doRegister方法:
io.netty.channel.nio.AbstractNioChannel#doRegister
@Override
protected void doRegister() throws Exception {
boolean selected = false;
for (;;) {
try {
selectionKey = javaChannel().register(eventLoop().selector, 0, this);
return;
} catch (CancelledKeyException e) {
if (!selected) {
eventLoop().selectNow();
selected = true;
} else {
throw e;
}
}
}
}
- 这里就是调用jdk底层channel的register完成selector注册。
- javaChannel()返回的就是上一节new NioSocketChannel时设置的channel。
执行完以上方法后,就完成selector的注册了,并且相关信息也存储到selectionKey中。
本节就先到这里,关于【坐标2】代码中的提到事件传播,留到下一节继续分析。
小结
- 客户端channel初始化的逻辑其实和服务端channel初始化的逻辑大同小异,甚至复用了很多相同的代码,都是配置arrt、option、handler以及注册selector等。
- 只是客户端channel初始化时,使用的是服务端channel初始化时保存的ServerBootstrapAcceptor。