前言

这一节主要通过跟踪write方法来看看outbound事件传播的顺序、流程。如果对事件传播节点的双向链表不熟悉,或对事件添加的顺序不熟悉,建议复习下我博客的相关篇章。

Netty Version:4.1.6


实验代码

Server.java

import com.imooc.netty.ch3.ServerHandler;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.AttributeKey;

public final class Server {

    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(1);

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .handler(new ServerHandler())
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new OutBoundHandlerA());
                            ch.pipeline().addLast(new OutBoundHandlerB());
                            ch.pipeline().addLast(new OutBoundHandlerC());
                        }
                    });

            ChannelFuture f = b.bind(8888).sync();

            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

OutBoundHandlerB.java

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

import java.util.concurrent.TimeUnit;

public class OutBoundHandlerB extends ChannelOutboundHandlerAdapter {
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("OutBoundHandlerB: " + msg);
        ctx.write(msg, promise);
    }


    @Override
    public void handlerAdded(final ChannelHandlerContext ctx) {
        // 定时任务
        ctx.executor().schedule(() -> {
            ctx.channel().write("hello world");
        }, 3, TimeUnit.SECONDS);
    }
}

OutBoundHandlerA.java

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

public class OutBoundHandlerA extends ChannelOutboundHandlerAdapter {

    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
        System.out.println("OutBoundHandlerA: " + msg);
        ctx.write(msg, promise);
    }
}

OutBoundHandlerCOutBoundHandlerA,最后启动Server.java的main方法。


跟进源码

再跟进之前,你可能会问ctx.executor().schedule()是个啥玩意儿?其实这是Netty提供的添加定时任务的接口,在【runAllTasks】这篇有简单跟踪过,忘记/有兴趣的可以去瞧一瞧。

启动Server.java后,用telnet连接,断点就可以进入到OutBoundHandlerB的handlerAdded方法了,至于怎么进入handlerAdded的,可以参考【添加handler】

现在,就开始追ctx.channel().write的write方法了,先跟进去瞧瞧:
io.netty.channel.AbstractChannel#write(java.lang.Object)

    @Override
    public ChannelFuture write(Object msg) {
        return pipeline.write(msg);
    }
  • 上一节的fireChannelRead是直接通过pipeline调用的,而在这里我是通过channel().write()简介调用的pipeline的,因为这种方式更为推荐、常用,相信用netty写过聊天应用的应该很眼熟种调用方式。

继续跟进write方法:
io.netty.channel.DefaultChannelPipeline#write(java.lang.Object)

    @Override
    public final ChannelFuture write(Object msg) {
        return tail.write(msg);
    }

继续跟进write方法:
io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object)

    @Override
    public ChannelFuture write(Object msg) {
        return write(msg, newPromise());
    }
  • newPromise()方法其实就是在获取当前绑定channel和NioEventLoop。

继续跟进write方法,此处【坐标1】
io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, io.netty.channel.ChannelPromise)

    public ChannelFuture write(final Object msg, final ChannelPromise promise) {
        if (msg == null) {
            throw new NullPointerException("msg");
        }

        try {
            if (!validatePromise(promise, true)) {
                ReferenceCountUtil.release(msg);
                // cancelled
                return promise;
            }
        } catch (RuntimeException e) {
            ReferenceCountUtil.release(msg);
            throw e;
        }
        write(msg, false, promise);

        return promise;
    }
  • 如果看过【上一节】,相信会觉得这个代码结构眼熟。
  • 传进去的false表示不自动flush。

继续跟进write方法:
io.netty.channel.AbstractChannelHandlerContext#write(java.lang.Object, boolean, io.netty.channel.ChannelPromise)

    private void write(Object msg, boolean flush, ChannelPromise promise) {
        AbstractChannelHandlerContext next = findContextOutbound();
        final Object m = pipeline.touch(msg, next);
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            if (flush) {
                next.invokeWriteAndFlush(m, promise);
            } else {
                next.invokeWrite(m, promise);
            }
        } else {
            AbstractWriteTask task;
            if (flush) {
                task = WriteAndFlushTask.newInstance(next, m, promise);
            }  else {
                task = WriteTask.newInstance(next, m, promise);
            }
            safeExecute(executor, task, promise, m);
        }
    }
  • else代码块的逻辑主要是为了保证线程安全。其最终逻辑也是invokeWrite。

先跟进findContextOutbound方法:
io.netty.channel.AbstractChannelHandlerContext#findContextOutbound

    private AbstractChannelHandlerContext findContextOutbound() {
        AbstractChannelHandlerContext ctx = this;
        do {
            ctx = ctx.prev;
        } while (!ctx.outbound);
        return ctx;
    }
  • 取tail的前一个outbound节点返回。可见outbound事件传播顺序就是从tail传到head,恰好和inbound事件相反。

返回后跟进invokeWrite方法:
io.netty.channel.AbstractChannelHandlerContext#invokeWrite

    private void invokeWrite(Object msg, ChannelPromise promise) {
        if (invokeHandler()) {
            invokeWrite0(msg, promise);
        } else {
            write(msg, promise);
        }
    }

跟进invokeWrite0方法:
io.netty.channel.AbstractChannelHandlerContext#invokeWrite0

    private void invokeWrite0(Object msg, ChannelPromise promise) {
        try {
            ((ChannelOutboundHandler) handler()).write(this, msg, promise);
        } catch (Throwable t) {
            notifyOutboundHandlerException(t, promise);
        }
    }
  • 这里handler()的返回值就是OutBoundHandlerC:handler()返回值
  • 所以接下来就会调用OutBoundHandlerC中覆写的write方法。

继续跟进write方法:
回到OutBoundHandlerC


如果继续跟进这个ctx.write,就会回到【坐标1】的代码逻辑,后面会继续取前一节点传播上去,经过OutBoundHandlerB、OutBoundHandlerA的write方法后,最终传到HeadContext的write方法。


因为后续传播只是调用上面讲过的重复代码,下面略过OutBoundHandlerB、OutBoundHandlerA的传播过程,直接来到HeadContext的write方法:
io.netty.channel.DefaultChannelPipeline.HeadContext#write

        @Override
        public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
            unsafe.write(msg, promise);
        }

继续跟进write方法:
io.netty.channel.AbstractChannel.AbstractUnsafe#write

        @Override
        public final void write(Object msg, ChannelPromise promise) {
            assertEventLoop();

            ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
            if (outboundBuffer == null) {
                // If the outboundBuffer is null we know the channel was closed and so
                // need to fail the future right away. If it is not null the handling of the rest
                // will be done in flush0()
                // See https://github.com/netty/netty/issues/2362
                safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
                // release message now to prevent resource-leak
                ReferenceCountUtil.release(msg);
                return;
            }

            int size;
            try {
                msg = filterOutboundMessage(msg);
                size = pipeline.estimatorHandle().size(msg);
                if (size < 0) {
                    size = 0;
                }
            } catch (Throwable t) {
                safeSetFailure(promise, t);
                ReferenceCountUtil.release(msg);
                return;
            }

            outboundBuffer.addMessage(msg, size, promise);
        }
  • 到最后就是做一些收尾性质的工作。

至此,write事件的传播就告一段落了。



小结

  • 本节主要通过write方法的传播来间接了解outbound事件的传播,发现outbound事件的传播是从tail传播到head的,也就是刚好和用户添加handler的顺序相反,个人认为这个顺序还是需要引起注意的。