Netty快速入门
关于工作上使用Netty的介绍
学习Netty之前你一定要掌握的知识点
Reactor的线程模型介绍
Reactor线程模型是在传统的I/O线程模型上进行的一种优化,可以分为三种线程模型。传统的I/O线程模型有以下两个缺点:
- 每个连接都需要创建一个对应线程,线程大量创建占用大量的服务器资源
- 线程没有数据可读情况下的阻塞会对性能造成很大的影响
Reactor线程模型为了解决这两个问题,提供了以下解决方案: - 基于I/O多路复用:多个客户端连接共用一个阻塞对象,应用程序只需要在一个阻塞对象等待,无需阻塞等待所有连接。当某个连接有新的数据可以处理时,通过事件驱动通知应用程序,线程从阻塞状态返回,开始进行业务处理
- 基于线程池技术减少线程创建:基于线程池,不必再为每一个连接创建线程,将连接完成后的业务处理分配给线程池进行调度
Reactor线程模型图:
Reactor在一个单独的线程中进行,负责监听和分发事件。
Reactor的两个核心组件:
EventDispatch(在netty中相当于就是EventLoopGroup):监听和分发事件,分发给适当的处理程序来对IO事件做出反应。
handlers(在netty中就是相当于各个处理器也就是自己需要去实现的handler类)是处理程序执行IO事件要完成的实际事件,Reactor 通过调度适当地处理程序来响应I/O事件,处理程序执行非阻塞操作。
Reactor模式使用I/O复用监听事件,收到事件后,分发给某个线程去处理,这也是能进行网络高并发处理的关键。
比较常见的Reactor线程模型有三种:单线程Reactor模型、单线程多Reactor模型、多线程多Reactor模型。
- 单线程Reactor模型
单Reactor单线程模型是指用一个线程通过多路复用来完成所有的I/O操作(accept、read、write等)。
通过Acceptor类接收客户端的TCP连接请求消息,当链路建立成功后,通过Dispatch将对应的ByteBuffer派发到指定的Handler上,进行消息解码。用户线程消息编码后通过NIO线程将消息发送给客户端。
单Reactor单线程模型的优点在于模型简单,没有多线程、进程间通信、竞争的问题,全部都在一个线程中完成。
缺点是:
性能问题:只有一个线程去处理任务,在高并发情况下很容易阻塞
可靠性问题:一旦线程意外跑飞,或者进入死循环,会导致整个系统通信模块不可用,不能接收和处理外部消息,造成节点故障
额外需要介绍下容易搞混的Acceptor和Reactor:
在Netty中,Reactor和Acceptor是两个概念,它们分别代表了不同的角色和功能。
- Reactor(反应器):
- 在Netty中,Reactor是指事件驱动的反应器模式,用于处理网络事件。它是一种设计模式,用于处理并发请求和事件的分发。在Netty中,Reactor模式通过一个或多个事件轮询器(Event Loop)来监听和分发网络事件。事件轮询器负责监听注册的Channel上的各种事件,例如连接建立、数据可读、数据可写等。当有事件发生时,事件轮询器会将事件派发给相应的处理器进行处理。
- Reactor模式在Netty中扮演了事件分发和调度的角色,它负责监听和分发各种网络事件,确保事件能够被适当地处理。
- Acceptor(接收器):
- Acceptor是指用于接收客户端连接的线程池或线程。在Netty中,Acceptor线程池(也称为Boss线程池)负责接收客户端的连接请求,并将连接注册到事件轮询器(Event Loop)中。当有新的连接请求到达时,Acceptor线程负责接收连接,并将连接注册到事件轮询器中,使得事件轮询器能够监听和处理该连接上的事件。
- Acceptor的主要作用是接收客户端的连接请求,并进行连接的接受和注册,以便后续的事件能够被事件轮询器处理。
在Netty中,Reactor模式和Acceptor线程池是相互配合的。Acceptor线程池负责接收客户端连接请求,并将连接注册到事件轮询器中。而事件轮询器(Reactor)负责监听和分发连接上的事件,包括数据的读写、连接的关闭等。通过这种配合,Netty能够实现高性能、高并发的网络通信,使得事件能够被适当地分发和处理。
建立连接的过程是先经过Acceptor线程池接收连接请求,然后将连接注册到事件轮询器(Reactor)中,最终由Reactor监听和处理连接上的事件。这样的设计可以将连接接收和事件处理分离,提高系统的并发处理能力和性能。
还有一点就是当建立了连接之后服务端和客户端之间就建立了一个可进行数据交互的通道。在这个通道上,客户端可以发送数据请求给服务端,服务端可以接收并处理这些数据请求。也就是创建了channel对象,双方可以直接通过这个通道进行数据传输了。
- 单Reactor多线程模型
该模型和单Reactor单线程模型的最主要区别在于,Reactor主线程只负责监听、接收客户端请求以及派发任务,而比较耗时的I/O操作由另一个worker线程池来进行分配线程去处理,也就是业务处理的事情交给了worker线程池来处理了- 服务器仅有一个Reactor线程;
- IO读与发送(IO写)由 Reactor线程负责执行;
- 解码,计算,编码等业务逻辑处理由 工作线程池(WorkThreads)负责分配线程执行;
- 优缺点
- 优点:解码,计算,编码等耗时操作若有阻塞,不会导致服务器的其他请求阻塞;
- 缺点:仅有一个 Reactor线程来处理客户端连接,若客户端IO读写数据量大,容易在IO读写时发生阻塞;
- 优缺点
- 主从Reactor多线程模型
主从Reactor多线程模型概述:由于单Reactor线程会降低多核cpu能力(或未能发挥),所以需要建立多Reactor线程,即主从Reactor线程模型。
主Reactor线程(单个):用于建立请求的连接(包括认证与授权);
从Reactor线程(多个):用于处理IO读写,但是下图并未画出;
补充:其他业务逻辑如解码,计算,编码等工作还是交由异步线程池(Worker线程池)处理
第1类:主Reactor线程负责客户端连接的建立(同步);
第2类:子Reactor线程有多个,封装在线程池,异步处理客户端的IO读写(read 与 send);
第3类:工作线程池:异步处理业务逻辑,包括 编码,计算,解码等操作;- 优缺点
- 优点1:显然,IO读写若有阻塞,不会导致其他客户端请求阻塞;
- 优点2:显然,业务逻辑若有阻塞(耗时操作),也不会导致其他客户端请求阻塞;
- 小结:在主从Reactor多线程模型条,服务器的并发性非常高(也充分利用了多核cpu的算力)
线程模型讲解完毕,下面进入Netty的学习
- 优缺点
基本概念:
1 | 1. 介绍Netty是什么,它的作用和用途。 |
核心组件简介及其netty流程:
1 | 1) netty线程模型包含ServerBootStrap,NioEventLoopGroup 及其组成部分 NioEventLoop,NioChannel, ChannelPipeline, ChannelHandler 等 |
- 客户端的代码demo:
1 | package com.jw.cloud.canal.netty; |
组件详细介绍
- channel对象: 表示一个连接通道,代表着每一个连接对象所存储的数据信息,可以直接与客户端/服务端进行通信。
- 一旦有客户端成功与服务端建立连接,将新建一个Channel与该客户端进行绑定
- Channel从线程组NioEventloopGroup中获取一个NioEventloop,并注册到该NioEventloop,后续该Channel的生命周期内都与该NioEventloop绑定在一起
- Channel同客户端进行网络连接、关闭和读写,生成对应的even事件,由Selector轮询到后,交给Worker线程组中的调度线程去执行
Netty是一个基于Java NIO的网络应用框架,提供了一系列的高性能、可扩展的网络组件,用于开发可靠、高效的网络应用程序。下面是Netty中一些核心组件的详细介绍:
- EventLoop(事件循环):EventLoop是Netty中的核心组件之一,负责处理IO事件的发生和分发。每个EventLoop都绑定一个线程,用于处理IO事件和执行任务。EventLoop通过Selector来监听注册在其上的Channel上的IO事件。
- ChannelHandler(通道处理器):ChannelHandler是用于处理IO事件的逻辑组件,它负责对数据的读取、写入和处理。可以通过添加不同的ChannelHandler来实现不同的业务逻辑,例如解码、编码、数据处理等。
- ChannelPipeline(通道管道):ChannelPipeline是一系列的ChannelHandler组成的处理链,用于处理Channel上的事件。每个Channel都有一个关联的ChannelPipeline,当事件发生时,Channel会将事件传递给Pipeline,Pipeline按照顺序将事件传递给其中的ChannelHandler进行处理。
- Bootstrap(引导类):Bootstrap是Netty的启动类,用于配置和启动Netty的客户端程序。通过Bootstrap,可以设置一些网络相关的参数,如连接地址、IO模型、Channel处理器等。
- ServerBootstrap(服务器引导类):ServerBootstrap是Netty的服务器引导类,用于配置和启动Netty的服务器程序。它继承自Bootstrap,并额外提供了一些服务器特定的配置选项,如端口、线程模型、Channel处理器等。
- ChannelFuture(通道未来):ChannelFuture表示一个异步的IO操作的结果或状态。通过ChannelFuture可以获取操作的结果、注册监听器来处理操作完成事件,或者进行操作的取消等操作。
协议和编解码器:
tips:可以采用二进制的编解码器去提高数据传输效率
netty内部支持了很多内置协议http、udp、websocket等,对应的也包含了很多的编解码器ByteToMessageDecoder、MessageToByteEncoder、LengthFieldBasedFrameDecoder(消息定长解码器)等、StringDecoder和StringEncoder(字符串的编解码器),
我在工作上一般都是自定义的协议,采用的也都是二进制的数据传输,基本就是一个字节或者一个寄存器去进行数据解析。
下面是我工作过程中用到的编解码器:
- 半包
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85package com.yuyi.pis.ats.modbus.codec;
import com.yuyi.pis.ats.cache.ProjectCache;
import com.yuyi.pis.ats.service.AtsBroadcastService;
import com.yuyi.pis.common.utils.ByteUtils;
import com.yuyi.pis.signal.common.enums.ProjectName;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
/**
* 不同业务不同处理
* 必须知道的:reteun 可以让后面到缓冲区的数据与之前到缓冲区的数据成为一个完整的数据 所以才能得到一条完整的消息
* 按帧合包 处理拆包
* 背景:对方服务端会给我发一个很大的包,经过传输层会将大包拆分成多个小包给我分别发送消息,这边就需要进行将小包进行合并
* 思路:1、首先满足第一条,消息不能低于6个字节,否则就进行重新接收消息 return的作用:在tcp中不是进行分包了嘛,通过return可以将这条消息返回到缓冲区去,然后等待tcp分包的下一条消息到,进行组包处理
* 2、记录当前的索引值,字节也许已经被读取了,所以必须记录他当前读取到哪一个字节下标了
* 3.就是实际的一些业务处理了 比如会有多少帧消息呀,当前第多少帧呀,这一帧的数据长度多少呀,如果这一帧收到的数据长度与解析出来的不一样就整帧数据返回,下标重新回归一开始记录的下标(这个很重要的),这样才能接着去完整的解析数据
* 4.当解析完一整帧数据的时候,因为下一帧数据格式也是一模一样的 所以这一帧的数据我们就可以暂时缓存起来,但是要根据实际情况将下标计算好
* 5. 数据转发给下一个处理器,完成解码
* @author joywu
* @since 2021/08/11
*/
public class FixLengthDecoder extends ByteToMessageDecoder {
/**
* 报文协议头 6个字节
*/
private static final int HEAD_LENGTH = 6;
AtsBroadcastService atsBroadcastService = new AtsBroadcastService() ;
/**
* 缓存消息体
*/
ByteBuf cacheBuf = Unpooled.buffer();
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf buffer, List<Object> out) throws Exception {
// 报文不完整 继续接收
if (buffer.readableBytes() < HEAD_LENGTH) {
log.debug("当前收到字节长度不够,为:[{}]", buffer.readableBytes());
return;
}
// 记录开始的读索引0
int beginIndex = buffer.readerIndex();
//读取协议头
buffer.readShort();
//总帧数
int frameCount=buffer.readByte();
//当前帧
int frameIndex=buffer.readByte();
//data数据长度
byte[] dataLen=new byte[2];
buffer.readBytes(dataLen);
int dataLength = ByteUtils.byte2ToShort(dataLen);
// 收到字节长度不满一帧 继续接收
if (buffer.readableBytes() < dataLength) {
buffer.readerIndex(beginIndex);
return;
}
ByteBuf otherByteBuf = buffer.slice(buffer.readerIndex(), dataLength);
cacheBuf.writeBytes(otherByteBuf);
// 读指针设置为当前帧帧尾
buffer.readerIndex(beginIndex + 8 + dataLength);
// 复制消息体到out,不包含消息头
// 收到一帧后 储存消息体并继续接收
if (frameIndex < frameCount) {
return;
}
if (frameIndex == frameCount) {
cacheBuf.retain();
out.add(cacheBuf);
}
}
} - 粘包
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68package com.yuyi.pis.vehicle.netty.code;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;
import java.util.List;
/**
* 这里就是一个自定义的粘包解码器
* @description: 粘包解码器
* @author joywu
* @date 2021/8/07/9:38
*/
public class VehicleDecoder1 extends ByteToMessageDecoder {
/**
* 报文协议头10个字节
*/
private static final int HEAD_LENGTH = 5;
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> out) throws Exception {
decode(byteBuf,out);
}
/**
* 需要递归处理的业务数据
* @param byteBuf
* @param out
*/
private void decode(ByteBuf byteBuf,List<Object> out) {
// 报文不完整 继续接收
if (byteBuf.readableBytes() < HEAD_LENGTH) {
log.info("当前收到字节长度不够,为:[{}]", byteBuf.readableBytes());
return;
}
int beginIndex = byteBuf.readerIndex();
// 0x7e
byte b = byteBuf.readByte();
// 消息id
byte messageId = byteBuf.readByte();
// 数据长度
short dateLength = byteBuf.readShort();
// 还有一个 7f 结尾的
if(dateLength+1<byteBuf.readableBytes()){
// 出现了粘包或者数据刚好接收完全
if(messageId==1){
// todo处理业务
}if(messageId == 2){
// todo处理业务
}
// 转发给handler处理 具体传什么消息 由业务确定
out.add(messageId);
// 第一个字节 消息id 和最后一个字节 在加上消息长度的两个字节 在加上消息体长度
if(byteBuf.readableBytes()==0){
// 表示消息已经处理完毕了
return;
}
byteBuf.readerIndex(beginIndex+dateLength+2+2);
decode(byteBuf,out);
}else {
// 出现了半包问题
int beginIndex1 = byteBuf.readerIndex();
log.info("数据接收不完全,正在重新接收数据");
return;
}
}
}
异步通信和回调机制:
- 讨论Netty中的异步通信模型和回调机制。
1
2
3
4
5
6
7
8
9Netty是基于事件驱动的异步网络编程框架,它提供了强大的异步通信模型和回调机制,使得开发者可以编写高效、可扩展的网络应用程序。
1. 异步通信模型:
Netty采用了基于事件驱动的异步通信模型。它的核心是EventLoop(事件循环),通过一个或多个线程来处理I/O事件和任务,而不需要为每个连接创建一个线程。这种异步模型可以实现高并发和高性能的网络通信。
在Netty中,所有的I/O操作都是非阻塞的,通过Selector(选择器)监听多个Channel上的事件,并将事件分发给相应的EventLoop进行处理。当有新的连接接入、数据可读、写操作完成等事件发生时,Netty会触发相应的回调方法,进行异步处理。
2. 回调机制:
Netty使用回调机制来处理事件。当某个事件发生时,Netty会调用事先注册的回调方法(也称为处理器或处理器链)来处理事件。
在Netty中,可以通过ChannelPipeline构建一个处理器链,将多个处理器按顺序组织起来。每个处理器都负责处理特定的事件或执行特定的任务。当一个事件发生时,Netty会自动调用处理器链中相应的处理器,进行事件处理或任务执行。通过回调方法,开发者可以在合适的时机对事件进行处理,例如读取数据、写入数据、连接管理等。
Netty还提供了一些内置的回调接口和适配器类,使得开发者可以方便地实现自定义的回调逻辑。
通过异步通信模型和回调机制,Netty可以实现高效的并发处理和响应,同时可以充分利用系统资源,提高应用程序的性能和可伸缩性。开发者可以编写简洁、可读性强的代码,实现高性能的网络应用程序。 - 介绍ChannelFuture、Promise和回调处理的使用方法。
在Netty中,ChannelFuture和Promise是用于处理异步操作结果和进行回调处理的重要组件。
ChannelFuture:
ChannelFuture表示一个异步的I/O操作的结果。当进行网络操作(如写入数据、连接、绑定等)时,通常会返回一个ChannelFuture对象,用于表示操作的状态和结果。通过ChannelFuture,可以异步地获取操作的结果或进行后续的处理。常用的ChannelFuture方法包括:
- isDone(): 判断操作是否已完成。
- isSuccess(): 判断操作是否成功。
- addListener(): 添加监听器,用于在操作完成时执行回调逻辑。
- sync(): 阻塞等待操作完成。
- get(): 获取操作的结果。
通过ChannelFuture,可以进行操作结果的判断、等待操作完成和获取操作结果等操作。它提供了与异步操作相关的方法,方便进行后续处理。
Promise:
Promise是ChannelFuture的一个子接口,它表示一个异步操作的结果的可写部分。Promise可以被用于显式地设置操作的结果,并提供更多的灵活性和控制权。Promise继承了ChannelFuture的方法,同时添加了设置操作结果的方法,如setSuccess()、setFailure()等。通过Promise,可以主动设置操作的结果,并通知等待该结果的监听器。
回调处理:
在Netty中,通过添加监听器(Listener)到ChannelFuture或Promise上,可以实现回调处理。监听器是一个回调接口,定义了在操作完成时执行的回调逻辑。
通过addListener()方法,可以添加一个监听器到ChannelFuture或Promise上,监听器的回调方法会在操作完成时被触发。在回调方法中,可以根据操作的结果执行相应的处理逻辑。
示例代码:1
2
3
4
5
6
7
8ChannelFuture future = channel.writeAndFlush(data);
future.addListener((ChannelFutureListener) future -> {
if (future.isSuccess()) {
// 操作成功处理逻辑
} else {
// 操作失败处理逻辑
}
});在上述示例中,通过addListener()方法添加了一个监听器,当操作完成时,将会触发回调方法。在回调方法中,根据操作的结果执行相应的处理逻辑。
通过使用ChannelFuture、Promise和回调处理,可以实现异步操作的管理和结果处理。这样的设计模式可以提高应用程序的性能和可伸缩性,并使代码更加简洁和可读。
Netty的应用实例:
这个应用实例的话比较简单,网上有很多的实例,建议直接去网上搜索,其实主要的就是消息编解码喝协议的确认。
性能调优和最佳实践:
- 使用适当的I/O线程池大小:根据系统的负载和处理需求,配置合适的I/O线程池大小,以避免过多或过少的线程导致性能问题。
- 优化TCP参数:根据网络环境和应用需求,调整TCP相关参数,如Nagle算法、TCP窗口大小、TCP连接超时等,以降低延迟并提高吞吐量。
- 使用高效的编解码器:选择高效的编解码器,如基于二进制的编解码器(如Protobuf、MessagePack)或基于文本的编解码器(如JSON、XML),以提高数据传输的效率。
- 合理使用缓冲区:使用合适大小的缓冲区,避免过大的缓冲区导致内存占用过高,同时保证缓冲区足够大以避免频繁的内存分配和复制。
- 优化并发处理:合理利用并发编程技术,如使用线程池、同步和异步操作等,以提高并发处理能力和减少线程阻塞时间。
- 使用内存池:Netty提供了ByteBuf内存池,通过重用和池化内存,减少内存分配和释放的开销,提高性能。
总结:
- 对整篇博客进行总结和回顾。
上面基本讲述了我在netty方面的一些认知和一些实际应用场景,其实需要学习的东西还是有很多的,比如具体的一些ByteBufAPI、断开重连机制、心跳机制,这些东西我虽然都在
业务层面中应用到了,但这篇文章我还没来的及整理,除了这些业务必须掌握的,你还得知道一些网络里面的常用知识点,比如大小端、各种协议、缓冲区大小等。 - 强调Netty的优势和适用场景。
Netty是一款高性能、可扩展的网络编程框架,具有以下优势和适用场景:
- 高性能:Netty采用了异步、事件驱动的设计模式,能够处理大量并发连接和高负载的网络通信。它使用了高效的I/O模型和线程池,通过复用线程和事件回调机制,减少了线程切换的开销,提高了系统的吞吐量和响应性能。
- 可扩展性:Netty提供了灵活的组件和扩展点,使开发者能够自定义和扩展协议、编解码器、处理器等,以适应各种不同的应用需求。它的组件化设计和优秀的扩展性使得应用程序能够方便地进行功能增强和定制化开发。
- 安全性:Netty提供了各种安全机制和功能,如SSL/TLS支持、加密和解密、身份认证等,帮助开发者构建安全可靠的网络应用程序。
- 跨平台:Netty基于Java NIO技术,可以在各种平台上运行,包括Windows、Linux、macOS等。它提供了对TCP、UDP、HTTP、WebSocket等协议的支持,可以构建各种类型的网络应用程序。
- 高度可定制:Netty提供了丰富的API和灵活的配置选项,可以根据具体需求进行定制和配置。开发者可以根据应用程序的特点和性能要求,调整各种参数和策略,以获得最佳的性能和资源利用率。
适用场景:
- 高性能服务器:Netty适用于构建高性能的服务器,如游戏服务器、实时通信服务器、金融交易系统等,能够处理大量并发连接和高吞吐量的数据传输。
- 分布式系统:Netty在分布式系统中具有优异的表现,可以用于构建分布式消息传递、RPC框架、集群通信等,提供高效的跨节点通信能力。
- IoT(物联网)应用:Netty对于IoT应用也非常适用,能够处理设备间的通信和数据交互,支持各种物联网协议和传输方式。
- 高性能代理和中间件:Netty可用于构建代理服务器、负载均衡器、缓存服务器等中间件,能够处理大量的连接和请求,并提供高性能和可靠性。
总的来说,Netty是一款强大的网络编程框架,适用于构建高性能、可扩展、安全可靠的网络应用程序,