Netty 原理解析与开发实战(一)
一、Netty 概述
1.1 Java网络编程进化史
1.1.1 Java OIO
早期java提供了 java.net
包用于开发网络应用,这类API被称为阻塞Java OIO(阻塞IO)。
服务端主要实例代码:
ServerSocket serverSocket = new ServerSocket(port);
Socket clientSocket = serverSocket.accept();
InputStream in = clientSocket.getInputStream();
OutputStream out = clientSocket.getOutputStream();
客户端主要实例代码:
Socket clientSocket = new Socket();
clientSocket.connect(inetSocketAddress);
InputStream in = clientSocket.getInputStream();
OutputStream out = clientSocket.getOutputStream();
从上面的实例代码可以看出,OIO使用比较简单。但是,当需要开发一个大型的网络应用时,OIO就显得无能为力了,因为Socket和ServerSocket类库的API只支持由本地系统套接字库提供的所谓的阻塞函数,服务端和客户端的通信是阻塞的,OIO每条连接都需要一个线程来进行维护,这使得连接数受到了限制。
1.1.2 网络编程的相关概念
1)异步和同步
同步和异步描述的是用户线程与内核的交互方式:
- 同步:用户线程发起I/O请求后需要等待或者轮询内核I/O操作完成后才能继续执行。
- 异步:用户线程发起I/O请求后继续向后执行,当内核I/O操作完成后会通知用户线程或者调用用户线程注册的回调函数。
2)阻塞和非阻塞
阻塞和非阻塞描述的是用户线程调用内核I/O操作的方式:
- 阻塞:I/O操作需要彻底完成后才会回到用户空间。
- 非阻塞:I/O操作被调用后立即返回给用户一个状态值,无需等到I/O操作完成。
1.1.3 Java NIO
从 Java 1.4开始,Java提供了NIO,用来替代标准 Java I/O API。Java NIO也被称为 "Non-blocking I/O",提供了非阻塞I/O的方式。
Java NIO中的几个核心概念:
-
通道(Channel)和缓冲区(Buffer):标准的I/O是基于字节流和字符流进行操作,而NIO是基于通道和缓冲区进行操作,数据总是从通道写入缓冲区,或者从缓冲区写入通道。通道一般是双向的。
-
非阻塞IO(Non-blocking I/O):Java NIO可以非阻塞的使用I/O。特例:FileChannel是阻塞的,但是可以FileChannel提供map方法来进行内存映射(
MappedByteBuffer
),提高了效率。 -
选择器(Selector):Java NIO引入了选择器的概念,选择器用于监听多个通道的事件。
服务端主要实例代码:
ServerSocketChannel serverChannel = new ServerSocketChannel.open();
serverChannel.bind(address);
serverChannel.configureBlocking(false);
Selector selector = Selector.open();
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
while(true) {
// 该方法将阻塞线程
selector.select();
Set<SelectionKey> readyKeys = selector.selectedKeys();
Iterator<SelectionKey> iter = readyKeys.iterator();
while(iter.hasNext()) {
SelectionKey key = iter.next();
// 必须手动移除
iter.remove();
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key.channel();
SocketChannel client = server.accept();
// 设置非阻塞
client.configureBlocking(false);
// 客户端注册到Selector
SelectionKey clientKey = client.register(selector, SelectionKey.OP_WRITE | SelectionKey.OP_READ);
// 附加缓存区到该Channel
ByteBuffer buffer = ByteBuffer.allocate(100);
clientKey.attach(buffer);
}
if (key.isReadable()) {
SocketChannel client = (SocketChannel) key.channel();
ByteBuffer output = (ByteBuffer) key.attachment();
client.read(output);
key.interestOps(SelectionKey.OP_WRITE);
}
if (key.isWritable()) {
//...
key.insterestOps(SelectionKey.OP_READ)
}
}
}
客户端主要实例代码:
SocketChannel clientChannel = SocketChannel.open();
clientChannel.connect(address);
ByteBuffer byteBuffer = ByteBuffer.allocate(32);
clientChannel.read(byteBuffer);
clientChannel.write(byteBuffer);
1.1.4 Java AIO
从Java1.7开始,Java提供了AIO(异步IO)。Java AIO被称为"NIO.2",用法与标准I/O有非常大的区别。
Java AIO采用“订阅——通知”模式。和同步I/O一样,Java AIO是由操作系统支持的。微软的Windows系统提供了一种异步IO技术——IOCP(I/O CompletionPort,I/O完成端口),Linux下使用epoll来模拟异步I/O。
Java AIO的缺点:
- 在Linux平台下使用epoll进行模拟,AIO在Linux下效果不好
- 没有提供对UDP的支持
服务端主要实例代码:
AsynchronousServerSocketChannel serverSocketChannel = AsynchronousServerSocketChannel.open();
InetSocketAddress address = new InetSocketAddress(10090);
serverSocketChannel.bind(address);
System.out.println("bind success...");
while (true) {
AsynchronousSocketChannel socketChannel = null;
try {
System.out.println("waiting for connection...");
Future<AsynchronousSocketChannel> future = serverSocketChannel.accept();
// 如何Future中结果还没返回,则调用future.get()会阻塞线程
socketChannel = future.get();
ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
while ((socketChannel.read(byteBuffer).get() != -1)) {
byteBuffer.flip();
socketChannel.write(byteBuffer);
System.out.println(byteBuffer);
if (byteBuffer.hasRemaining()) {
byteBuffer.compact();
} else {
byteBuffer.clear();
}
}
} catch (ExecutionException | InterruptedException e) {
e.printStackTrace();
break;
} finally {
if (socketChannel != null) socketChannel.close();
}
}
客户端主要实例代码:
AsynchronousSocketChannel socketChannel = AsynchronousSocketChannel.open();
InetSocketAddress address = new InetSocketAddress(10090);
Future<Void> future = socketChannel.connect(address);
Scanner scanner = new Scanner(System.in);
ByteBuffer byteBuffer = ByteBuffer.allocate(100);
if (future.isDone()) {
while (true) {
String s = scanner.nextLine();
byteBuffer.put(s.getBytes(StandardCharsets.UTF_8));
byteBuffer.flip();
socketChannel.write(byteBuffer);
byteBuffer.clear();
Future<Integer> future1 = socketChannel.read(byteBuffer);
if (future1.get() > 0) {
byte[] bytes = new byte[future1.get()];
byteBuffer.rewind();
byteBuffer.get(bytes);
System.out.println(new String(bytes, StandardCharsets.UTF_8));
}
byteBuffer.clear();
}
}
1.1.5 Java原生API之痛
Java原生API缺乏并发能力,Java的NIO和AIO没有提供断线重连、网络闪断、半包读写、失败缓存、网络拥塞和异常码等的处理,需要开发人员来补齐。Java原生API在实际项目中应用并不广泛,取而代之的是第三方框架Netty。
1.2 Netty的优势
Netty是一款异步事件驱动的高性能网络应用框架。
主要有以下特点:
-
非阻塞I/O:Netty是基于Java NIO API实现的网络框架,内部对Java NIO进行了封装,极大的简化了网络程序的开发过程。
-
丰富的协议:支持丰富的网络协议,如TCP、UDP、HTTP、HTTP/2、WebSocket、SSL/TLS等
-
异步和事件驱动:由于Java AIO在Linux平台上效果不理想,Netty并没有采用Java AIO来实现。Netty所有的操作都是异步的,会立即返回,不保证操作是否成功,调用会返回ChannelFuture,Netty会通过ChannelFuture来通知调用是否成功、失败、或者取消。事件驱动:调用者可以通过在ChannelFuture上注册监听,来获取操作结果。
-
精心设计的API
-
丰富的缓存实现
-
高效的网络传输
二、Netty的架构设计
Netty架构设计图如上,Netty核心组价有三个,分别是:
- 基于灵活且可扩展的事件模型
- 统一的交互API
- 支持零拷贝的富字节缓冲区
三、理解Netty中的Channel
3.1 Channel类
Channel对象具有如下特点:
- Nettty网络通信的组建,能够执行I/O操作
- 通过Channel可以获取当前网络连接的状态
- 通过Channel可以获取当前网络连接的全部配置
- Channel提供异步的I/O操作,这意味着调用会立即返回,但是不保证在调用结束时请求已完成
- 调用I/O操作会返回一个
ChannelFuture
实例,可在其上面注册监听,当操作完成、取消、失败时会收到通知 - 不同协议、不同阻塞类型的连接都有Channel实现与之对应
常用的Channel实现有:
NioServerSocketChannel
、NioSocketChannel
、NioDatagramChannel
、NioSctpServerChannel
、NioSctpChannel
等
Netty的SocketChannel内部维护了java.nio.channels
包中的SocketChannel
,它俩是包含关系,实际的数据读取都是发生在java.nio.channels.SocketChannel
类的对象上的。
Channel与Pipeline、Handler、ChannelHandlerContext具有如下关系:
其中Pipeline和Channel是一对一的关系,一个Channnel只会和一个Pipeline互相关联。
3.2 ChannelHandler类
ChannelHandler是我们使用Netty进行网络编程的时候用到最多的类,我们对数据的操作处理都是在该类的对象中进行的,比如:数据编解码、数据加解密、数据过滤、业务逻辑等。
ChannelHandler
类的主要功能如下:
- 处理I/O事件和拦截I/O请求,并将其转发至ChannelPipeline的下一个ChannelHandler中
这个接口需要实现很多方法,一般在使用过程中,我们会继承它的实现类,而不是实现这个接口,常用的实现类有:
ChannelInboundHandlerAdapter
:用于处理入站事件ChannelOutboundHandlerAdapter
:用于处理出站事件SimpleChannelInboundHandler
:用于处理入站事件,使用了范型,无需类型转换ChannelDuplexHandler
:处理入站和出站事件
3.3 ChannelPipeline类
从上面的图可以看出,ChannelPipeline中包含了很多个ChannelHandler,这些ChannelHandler会在有I/O事件时被有选择的调用。
ChannelPipeline也可以看作是ChannelHandler的容器。ChannelPipeline接口设计采用了责任链设计模式,底层采用了双向链表的数据结构(链表上的元素为ChannelHandlerContext
),将链上的各个处理器串联起来。ChannelPipeline链上的每个处理器都有处理事件的机会,处理器必须调用ChannelHandlerContext
中的方法才能将事件传播下去。
3.4 ChannelHandlerContext类
内部保存了当前Channel的上下文信息,并且内部包含一个ChannelHandler
实例,主要功能是为当前ChannelHandler的执行提供上下文环境和辅助ChannelPipeline
完成链式调用。
3.5 ChannelOption类
作用:
- ChannelOption.SO_BACKLOG:设置线程等待连接队列的大小
- ChannelOption.SO_KEEPALIVE:设置Channel保持连接状态
3.6 EventLoop和NioEventLoopGroup
EventLoop是基于事件循环设计的,事件循环机制的思想是:用一个线程不断循环接收处理事件。
EventLoopGroup是一组EventLoop的抽象,Netty中,为了更好的利用CPU资源,通常维持着一组EventLoop同时工作。
在Netty中,主要有两类EventLoopGroup,一类是BossEventLoopGroup,一类是WorkerEventLoopGroup,前者通常是单线程,负责接受accept事件,后者通常是多线程,负责处理I/O事件。
特点:
- 每个EventLoop中都维护着一个Selector实例,一个EventLoop上可以注册多个Channel
- 当BossEventLoopGroup接收连接后,将调用WorkerEventLoopGroup的next方法按一定规则选择一个EventLoop来注册Channel
3.7 关于Channel的一些好用工具类
ChannelGroup:顾名思义是一个Channel组,提供了一组方便管理和操作Channel的方法,具体如下:
- writeAndFlush(msg, matcher):批量写入ChannelGroup所管理的Channel,可提供相应的匹配器来过滤Channel。
- add(Channel):向Channel组添加一个Channel,并且可以做到在Channel连接断开的时候自动在Channel组中移除该Channel(通过添加监听实现)
Netty中的实现类为DefaultChannelGroup
。
四、Netty中的缓存模型
4.1 Java NIO中的Buffer
4.1.1 Buffer的原理
Buffer是Java NIO中用到的缓冲对象,从Java NIO通道对象中读写数据都离不开Buffer
及其实现类。
Buffer中的接口实现都离不开四个标志:position、mark、limit、capacity。
Java NIO中的Buffer有区分读写操作,不可同时进行读写操作,在两个操作之间需要调用flip
函数进行转换。
上图展示了Buffer的读模式,处于读模式时,各个标志含义如下:
- position:下一个可读的位置
- mark:读标记
- limit:可读的最大位置,不超过capacity
- capacity:当前buffer的容量
上图展示了Buffer的写模式,处于写模式时,各个标志含义如下:
- position:下一个可写的位置
- mark:写标记
- limit:可写的最大位置,不超过capacity
- capacity:当前buffer的容量
4.1.2 Buffer的类型
在Java NIO中主要有两种Buffer类型:
- 堆上缓冲:从Java堆上分配的缓存空间
- 直接内存缓存:从堆外内存(不属于JVM管理)上分配的缓存空间
其中直接内存缓存中有一类特殊的缓存被称为:内存映射,该技术将用户空间缓存和内核空间缓存通过虚拟地址变换映射到了同一块物理内存空间,避免了数据从内核空间到用户空间的复制和减少了上下文切换开销。详见:IO数据拷贝的几种方式——传统、mmap、sendFile、splice
内存映射对应的实现类为MappedByteBuffer
,仅通过FileChannel.map
函数生成。
在堆上分配的缓存内部都维护了一个数组,而直接内存缓存则没有该数组,也就是说直接内存缓存我们通过调用buffer.array()
是获取不到数组的,会出现异常。
Buffer的常用实现类有下面几种:
ByteBuffer:
最常用的缓存类型。常用的方法如下:
// 分配堆上缓存
public static ByteBuffer allocate(int capacity)
// 分配直接内存缓存
public static ByteBuffer allocateDirect(int capacity)
// 将字节包装为缓存
public static ByteBuffer wrap(byte[] array, int offset, int length)
// 获取一个字节,并将position+1
public abstract byte get();
// 将当前ByteBuffer压紧,如删除已读的字节
public abstract ByteBuffer compact();
// 进行读写转换
public final Buffer flip()
// 返回是否该buffer可继续读
public final boolean hasRemaining()
常用的Buffer还有:ShortBuffer、CharBuffer、IntBuffer、LongBuffer、FloatBuffer、DoubleBuffer。
4.2 Netty中的ByteBuf
4.2.1 ByteBuf 介绍
ByteBuf是可以自动扩容的。
因为Java NIO中的ByteBuffer存在限制,比如读写切换时需要主动调用flip
函数和提供的API不够丰富等原因,Netty使用了自建的缓存系统,主要的类为ByteBuf
,主要使用了如下三个标志:
- readerIndex:下一个可读的位置
- writerIndex:下一个可写的位置
- capacity:缓存容量
从分配方式上来说,Netty支持三种模式的ByteBuf,分别是:
- 堆缓存模式:在Java堆上分配
- 直接缓存模式:在堆外内存上分配
- 混合模式:实现类为
CompositeByteBuf
,它的作用是可以将多个不同类型的ByteBuf组合起来,共同完成读写,就好像读写单个ByteBuf一样
从空间利用的角度上说,可以将缓存分为:
- 非池化缓存:通过
UnpooledByteBufAllocator
分配器分配,不过一般我们使用Unpooled
工具类来构造缓存 - 池化缓存:通过
PooledByteBufAllocator
分配器分配,池化缓存在使用完毕后,会放入池内,循环使用,和线程池类似。
4.2.2 ByteBuf 类及其子类和常用函数
ByteBuf,内部维护了一个byte数组,主要用来对从通道中获取的数据进行高效读写,提供了下面的方法:
// 返回是否可读
public abstract boolean isReadable();
// 返回可读的字节数
public abstract int readableBytes();
// 返回可写的字节数
public abstract int writableBytes();
// 丢弃已读的字节,类似于compact
public abstract ByteBuf discardReadBytes();
// get方法,不修改 readerIndex指针值
public byte get()
// read方法,读后指针值+1
public byte read()
// set方法,不修改 writeIndex指针值
public void set(byte b)
// 返回当前缓存区的切片,不共享readerIndex和writeIndex,但是共享底层数据,也就是说该切片的修改也会同步到当前缓存
public abstract ByteBuf retainedSlice(int index, int length);
CompositeByteBuf,可以将多个不同类型的ByteBuf组合起来,并共享底层数据,在进行读写操作的时候是无感知的。内部维护了一个Component
数组来提供辅助,每个Component
都包含了一个ByteBuf。
常用方法如下:
// 将多个ByteBuf加入到当前缓存中,并且指定是否增加writerIndex,一般来说,这个值为true才可以正常通过当前缓存读数据
public CompositeByteBuf addComponents(boolean increaseWriterIndex, ByteBuf... buffers)
// 这个方法与上面方法的区别是这里的byteBuf可以是CompositeByteBuf类型
public CompositeByteBuf addFlattenedComponents(boolean increaseWriterIndex, ByteBuf buffer)
4.2.3 ByteBuf分配器
ByteBuf有两种类型的分配器,它们都实现了ByteBufAllocator
接口,分别是:
- PooledByteBufAllocator:该分配器将ByteBuf实例放入了池中,提高了性能,将内存碎片减少到最小。该实现采用了一种高效的内存分配策略,称为
jemalloc
。它可以重复利用之前分配的内存空间。 - UnpooledByteBufAllocator:该分配器没有把ByteBuf放入池中,每次调用时,都会返回一个新的ByteBuf实例,这些实例由JVM自己负责做GC回收。
有两种方式改变Netty中ByteBuf分配器的方式:
- 通过java系统参数选项
io.netty.allocator.type
指定,有两个选项:pooled、unpooled。 - 在启动器上指定,通过设置启动选项
bootstrap.option(ChannelOption.ALLOCATOR, new UnpooledByteBufAllocator(false))
指定。
4.2.4 ByteBufUtil类
ByteBufUtil类提供了一系列静态方法用于操作ByteBuf,在实际工作中非常有用,常用的方法类型如下:
- 将ByteBuf转换为十六进制字符串或将十六机制字符串转化为byte数组
// 将ByteBuf转换为16进制字符串
public static String hexDump(ByteBuf buffer)
// 将十六进制字符串转换为byte数组
public static byte[] decodeHexDump(CharSequence hexDump)
- 编解码字符串
// 高效的将utf8字符串写入到ByteBuf实例中
public static int writeUtf8(ByteBuf buf, CharSequence seq)
// 按指定字符集将字符串编码为ByteBuf类型
public static ByteBuf encodeString(ByteBufAllocator alloc, CharBuffer src, Charset charset)
4.2.5 ByteBufHolder 接口
故名思义,它是ByteBuf的一个容器。在Netty中非常有用,例如,Http的请求和响应都可以携带消息体,这个消息体就是ByteBuf对象。由于不同的协议可以包含不同的协议字段和功能,因此,需要对ByteBuf进行包装和抽象,不同的协议有不同的实现。
为了满足此类定制化需求,Netty抽象出了ByteBufHolder
接口,它的默认实现为DefaultByteBufHolder
类,包含了一个ByteBuf,另外提供了一些其他实用的方法,例如缓冲区池化等。
// 返回该ByteBufHolder所持有的ByteBuf对象
ByteBuf content();
// 返回一个深拷贝对象
ByteBufHolder copy();
// 复制当前ByteBufHolder对象,浅拷贝,并且不会增加引用计数
ByteBufHolder duplicate();
// 复制当前ByteBufHolder对象,浅拷贝,并增加引用计数
ByteBufHolder retainedDuplicate();
// 替换当前ByteBufHolder所持有的内容
ByteBufHolder replace(ByteBuf content);
4.2.6 Netty中的零拷贝
- 如果在构造ByteBuf时,传入另一个ByteBuf,这时候新创建的ByteBuf将和该ByteBuf共享底层数据
- 如果在构造ByteBuf时,传入的是一个byte数组,该byte数组将直接作为ByteBuf的底层数组
- 通过CompositeByteBuf类,将多个ByteBuf进行了组合,同时也和这些ByteBuf共享了底层数据
- Netty中的
FileRegion
底层调用了FileChannel.transferTo
函数,实现了真正的零拷贝
4.2.7 关于Netty的接收缓存
Netty接收缓存指Netty在接受网络IO接口数据时分配的缓存,关于接受缓存有两种类型的接收缓存分配器,分别是:
- FixedRecvByteBufAllocator:每次分配的缓存初始容量为固定大小。
- AdaptiveRecvByteBufAllocator:可根据历史分配记录来动态调整下一次分配的缓存容量大小。
五、启动引导程序
5.1 服务端启动引导程序 ServerBootstrap
服务端引导程序是用来做TCP协议的服务端程序的,有包含以下内容可以配置:
- group(boss, worker):用来配置boss事件循环组和worker事件循环组
- channel(class):用来配置服务端Channel实现类,非阻塞TCP配置为NioServerSocketChannel.class
- handler(serverHandler):用来配置接受请求前的handler处理器
- childHandler(handler):用来配置新接受的连接Channel的处理器,一般配置为ChannelInitializer的子类。
- option(ChannelOption):用来配置当前Channel的选项
服务端一般会在配置好ServerBootstrap后,进行绑定端口操作,示例代码如下:
NioEventLoopGroup boss, worker;
boss = worker = null;
int port = 8080;
try {
boss = new NioEventLoopGroup(1);
worker = new NioEventLoopGroup(8);
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(boss, worker)
.channel(NioServerSocketChannel.class)
.handler(new LoggingHandler())
.option(ChannelOption.SO_BACKLOG, 128)
.option(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
WebSocketServerProtocolHandler webSocketServerProtocolHandler = new WebSocketServerProtocolHandler("/hello",
null, true, false, 10000, null);
ch.pipeline()
.addLast(new HttpServerCodec())
.addLast(new HttpObjectAggregator(8192))
.addLast(webSocketServerProtocolHandler)
.addLast(new WebSocketTextHandler())
.addLast(new WebSocketShowPongHandler())
.addLast(new EchoTextHandler());
}
});
ChannelFuture bind = serverBootstrap.bind(port);
bind.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
System.out.println("服务器绑定端口成功!");
}
}
}).sync().channel().closeFuture().sync();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
assert boss != null;
assert worker != null;
boss.shutdownGracefully().awaitUninterruptibly();
worker.shutdownGracefully().awaitUninterruptibly();
}
5.2 ServerBootstrap 源码解析
ServerBootstrap是一个启动引导类,内部包含了如下字段:
private final Map<ChannelOption<?>, Object> childOptions = new LinkedHashMap<ChannelOption<?>, Object>();
private final Map<AttributeKey<?>, Object> childAttrs = new ConcurrentHashMap<AttributeKey<?>, Object>();
private final ServerBootstrapConfig config = new ServerBootstrapConfig(this);
private volatile EventLoopGroup childGroup;
private volatile ChannelHandler childHandler;
并且它继承了AbstractBootstrap<ServerBootstrap, ServerChannel>
类,当我们通过语句new ServerBootstrap()
新建了一个Serverbootstrap后,一般会调用它的group
函数来配置事件循环组,channel
来配置Channel类型,option
来配置SocketChannel,childHandler
来配置事件处理器。
当我们调用ServerBootstrap的bind
方法后,ServerBootstrap会调用AbstractBootstrap的doBind
方法,该方法会对Channel进行初始化和将Channel注册到NioEventLoopGroup上,并且在初始化Channel时,也调用了ServerBootstrap.init()
方法对Channel进行了配置,添加了通过handler
方法配置的处理器和ServerBootstrapAcceptor
处理器, ServerBootstrapAcceptor
处理器主要用来接收连接并对其进行配置和将其注册到childGroup上。
Acceptor向childGroup注册连接的流程如下:主动调用childGroup对象的register
方法,该方法会调用childGroup对象内部的选择器 chooser
对象的next
方法,该方法会按照一定的规则选择出一个事件循环器,然后调用该将事件循环器的register
方法,该register
方法又调用了Channel的 Channel.Unsafe 对象的regsiter
方法,该方法将当前Channel对应的javaChannel注册到了该事件循环器中的Selector对象,如果配置了自动读,那么还会调用AbstractChannel的doBeginRead
方法。至此,注册完成。
5.3 客户端启动引导程序 Bootstrap
客户端启动引导程序配置过程中,需要配置的项如下:
- group(worker):配置相应的worker事件循环组
- handler(handler):配置处理器,一般新建一个匿名类ChannelInitializer类来配置该项
- channel(class):配置当前通道类型,非阻塞TCP配置为NioSocketChannel
- option(ChannelOption):配置当前通道的参数
客户端配置好之后,需要调用connect连接服务器,示例代码如下:
int port = 8080;
String remoteAddress = "localhost";
NioEventLoopGroup worker = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(worker)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer<NioSocketChannel>() {
@Override
protected void initChannel(NioSocketChannel ch) throws Exception {
ch.pipeline()
.addLast(new MyChannelHandler());
}
})
.option(ChannelOption.SO_KEEPALIVE, true)
.option(ChannelOption.ALLOCATOR, new UnpooledByteBufAllocator(false));
ChannelFuture connect = bootstrap.connect(remoteAddress, port);
connect.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
System.out.println("连接服务器成功!");
}
}
});
connect.sync();
printChannel((SocketChannel) connect.channel());
} catch (Exception e) {
e.printStackTrace();
} finally {
worker.shutdownGracefully().awaitUninterruptibly();
}
5.3 常用的ChannelOption选项
- ChannelOption.SO_BACKLOG:用于配置服务器接收连接队列长度,如果队列已满,客户端连接将被拒绝,windows默认200,其他默认128
- ChannelOption.SO_KEEPLIVE:是否保持连接,可以将其视为心跳机制,但是默认的心跳间隔是7200s(2小时)。
- ChannelOption.ALLOCATOR:指定缓存分配器类型
- ChannelOption.RCVBUF_ALLOCATOR:指定接收缓存分配器类型
- ChannelOption.CONNECT_TIMEOUT_MILLIS:指定连接超时毫秒数
- ChannelOption.WRITE_BUFFER_WATER_MARK:配置写高低水位大小,如果Netty的写缓冲区中的字节超过高水位(默认64KB),则Channel的isWritable()返回false。如果写缓冲区中的字节超过高水位后若下降到低水位(默认32KB),则Channel的isWritable()返回Ture,否则,返回false。写高低水位标记使用户可以控制写入速度,从而实现流量控制。
- ChannelOption.SO_REUSEADDR:配置是否允许重复使用本地地址和端口
- ChannelOption.ALLOW_HALF_CLOSURE:配置是否允许半关闭
六、Netty的线程模型
6.1 Netty的线程模型
关于线程模型,详见线程模型
Netty采用的是Reactor线程模型,可根据配置的事件循环组来判断属于Reactor中的哪一个。
- 单Reactor单线程:只配置了一个NioEventLoopGroup,且线程数量设置为1
- 单Reactor多线程:只配置了一个NioEventLoopGroup,且线程数量大于1
- 主从Reactor多线程:配置了两个NioEventLoopGroup,BoosGroup和WorkerGroup
Netty的线程模型图如下:
6.2 Netty的任务调度和异步模型
6.2.1 Netty的任务调度
Java早期的任务任务调度采用的是Timer和TimeTask,Timer有很多局限性,例如它是基于绝对事件而不是相对时间、不捕获异常等。
Java 5之后,任务调度推荐使用 java.util.concurrent包下的ScheduledExecutorService
接口。
实例代码:
ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
scheduledExecutorService.schedule(()->{
// do something
}, 1000, TimeUnit.SECONDS);
Netty没有使用java自带的任务调度器,因为ScheduledExecutorService
是使用它内部维护的线程来调度任务执行的,如果采用ScheduledExecutorService
则会有更多线程开销,且会导致更多的线程上下文切换。基于此,Netty在NioEventLoop
中实现了ScheduledExecutorService
接口,使用了EventLoop线程,没有额外的线程开销。
在ChannelHandler
中可通过如下代码来做任务调度:
ctx.channel().eventLoop().schedule(()-> {
// do something
}, 10, TimeUnit.SECONDS);
6.2.2 Netty的异步模型
Java也提供了异步模型,比如java.util.concurrent.Future
接口,比较常用的实现是java.util.concurrent.CompletableFuture
。
但是Java提供的异步接口在使用上有一定的局限性,因此Netty实现了自己的异步模型。在Netty中,很多函数的调用会立即返回,不会阻塞线程,但是这并不意味着操作已完成。示例代码如下:
ChannelFuture write = ctx.write(byteBuf);
上面write
方法返回了一个ChannelFuture对象,该对象是Netty中比较常用的一个异步对象,它有如下常用方法:
// 操作是否已做
boolean isDone();
// 操作是否成功
boolean isSuccess();
// 操作是否被取消
boolean isCancelled();
// 获取操作结果
V get()
// 添加回调,当操作完成后会被调用
ChannelFuture addListener(GenericFutureListener<? extends Future<? super Void>> listener);
// 阻塞当前线程,直到操作完成
ChannelFuture sync();
状态变化图如下:
如果我们需要对操作进行监听,可以使用是如下方法:
ChannelFuture write = ctx.write(byteBuf);
write.addListener(new ChannelFutureListener() {
@Override
public void operationComplete(ChannelFuture future) throws Exception {
if (future.isSuccess()) {
System.out.println("写入成功");
} else {
System.out.println("写入失败");
future.channel().close();
}
}
});
Promise:作用同ajax中的Promise类似,继承自Netty的Future
,返回异步执行结果,并且该类是可写的,我们在进行异步编程时可以用到它。
使用示例:
EventExecutor executor = GlobalEventExecutor.INSTANCE;
Promise<Integer> promise = cal(executor);
promise.addListener(new FutureListener<Integer>() {
@Override
public void operationComplete(Future<Integer> future) throws Exception {
if (future.isSuccess()) {
// 执行操作成功后的下一步
System.out.println("操作已完成");
}
}
});
// 异步函数
public static Promise<Integer> cal(EventExecutor executor) {
Promise<Integer> promise = new DefaultPromise<>(executor);
executor.submit(()->{
// 执行异步任务
// do something
// 设置是否成功
promise.setSuccess(123);
});
return promise;
}
七、编解码器
数据在网络中是以二进制流的形式传输的,而我们在程序中不可能直接对二进制数据进行处理,因此当我们接受到网络上的byte数据后,先要进行解码,将byte数据转换为我们需要的数据格式,如字符串、Json等。
上图展示了Pipeline中编解码器所在的一个相对位置,非绝对位置,比如编码器前面也可以放一个IdleStateHandler
来做心跳检测。
7.1 解码器
解码器的主要作用就是将入站数据解码为指定的格式。常用的解码器有两类:
- ByteToMessageDecoder:将字节解码为消息,需要用户自己实现
decode
函数 - MessageToMessageDecoder:将消息解码为消息,同样需要用户自己实现
decode
函数
7.1.1 沾包、半包问题
在使用TCP的时候,不可避免的会遇到沾包和半包问题,因为TCP是面向字节流的,收发两端的socket连接是成对出现的,发送方为了提高效率,使用了优化算法,将多个时间间隔较小且数据量小的数据合并为一个数据块,然后进行封包。这样做虽然提高了效率,但是接收方就难以分辨出完整的数据包了,因此面向流的通信是无消息保护边界的。
Netty提供了下面类型处理器来解决此类问题:
- FixedLengthFrameDecoder:固定长度解码器
- LineBasedFrameDecoder:行分割解码器,
\r\n \n
都会被处理 - DelimiterBasedFrameDecoder:可指定分隔符的解码器
- LengthFieldBasedFrameDecoder:不定长解码器
Netty还提供了ReplayingDecoder
类,允许用户不检查可读长度直接读取数据,并且用户可以提供一个枚举来维护状态变化。ReplayingDecoder将原始的ByteBuf对象包装成了ReplayingDecoderByteBuf
类,当我们通过该类读数据的时候,如果长度不足,它会抛出一个错误,然后在ReplayingDecoder
类捕获到错误后将重置buffer的readerIndex。
用户也可以设置模版为Void
类型,即ReplayingDecoder<Void>
,自己管理状态。
ReplayingDecoder
的checkpoint
方法会显著提高ReplayingDecoder
的性能,需要在每次读取数据后调用,如果不需要维护内部状态,则只需调用无参的checkpoint
方法。
用ReplayingDecoder
类实现一个变长解码器示例如下(该实现也可直接使用Netty提供的LengthFieldBasedFrameDecoder
类):
public class FlexLengthDecoder extends ReplayingDecoder<FlexLengthDecoder.DecoderState> {
static enum DecoderState {
STATE_LEN,
STATE_COUNT
}
private int len = 0;
public FlexLengthDecoder() {
super(DecoderState.STATE_LEN);
}
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
switch (state()) { // 初始在构造器中指定
case STATE_LEN:
len = in.readInt();
checkpoint(DecoderState.STATE_COUNT); // 调用checkpoint来改变状态
case STATE_COUNT:
ByteBuf buf = in.readBytes(len);
out.add(buf);
checkpoint(DecoderState.STATE_LEN);
break;
default:
throw new Exception("未知错误");
}
}
}
如果ByteBuf中字节足够,解码器中的out可以添加多个对象,示例如下:
/**
* 假设传来的数据格式如下:
*
* | str_len | str |
*
**/
public class FlexByteToStringDecoder extends ByteToMessageDecoder {
// -1 表示长度属性还未读取
private int beforeLen = -1;
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
if (beforeLen == -1) this.beforeLen = readInt(in);
if (beforeLen == -1) return;
ByteBuf buf = Unpooled.buffer(beforeLen);
while (beforeLen != -1 && in.readableBytes() >= beforeLen) {
in.readBytes(buf, beforeLen);
beforeLen = -1;
out.add(buf.toString(CharsetUtil.UTF_8));
buf.clear();
beforeLen = readInt(in);
}
}
private int readInt(ByteBuf buf) {
if (buf.readableBytes() >= Integer.BYTES) {
return buf.readInt();
}
return -1;
}
}
7.1.2 ByteToMessageDecoder 字节到消息解码器
上面解决了沾包、半包的问题,现在我们得到了指定长度的数据,但是数据还是byte类型,不是人能直接阅读的信息,因此我们需要将byte数据转换为消息。
Netty 提供了StringDecoder
用来将byte数据转换为字符串类型。
如果需要实现自定的字节到消息解码器,只需要继承ByteToMessageDecoder
类实现decode
方法即可。
7.1.3 MessageToMessageDecoder 消息到消息解码器
消息到消息的解码器可以实现消息的转换或者聚合,将多个消息聚合为一个消息,进行批量处理。
7.2 编码器
编码器就是将出站消息转换为字节数据,正好同解码器相反。
常用的编码器有如下两种:
- MessageToByteEncoder:从消息到字节的编码器
- MessageToMessageEncoder:从消息到消息的编码器
如果需要实现发送变长数据,Netty有提供LengthFieldPrepender
,该类主要功能是在接受到的字节前加长度。这和前面提到的LengthFieldBasedFrameDecoder
相对应。
编码器一般受对等方通信协议所约束,需要按照自定义协议内容来实现。
7.3 编解码器
编解码器同时包含了解码处理器和编码处理器,将入站和出站的信息转换都放到了同一个类中。
一般编解码器都继承了ChannelDuplexHandler
类,该类同时接受入站和出站事件。
Netty 中编解码器的抽象主要有两种:
- ByteToMessageCodec:字节到消息的编解码器
- MessageToMessageCodec:消息到消息的编解码器
Netty同时也提供了一个ChannelDuplexHandler
的子类CombinedChannelDuplexHandler
来聚合编码器和解码器。
7.4 序列化数据
说起序列化,我们首先会想到Java的序列化接口Serializable
接口,该接口用来标识可被序列化的类。下面来介绍几种常用的序列化方案,以及它们的优缺点。
7.4.1 Java原生序列化
Netty提供了ObjectEncoder
、ObjectDecoder
、CompatibleObjectDecoder
、CompatibleObjectEncoder
来对Java原生序列化提供支持,ObjectEncoder
和ObjectDecoder
内部实现上在头部有相应的长度处理,因此无需再使用LengthFieldPrepender
等对象来标识byte数据长度。
其中 ObjectEncoder
、ObjectDecoder
构建于Java序列化之上,Netty做了一些性能改进,因此只适合于远端也同样使用了Netty框架的远程节点。
而 CompatibleObjectDecoder
、CompatibleObjectEncoder
适用于远端使用了Java序列化的非基于Netty的远程节点进行互操作。
使用Java原生序列化时,需要将ObjectEncoder和ObjectDecoder加载业务代码之前,同时要序列化的对象必须实现Serializable
接口。
优点:
- 实现简单
缺点:
- 序列化性能太低
- 传输效率低,序列化后对象占用空间大
- 无法跨语言,对端系统也必须基于Java语言实现
JBoss Marshalling 序列化
JBoss针对Java原生序列化存在的问题做了一些改进,同时保留了与java.io.Serializable
及相关类的兼容性,并增加了一些可调优的参数和额外的特性,因此也是一种可选的序列化方案。
7.4.2 JSON序列化
一般在http协议中,我们使用JSON格式来传输数据,但是在tcp中进行高效传输一般会选择谷歌的Protobuf。
JSON序列化的方式较多,比如 Gson、fastjson、jackson等,fastjson是阿里出品,但是推荐使用jackson。
优点:
- 简单易读,使用json序列化后的内容是清晰易读的
- 跨平台,json格式几乎所有的语言都可以正确解析
缺点:
- 序列化效率不高,java中的json序列化大多是基于反射来实现的
- 序列化后对象体积变大
下面来介绍一下jackson的使用:
首先,添加依赖包:
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.13.2.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
<version>2.13.2</version>
</dependency>
接着代码示例如下:
// 1. 获取 ObjectMapper
ObjectMapper objectMapper = new ObjectMapper();
UserVO user = new UserVO(1, 20, "zhuff");
// 写对象
String jsonVal = objectMapper.writeValueAsString(user);
System.out.println(jsonVal);
ByteBuf msg = Unpooled.copiedBuffer(jsonVal, StandardCharsets.UTF_8);
// 读对象
InputStream byteBufIn = new ByteBufInputStream(msg);
UserVO user1 = objectMapper.readValue(byteBufIn, UserVO.class);
需要注意的是,UserVO类必须包含不含参数的构造方法。
7.4.3 Protocol Buffers 序列化
由Google提出的一种序列化方案,Protocol Buffers以一种紧凑而高效的方式对结构化数据进行编码及解码。它与许多编程语言绑定,因此,非常适合跨语言的通信系统。
优点:
- 序列化性能好
- 序列化后体积小,传输效率高
- 跨语言,提供多种语言支持
缺点:
- 有上手门槛,需要按照ProtocolBuffers的约定来编写message,然后生成Java类
下面是Netty提供的支持Protocol Buffers的Handler:
ProtobufEncoder
:Protocol Buffers的编码器ProtobufDecoder
:Protocol Buffers的解码器,构造器中需要提供需要序列化类的默认实例ProtobufVarint32FrameDecoder
:该类对象一般添加在ProtobufDecoder
的前面,它的作用是根据消息中的Protocol Buffers的"Base 128 Varints"整型长度字段值动态分隔ByteBufProtobufVarint32LengthFieldPrepender
:该类一般添加在ProtobufEncoder
前面,它需要和ProtobufVarint32FrameDecoder
相互配合,作用是在对象序列化后的ByteBuf前面添加"Base 128 Varints"整型长度字段值。
一般情况下,四个Handler的添加顺序如下:
ch.pipeline()
.addLast(new CombinedChannelDuplexHandler<>(
new ProtobufVarint32FrameDecoder(),
new ProtobufVarint32LengthFieldPrepender()))
.addLast(new CombinedChannelDuplexHandler<>(
new ProtobufDecoder(UserDTO.User.getDefaultInstance()),
new ProtobufEncoder()));
使用步骤:
- 在项目中添加如下依赖包:
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.22.2</version>
</dependency>
-
在Protobuf官方Github上下载最新的protobuf生成器
-
编写User.proto文件(如何编写可以查看文档)
syntax = "proto3";
option optimize_for = SPEED;
option java_outer_classname = "UserDTO";
message User {
int32 id = 1;
int32 age = 2;
string name = 3;
string ab = 4;
}
- 使用 protoc 工具将 .proto文件转换为 .java 文件,命令如下:
protoc --proto_path=com/fy/test/proto --java_out=/com/fy/test/proto User.proto
-
将 .java 文件复制到我们的工作目录。
-
添加 Protobuf 的编解码器。
ch.pipeline()
.addLast(new CombinedChannelDuplexHandler<>(
new ProtobufVarint32FrameDecoder(),
new ProtobufVarint32LengthFieldPrepender()))
.addLast(new CombinedChannelDuplexHandler<>(
new ProtobufDecoder(UserDTO.User.getDefaultInstance()),
new ProtobufEncoder()))
.addLast(new EchoObjHandler());
经过ProtobufDecoder
解码器后,ByteBuf对象就被解码为UserDTO.User对象,也就是说EchoObjHandler接收到的对象类型为UserDTO.User。
整个流程如上所述,以上面的 .proto 文件为例,会生成 UserDTO.java 文件,UserDTO类中包含了 User 内部类,该内部类就是我们使用protobuf要序列化的类。
下面展示一种使用Protobuf传输多个类型的对象方法
- 定义 Role.proto 文件
syntax="proto3";
option java_outer_classname="RoleDTO";
option java_package="com.fy.test.proto";
option optimize_for=SPEED;
message Role {
RoleType type = 1;
oneof roles{
Student students = 2;
Teacher teacher = 3;
}
}
enum RoleType {
STUDENT = 0;
TEACHER = 1;
}
message Student {
string numberNo = 1;
string name = 2;
int32 age = 3;
string address = 4;
}
message Teacher {
string name = 1;
int32 age = 2;
string address = 3;
int32 courseNo = 4;
}
该Role.proto文件里面包含了三个message,Role message中使用了oneof语法,表示Student和Teacher只会有一个有值,这在序列化过程中会节省空间。
-
运行protoc命令生成UserDTO.java文件,将该文件复制到工作目录。
-
添加Protobuf编解码器
ch.pipeline()
.addLast(new CombinedChannelDuplexHandler<>(
new ProtobufVarint32FrameDecoder(),
new ProtobufVarint32LengthFieldPrepender()))
.addLast(new CombinedChannelDuplexHandler<>(
new ProtobufDecoder(RoleDTO.Role.getDefaultInstance()),
new ProtobufEncoder()))
.addLast(new EchoObjHandler());
- 处理器中需要对类型进行判断,代码如下:
if (msg instanceof RoleDTO.Role) {
RoleDTO.Role role = (RoleDTO.Role) msg;
switch (role.getType()) {
case STUDENT: {
RoleDTO.Student student = role.getStudents();
System.out.println(student);
}break;
case TEACHER: {
RoleDTO.Teacher teacher = role.getTeacher();
System.out.println(teacher);
}break;
default: {
// nothing
}
}
}