【Netty】Pipeline相关(七):异常传播

【Netty】Pipeline相关(七):异常传播

Scroll Down

前言

本节主要来看一下Netty处理某事件时如果出现了异常,异常的传播流程(链)是怎么样的。同时也学习一下Netty处理异常的凤毛麟角,为日后能写出优质代码打下基础。

同样,在看本节之前,需要对handler的添加顺序、双向链表结构有一定的了解,可以参考前面的博客:【pipeline初始化】【pipeline添加handler】。如果有前两节inbound、outbound事件传播的跟踪源码基础,那这节内容就显得相当简单了~

Netty Version:4.1.6


实验代码

Server.java

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.AttributeKey;

public final class Server {

    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup(1);

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new InBoundHandlerA());
                            ch.pipeline().addLast(new InBoundHandlerB());
                            ch.pipeline().addLast(new OutBoundHandlerA());
                            ch.pipeline().addLast(new InBoundHandlerC());
                            ch.pipeline().addLast(new OutBoundHandlerB());
                            ch.pipeline().addLast(new OutBoundHandlerC());
                        }
                    });

            ChannelFuture f = b.bind(8888).sync();

            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

InBoundHandlerA.javaInBoundHandlerC.java

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class InBoundHandlerA extends ChannelInboundHandlerAdapter {

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("InBoundHandlerA.exceptionCaught()");
        ctx.fireExceptionCaught(cause);
    }
}

InBoundHandlerB.java

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;


public class OutBoundHandlerB extends ChannelOutboundHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("OutBoundHandlerB.exceptionCaught()");
        ctx.fireExceptionCaught(cause);
    }
}

OutBoundHandlerA.java

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;

public class OutBoundHandlerA extends ChannelOutboundHandlerAdapter {

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        System.out.println("OutBoundHandlerA.exceptionCaught()");
        ctx.fireExceptionCaught(cause);
    }
}

<code>OutBoundHandlerB、C同A</code>

运行结果

启动Server.java的main方法,然后再telnet连接+发送数据,就能看到控制台有如下输出:

控制台输出.png

  • 可见异常是由InBoundHandlerB传出来的,并且传播顺序就是按照我们代码添加handler的顺序。

在知道结果后,我们再来追一下源码。


跟进源码

视角先转回InBoundHandlerB的channelRead方法,我们就从这里开始追起(断点、telnet之类的操作就不再赘述了):

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        throw new BusinessException("from InBoundHandlerB");
    }
  • 如果忘记是如何回调到channelRead上的,可以看这一篇

当这里抛出异常后,就会被上一层invokeChannelRead方法catch:
io.netty.channel.AbstractChannelHandlerContext#invokeChannelRead(java.lang.Object)

    private void invokeChannelRead(Object msg) {
        if (invokeHandler()) {
            try {
                ((ChannelInboundHandler) handler()).channelRead(this, msg);
            } catch (Throwable t) {
                // 来到这里
                notifyHandlerException(t);
            }
        } else {
            fireChannelRead(msg);
        }
    }
  • 可见Netty在捕获到异常后,并不是马上处理的。

跟进notifyHandlerException方法:
io.netty.channel.AbstractChannelHandlerContext#notifyHandlerException

    private void notifyHandlerException(Throwable cause) {
        // 判断异常栈中是否含有exceptionCaught方法以及日志级别。
        if (inExceptionCaught(cause)) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "An exception was thrown by a user handler " +
                                "while handling an exceptionCaught event", cause);
            }
            return;
        }
        // 代码会来到这里
        invokeExceptionCaught(cause);
    }

跟进invokeExceptionCaught方法,此处【坐标1】
io.netty.channel.AbstractChannelHandlerContext#invokeExceptionCaught(java.lang.Throwable)

    private void invokeExceptionCaught(final Throwable cause) {
        if (invokeHandler()) {
            try {
                // 代码会来到这里
                handler().exceptionCaught(this, cause);
            } catch (Throwable error) {
                ...(略)
            }
        } else {
            fireExceptionCaught(cause);
        }
    }
  • handler()的返回值就是InBoundHandlerB实例:handler()值.png

跟进exceptionCaught方法,就来到InBoundHandlerB覆写的exceptionCaught方法了:

InBoundHandlerB#exceptionCaught.png

  • 可见Netty的channel事件异常最终会尝试调用exceptionCaught实现处理。

由于我们自定义的代码选择继续传播,那么就继续跟进fireExceptionCaught方法:
io.netty.channel.AbstractChannelHandlerContext#fireExceptionCaught

    @Override
    public ChannelHandlerContext fireExceptionCaught(final Throwable cause) {
        invokeExceptionCaught(next, cause);
        return this;
    }
  • 注意,这里是直接取next节点,而不是像inbound事件或outbound事件需要根据inbound/outbound标记判断取节点。
  • 这也是异常传播完全按照添加handler的顺序传播的原因。

跟进invokeExceptionCaught方法:
io.netty.channel.AbstractChannelHandlerContext#invokeExceptionCaught(next, cause)

    static void invokeExceptionCaught(final AbstractChannelHandlerContext next, final Throwable cause) {
        ObjectUtil.checkNotNull(cause, "cause");
        EventExecutor executor = next.executor();
        if (executor.inEventLoop()) {
            next.invokeExceptionCaught(cause);
        } else {
            try {
                executor.execute(new Runnable() {
                    @Override
                    public void run() {
                        next.invokeExceptionCaught(cause);
                    }
                });
            } catch (Throwable t) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Failed to submit an exceptionCaught() event.", t);
                    logger.warn("The exceptionCaught() event that was failed to submit was:", cause);
                }
            }
        }
    }

继续跟进这里的invokeExceptionCaught方法,会发现又回到【坐标1】的代码了,之后会一直传播到异常被处理 或 代码不再发起传播为止。后面IC,OA,OB,OC的传播我就不一一追了,逻辑跟上面完全一致。



跳过中间传播,最后来到TailContext:
传播到TailContext.png

跟进exceptionCaught方法:
io.netty.channel.DefaultChannelPipeline#onUnhandledInboundException

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            onUnhandledInboundException(cause);
        }

继续跟进onUnhandledInboundException方法:
io.netty.channel.DefaultChannelPipeline#onUnhandledInboundException

    protected void onUnhandledInboundException(Throwable cause) {
        try {
            logger.warn(
                    "An exceptionCaught() event was fired, and it reached at the tail of the pipeline. " +
                            "It usually means the last handler in the pipeline did not handle the exception.",
                    cause);
        } finally {
            ReferenceCountUtil.release(cause);
        }
    }
  • 这里会打印警告日志,即控制台的红字。

至此,异常的传播就完成了。


统一处理异常

  • Q:在上面的代码追踪后,发现Netty仅仅是打印了错误的警告日志,那如果我想自己处理异常,而且要统一处理,那该怎么办?
  • A:模仿TailContext,让异常传播到我们的自定义的handler处理并不再发起传播。

异常处理handler

ExceptionCaughtHandler.java

import com.imooc.netty.ch6.BusinessException;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class ExceptionCaughtHandler extends ChannelInboundHandlerAdapter {
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        // ..
        if (cause instanceof BusinessException) {
            System.out.println("Exception handled.");
        }
    }
}

然后再Server.java中添加这个handler到最后:

(略)
.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) {
        ch.pipeline().addLast(new InBoundHandlerA());
        ch.pipeline().addLast(new InBoundHandlerB());
        ch.pipeline().addLast(new OutBoundHandlerA());
        ch.pipeline().addLast(new InBoundHandlerC());
        ch.pipeline().addLast(new OutBoundHandlerB());
        ch.pipeline().addLast(new OutBoundHandlerC());
        ch.pipeline().addLast(new ExceptionCaughtHandler());
    }
});
(略)

最终控制台反映效果如下:

控制台效果



小结

  • 异常的事件传播是直接取next节点,而不是像inbound事件或outbound事件需要根据inbound/outbound标记判断取节点。这意味着传播顺序=添加handler顺序。
  • 如果要自己处理异常,就不要让异常传播到TailContext。