回顾
- 创建服务端Channel
- 初始化服务端Channel
- 注册selector(<--上一节)
- 端口绑定、通知(<--本节)
Netty version:4.1.6
前面三节中Netty已经完成创建、初始化Channel以及注册selector,那么它只差最后一步就能开始和客户端建立连接了,即本节的:端口绑定。
端口绑定
大致函数调用流程:
确认起点
本节起点位置:io.netty.bootstrap.AbstractBootstrap#doBind
如果不知道怎么进来的,可以去创建服务端Channel那一节重新看看,这里不再赘述。
开始源码追踪
现在,视角重新回到io.netty.bootstrap.AbstractBootstrap#doBind
,找到doBind0方法:
进入doBind0方法:
io.netty.bootstrap.AbstractBootstrap#doBind0
private static void doBind0(
final ChannelFuture regFuture, final Channel channel,
final SocketAddress localAddress, final ChannelPromise promise) {
// This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
// the pipeline in its channelRegistered() implementation.
channel.eventLoop().execute(new Runnable() {
@Override
public void run() {
if (regFuture.isSuccess()) {
channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
} else {
promise.setFailure(regFuture.cause());
}
}
});
}
然后再进入bind方法(实现选择AbstractChannel):
io.netty.channel.AbstractChannel#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)
public ChannelFuture bind(SocketAddress localAddress, ChannelPromise promise) {
return pipeline.bind(localAddress, promise);
}
之后一路bind(源码略):
- 进入
io.netty.channel.DefaultChannelPipeline#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)
- 进入
io.netty.channel.AbstractChannelHandlerContext#bind(java.net.SocketAddress, io.netty.channel.ChannelPromise)
- 进入
io.netty.channel.AbstractChannelHandlerContext#invokeBind
- 进入
io.netty.channel.DefaultChannelPipeline.HeadContext#bind
- 进入
io.netty.channel.AbstractChannel.AbstractUnsafe#bind
如果对源码怎么跳的不感兴趣,那就直接将视角切换到:io.netty.channel.AbstractChannel.AbstractUnsafe#bind
,接下来都是围绕这里:
public final void bind(final SocketAddress localAddress, final ChannelPromise promise) {
assertEventLoop();
if (!promise.setUncancellable() || !ensureOpen(promise)) {
return;
}
// See: https://github.com/netty/netty/issues/576
if (Boolean.TRUE.equals(config().getOption(ChannelOption.SO_BROADCAST)) &&
localAddress instanceof InetSocketAddress &&
!((InetSocketAddress) localAddress).getAddress().isAnyLocalAddress() &&
!PlatformDependent.isWindows() && !PlatformDependent.isRoot()) {
logger.warn(
"A non-root user can't receive a broadcast packet if the socket " +
"is not bound to a wildcard address; binding to a non-wildcard " +
"address (" + localAddress + ") anyway as requested.");
}
boolean wasActive = isActive();
try {
doBind(localAddress);
} catch (Throwable t) {
safeSetFailure(promise, t);
closeIfClosed();
return;
}
if (!wasActive && isActive()) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireChannelActive();
}
});
}
safeSetSuccess(promise);
}
我们先进入上面doBind方法(实现类:NioServerSocketChannel):
io.netty.channel.socket.nio.NioSocketChannel#doBind
protected void doBind(SocketAddress localAddress) throws Exception {
if (PlatformDependent.javaVersion() >= 7) {
javaChannel().bind(localAddress, config.getBacklog());
} else {
javaChannel().socket().bind(localAddress, config.getBacklog());
}
}
上面的代码底层其实就是使用jdk的api绑定端口,我们可以再来看看它的参数大概是个什么东西:
可以看到,就是我们自定义的一些参数,包括ip、端口等,当这里的代码执行完后,就相当于绑定完成了,接下来就是要传播下事件,告诉Netty可以监听accept事件了。
然后,视角重新回到io.netty.channel.AbstractChannel.AbstractUnsafe#bind
,找到下面这个方法:
图中的fireChannelActive方法其实就相当于一个通知,上一节也有讲到它对应的就是channelActive事件,我们在这里继续追一下源码:
io.netty.channel.DefaultChannelPipeline#fireChannelActive
public final ChannelPipeline fireChannelActive() {
AbstractChannelHandlerContext.invokeChannelActive(head);
return this;
}
之后再进入invokeChannelActive方法:
io.netty.channel.AbstractChannelHandlerContext#invokeChannelActive(io.netty.channel.AbstractChannelHandlerContext)
static void invokeChannelActive(final AbstractChannelHandlerContext next) {
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeChannelActive();
} else {
executor.execute(new Runnable() {
@Override
public void run() {
next.invokeChannelActive();
}
});
}
}
之后再进入invokeChannelActive函数:
io.netty.channel.AbstractChannelHandlerContext#invokeChannelActive()
private void invokeChannelActive() {
if (invokeHandler()) {
try {
((ChannelInboundHandler) handler()).channelActive(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
fireChannelActive();
}
}
之后再进入channelActive函数(实现类:DefaultChannelHandlerContext):
io.netty.channel.DefaultChannelPipeline.HeadContext#channelActive
public void channelActive(ChannelHandlerContext ctx) throws Exception {
ctx.fireChannelActive();
readIfIsAutoRead();
}
上面代码中的ctx.fireChannelActive();,最终会去到上一节ServerHandler.java
中的channelActive方法。
我们再来关注下上面代码中的readIfIsAutoRead方法,追一下它:
io.netty.channel.DefaultChannelPipeline.HeadContext#readIfIsAutoRead
private void readIfIsAutoRead() {
if (channel.config().isAutoRead()) {
channel.read();
}
}
进入上面代码中的read方法,之后也是一路入read方法(中间过程略),最终来到下面这个read方法:
io.netty.channel.AbstractChannelHandlerContext#read
public ChannelHandlerContext read() {
final AbstractChannelHandlerContext next = findContextOutbound();
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
next.invokeRead();
} else {
Runnable task = next.invokeReadTask;
if (task == null) {
next.invokeReadTask = task = new Runnable() {
@Override
public void run() {
next.invokeRead();
}
};
}
executor.execute(task);
}
return this;
}
然后进入invokeRead方法:
io.netty.channel.AbstractChannelHandlerContext#invokeRead
private void invokeRead() {
if (invokeHandler()) {
try {
((ChannelOutboundHandler) handler()).read(this);
} catch (Throwable t) {
notifyHandlerException(t);
}
} else {
read();
}
}
然后再进入((ChannelOutboundHandler) handler()).read(this);的read方法:
io.netty.channel.DefaultChannelPipeline.HeadContext#read
public void read(ChannelHandlerContext ctx) {
unsafe.beginRead();
}
之后再进入beginRead方法:
io.netty.channel.AbstractChannel.AbstractUnsafe#beginRead
public final void beginRead() {
assertEventLoop();
if (!isActive()) {
return;
}
try {
doBeginRead();
} catch (final Exception e) {
invokeLater(new Runnable() {
@Override
public void run() {
pipeline.fireExceptionCaught(e);
}
});
close(voidPromise());
}
}
之后再进入doBeginRead方法(实现类AbstractNioChannel):
io.netty.channel.nio.AbstractNioChannel#doBeginRead
protected void doBeginRead() throws Exception {
// Channel.read() or ChannelHandlerContext.read() was called
final SelectionKey selectionKey = this.selectionKey;
if (!selectionKey.isValid()) {
return;
}
readPending = true;
final int interestOps = selectionKey.interestOps();
if ((interestOps & readInterestOp) == 0) {
selectionKey.interestOps(interestOps | readInterestOp);
}
}
先解释下上面代码的核心含义:
- 如果先前的设置的ops不等于现在的readInterestOp,那么现在就会将原本的ops修改为readInterestOp的值,readInterestOp表示Netty要开始关注accept事件了。
下面再来讲解下这里面的一些参数:
上面代码其实就是事件传播的核心,而这里的selectionKey看着很眼熟,其它就是上一节【注册selector】中的某个步骤中设置的,忘记的话可以回去看看。
除此之外,readInterestOp这个参数也很眼熟,还记得同样是在上一节【注册selector】中,我们也设置了一个ops=0,表示不需要关心任何事件,而这里的interestOps是从selectionKey中取的,我们不难推测+验证,这个interestOps就是我们之前设置的ops,即0。
上面的结论想要验证的话,可以自行打下断点,因为比较简单,我就不浪费流量贴图了。
不仅如此,其实readInterestOp也是在前面就设置好的,这里就需要追踪下了readInterestOp是在哪里初始化的了:
io.netty.channel.nio.AbstractNioChannel#AbstractNioChannel
protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
super(parent);
this.ch = ch;
this.readInterestOp = readInterestOp;
try {
ch.configureBlocking(false);
} catch (IOException e) {
try {
ch.close();
} catch (IOException e2) {
if (logger.isWarnEnabled()) {
logger.warn(
"Failed to close a partially initialized socket.", e2);
}
}
throw new ChannelException("Failed to enter non-blocking mode.", e);
}
}
可以看见就是在当前类的构造函数被初始化的,那么这个构造函数是谁调用的呢?其实就在第一节【创建服务端Channel】中创建的,这里贴下代码:
io.netty.channel.socket.nio.NioServerSocketChannel#NioServerSocketChannel(java.nio.channels.ServerSocketChannel)
public NioServerSocketChannel(ServerSocketChannel channel) {
super(null, channel, SelectionKey.OP_ACCEPT);
config = new NioServerSocketChannelConfig(this, javaChannel().socket());
}
进入super方法,最终就会去到io.netty.channel.nio.AbstractNioChannel#AbstractNioChannel
,初始化readInterestOp。
而且我们能看到readInterestOp参数是SelectionKey.OP_ACCEPT,当上面讲到的io.netty.channel.nio.AbstractNioChannel#doBeginRead
执行了selectionKey.interestOps(interestOps | readInterestOp);
后,其实就等于告诉Netty可以关注accept事件了,和socket编程非常相似。
小结
这节除了讲到端口绑定外,还讲到了端口绑定后的事件传播,主要围绕两个方法,下面截了张图:
io.netty.channel.AbstractChannel.AbstractUnsafe#bind