回顾
在上一节记录了客户端channel(NioSocketChannel)初始化的大致流程。
经过前面服务端channel的学习,我们不难推测,客户端channel初始化之后也是需要进行事件传播的。而这一节就跟进它的事件传播,源码逻辑则大部分复用【端口绑定】这一节的逻辑。
Netty Version:4.1.6
跟进源码
在追踪源码之前,得要先定位起点,起点就是上一节接近文末处的【坐标2】的io.netty.channel.AbstractChannel.AbstractUnsafe#register0
方法。至于启动Server.java、打断点等这里就不再赘述了。
下面再贴一次代码:
private void register0(ChannelPromise promise) {
try {
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
boolean firstRegistration = neverRegistered;
// 注册selector核心方法
doRegister();
neverRegistered = false;
registered = true;
pipeline.invokeHandlerAddedIfNeeded();
safeSetSuccess(promise);
pipeline.fireChannelRegistered();
// 服务端启动注册selector后因为还没绑定端口,所以这里为false。
//但现在是客户端连接接入,很明显服务端已经完成初始化了,所以此处isActive()结果为true。
if (isActive()) {
if (firstRegistration) {
// 传播事件的核心方法
pipeline.fireChannelActive();
} else if (config().isAutoRead()) {
beginRead();
}
}
} catch (Throwable t) {
closeForcibly();
closeFuture.setClosed();
safeSetFailure(promise, t);
}
}
关于fireChannelActive方法,其实在【端口绑定】这一节也追踪过,这里就当复习一下吧。想看结尾就跳【坐标2】。
这里就从fireChannelActive方法开始追踪,现在进入fireChannelActive方法(也可自己打断点跟进):
io.netty.channel.DefaultChannelPipeline#fireChannelActive
@Override
public final ChannelPipeline fireChannelActive() {
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}
继续跟进invokeChannelActive方法:
io.netty.channel.AbstractChannelHandlerContext#invokeChannelActive(io.netty.channel.AbstractChannelHandlerContext)
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelActive();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelActive();
}
});
}
}
继续跟进invokeChannelActive方法:
io.netty.channel.AbstractChannelHandlerContext#invokeChannelActive()
private void invokeChannelActive() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelActive(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelActive();
}
}
继续跟进channelActive方法:
io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
readIfIsAutoRead();
}
- 这里的fireChannelActive最终会执行到上一节中
ServerHandler.java
的channelActive方法。
继续追readIfIsAutoRead方法:
io.netty.channel.DefaultChannelPipeline.HeadContext#readIfIsAutoRead
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}
继续进入read方法:
io.netty.channel.AbstractChannel#read
@Override
public Channel read() {
pipeline.read();
return this;
}
继续跟进read方法:
io.netty.channel.DefaultChannelPipeline#read
@Override
public final ChannelPipeline read() {
tail.read();
return this;
}
继续跟进read方法:
io.netty.channel.AbstractChannelHandlerContext#read
@Override
public ChannelHandlerContext read() {
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeRead();
} else {
Runnable task = next.invokeReadTask;
if (task == null) {
next.invokeReadTask = task = new Runnable() {
@Override
public void run() {
next.invokeRead();
}
};
}
executor.execute(task);
}
return this;
}
继续跟进invokeRead方法:
io.netty.channel.AbstractChannelHandlerContext#invokeRead
private void invokeRead() {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).read(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
read();
}
}
继续跟进read方法:
io.netty.channel.DefaultChannelPipeline.HeadContext#read
@Override
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
继续跟进这个beginRead方法:
io.netty.channel.AbstractChannel.AbstractUnsafe#beginRead
@Override
public final void beginRead() {
assertEventLoop();
if (!isActive()) {
return;
}
try {
doBeginRead();
} catch (final Exception e) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);
}
});
close(voidPromise());
}
}
继续进入doBeginRead方法,终于来到熟悉的老地方了,此处【坐标2】:
io.netty.channel.nio.AbstractNioChannel#doBeginRead
@Override
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
到这一步,和服务端channel事件传播的区别就是:当前客户端channel传播的是OP_READ事件,不信可以来看看下面的断点图:
readInterestOp=1,即=OP_READ,这个值是在【创建客户端channel】这篇博客中设置的。
最终,当改变了ops的值后,就完成了read事件注册,NioEventLoop中run方法的processSelectedKeys方法就会开始关注OP_READ事件,详细可参考【processSelectedKeys执行逻辑】这一篇。
因为代码逻辑和前面【端口绑定】,这里就不需要小结了。