前言
早在大约一年前,我就是在网上跟着某个课程用Netty写了个简单的聊天应用(websocket协议)。但那时我只懂得如何运用Netty实现课程的应用,一旦"跑题"就不行了,归根结底还是对Netty的一些工作原理压根不熟。
于是,在近期补充了一些简单的网络知识后,就重新回顾Netty,并找了一些课程、书籍开始入门。
首先,今天就先简单看看Netty的五个基本组件,先不深究细节,根据参考资料一步步来,怕自己乱搞陷进死胡同。
我本来是想以Netty的官方文档(netty.io)作为入门的主要参考资料,但是当我打开Netty的文档时,发现事情并没有我想象的那么顺利~(也许是我不会找)。
在Netty的官方文档中,我发现除了Javadoc和User guide会教你怎么跑Demo外,基本没有别的有用的东西了~
PS:Netty版本为4.1.6
一个简单的socket例子
Netty简单的来说其实就是为我们封装了大量jdk底层的socket+bio等代码,使得我们能通过修改一些参数或换个函数调用就能直接切换底层的网络模型,所以在这里先举一个简单的socket例子:
服务端启动主函数
public class ServerBoot {
private static final int PORT = 8000;
public static void main(String[] args) {
Server server = new Server(PORT);
server.start();
}
}
服务端
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
public class Server {
private ServerSocket serverSocket;
public Server(int port) {
try {
this.serverSocket = new ServerSocket(port);
System.out.println("服务端启动成功,端口:" + port);
} catch (IOException exception) {
System.out.println("服务端启动失败");
}
}
public void start() {
new Thread(new Runnable() {
@Override
public void run() {
doStart();
}
}).start();
}
private void doStart() {
while (true) {
try {
// 同步方法,监听
Socket client = serverSocket.accept();
new ClientHandler(client).start();
} catch (IOException e) {
System.out.println("服务端异常");
}
}
}
}
处理客户端数据
public class ClientHandler {
public static final int MAX_DATA_LEN = 1024;
private final Socket socket;
public ClientHandler(Socket socket) {
this.socket = socket;
}
public void start() {
System.out.println("新客户端接入");
new Thread(new Runnable() {
@Override
public void run() {
doStart();
}
}).start();
}
private void doStart() {
try {
InputStream inputStream = socket.getInputStream();
while (true) {
byte[] data = new byte[MAX_DATA_LEN];
int len;
while ((len = inputStream.read(data)) != -1) {
String message = new String(data, 0, len);
System.out.println("客户端传来消息: " + message);
socket.getOutputStream().write(data);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
}
客户端
import java.io.IOException;
import java.net.Socket;
public class Client {
// ip地址
private static final String HOST = "127.0.0.1";
// 端口号
private static final int PORT = 8000;
// 睡眠时间
private static final int SLEEP_TIME = 5000;
public static void main(String[] args) throws IOException {
final Socket socket = new Socket(HOST, PORT);
new Thread(new Runnable() {
@Override
public void run() {
System.out.println("客户端启动成功!");
while (true) {
try {
String message = "hello world";
System.out.println("客户端发送数据: " + message);
socket.getOutputStream().write(message.getBytes());
} catch (Exception e) {
System.out.println("写数据出错!");
}
sleep();
}
}
}).start();
}
private static void sleep() {
try {
Thread.sleep(SLEEP_TIME);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
执行效果
一个简单但不太严谨的流程图
Netty的五个基本组件
先列举出来,之后再简单介绍:
- NioEventLoop
- Channel
- ByteBuf
- Pipeline
- ChannelHandler
上面五个组件简单映射到上边的socket里面就类似于下图:
下面简单介绍下这五个类:
NioEventLoop
- 相当于Netty的发动机,负责客户端新来的连接。
- 开发中常用EventLoopGroup,里面含有一个或多个NioEventLoop。
Channel
- Netty传输数据的管道/数据载体,相当于socket的套接字。
我们找到下面这个函数io.netty.channel.nio.NioEventLoop#processSelectedKey(java.nio.channels.SelectionKey, io.netty.channel.nio.AbstractNioChannel)
找到下面一段代码点进read方法(NioByteUnsafe是客户端流读写,NioMesssageUnsafe是和新连接加入,我们这里选择后者):
进到read函数里再去找到下面这个函数:
点进去(父类NioServerSocketChannel),可以看到其实就是调用jdk底层创建:
ByteBuf
- 主要负责进行I/O流的操作。
- 抽象模型:io.netty.buffer.ByteBuf。
下面来看看抽象模型里面的函数:
除了读之外,还有写的api,我就不一一列举了。
Pipeline
- 就是处理连接的一条过滤链,这条链里面放各种Handler(下面会讲到)
为了更容易理解,我这了就放之前写过的一条链:
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import io.netty.handler.timeout.IdleStateHandler;
public class WSServerInitialzer extends ChannelInitializer<SocketChannel> {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// websocket 基于http协议,所以要有http编解码器
pipeline.addLast(new HttpServerCodec());
// 对写大数据流的支持
pipeline.addLast(new ChunkedWriteHandler());
// 对httpMessage进行聚合,聚合成FullHttpRequest或FullHttpResponse
// 几乎在netty中的编程,都会使用到此hanler
pipeline.addLast(new HttpObjectAggregator(1024*64));
// ====================== 以上是用于支持http协议 ======================
// ====================== 增加心跳支持 start ======================
// 针对客户端,如果在1分钟时没有向服务端发送读写心跳(ALL),则主动断开
// 如果是读空闲或者写空闲,不处理
pipeline.addLast(new IdleStateHandler(8, 10, 12));
// 自定义的空闲状态检测
pipeline.addLast(new HeartBeatHandler());
// ====================== 增加心跳支持 end ======================
// ====================== 以下是支持httpWebsocket ======================
/**
* websocket 服务器处理的协议,用于指定给客户端连接访问的路由 : /ws
* 本handler会帮你处理一些繁重的复杂的事
* 会帮你处理握手动作: handshaking(close, ping, pong) ping + pong = 心跳
* 对于websocket来讲,都是以frames进行传输的,不同的数据类型对应的frames也不同
*/
pipeline.addLast(new WebSocketServerProtocolHandler("/ws"));
// 自定义的handler
pipeline.addLast(new ChatHandler());
}
}
可以看到上面的代码就是往pipeline里面追加各种handler,用于处理客户端的连接过程。
那么默认情况下,Pipeline是在哪里创建的呢?我们可以看NioSocketChannel的构造函数:
public NioSocketChannel(Channel parent, SocketChannel socket) {
super(parent, socket);
config = new NioSocketChannelConfig(this, socket.socket());
}
连续进两次super后就可以看到如下代码,此时可以找到pipeline的一个影子了:
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
ChannelHandler
- 就是放置在Pipeline中的Handler,组成的链负责对每个连接进行管理。
- DefaultChannelPipeline中有addXXX和removeXXX等管理Handler,如上面的addLast。
- 抽象代码:io.netty.channel.ChannelHandler 前面大多数都是回调,add或remove Pipeline之后的回调。
最后再小结一张Channel-Pipeline-ChannelHandler之间的关系图: