自定义协议之TCP/IP分包问题解决

这是我自己第一个完完全全都是我负责的模块,之前的项目虽然是属于我负责,但是代码一开始并不是我写的,我只是在原来的代码上进行了修修改改,这次记录的也是网络编程中一个最常见的问题–TCP/IP分包问题。

项目背景

我们PIS(乘客信息系统)系统需要去对接ATS(列车自动监控系统)系统,然后就必不可少的需要通信协议去支持,对方是属于服务端,我这是属于客户端,需要去接收对方发送过来的数据,并解析,我这在将解析出来的数据转发给我们的设备显示列车到站信息。

技术实现

介绍下使用了什么技术和解决分包的一个思路。

网络框架搭建—Netty

选用了Netty的框架,去实现网络通信。netty的详细介绍也会在后面有空的时候进行整理出来。
然后简单说下对方系统给我们提供出来的协议吧。首先他会将所有的数据封装到一个大包中去,这么包中有20-30多帧消息,每一帧消息不超过1024个字节,这一帧消息中可能有多个站台的消息,每个站台消息都会有一个消息体去单独的记录,每一帧消息会有消息头消息尾,记录当前是多少帧。
下面这是一张简单的图,帧头帧尾,还有站台消息体就没有详细去记录了。
img_1.png

数据包已经大到这个程度了,很明显就需要我们去解决TCP分包的问题了。

TCP分包代码解决

首先,我们要明确到协议的内容,我们去解析消息,就得完全按照协议来。使用netty框架,在解码中最重要的一点就是要合理的使用index,否则数据解析起来就会很困难的。
代码实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package com.yuyi.pis.ats.modbus.codec;

import com.yuyi.pis.ats.cache.ProjectCache;
import com.yuyi.pis.ats.service.AtsBroadcastService;
import com.yuyi.pis.common.utils.ByteUtils;
import com.yuyi.pis.signal.common.enums.ProjectName;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import io.netty.handler.codec.ByteToMessageDecoder;
import lombok.extern.slf4j.Slf4j;

import java.util.List;


/**
* 不同业务不同处理
* 必须知道的:reteun 可以让后面到缓冲区的数据与之前到缓冲区的数据成为一个完整的数据 所以才能得到一条完整的消息
* 按帧合包 处理拆包
* 背景:对方服务端会给我发一个很大的包,经过传输层会将大包拆分成多个小包给我分别发送消息,这边就需要进行将小包进行合并
* 思路:1、首先满足第一条,消息不能低于6个字节,否则就进行重新接收消息 return的作用:在tcp中不是进行分包了嘛,通过return可以将这条消息返回到缓冲区去,然后等待tcp分包的下一条消息到,进行组包处理
* 2、记录当前的索引值,字节也许已经被读取了,所以必须记录他当前读取到哪一个字节下标了
* 3.就是实际的一些业务处理了 比如会有多少帧消息呀,当前第多少帧呀,这一帧的数据长度多少呀,如果这一帧收到的数据长度与解析出来的不一样就整帧数据返回,下标重新回归一开始记录的下标(这个很重要的),这样才能接着去完整的解析数据
* 4.当解析完一整帧数据的时候,因为下一帧数据格式也是一模一样的 所以这一帧的数据我们就可以暂时缓存起来,但是要根据实际情况将下标计算好
* 5. 数据转发给下一个处理器,完成解码
* @author joywu
* @since 2021/08/11
*/
@Slf4j
public class FixLengthDecoder extends ByteToMessageDecoder {

/**
* 报文协议头 6个字节
*/
private static final int HEAD_LENGTH = 6;
AtsBroadcastService atsBroadcastService = new AtsBroadcastService() ;
/**
* 缓存消息体
*/
ByteBuf cacheBuf = Unpooled.buffer();

@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf buffer, List<Object> out) throws Exception {
if (ProjectName.LINE12.getValue().equals(ProjectCache.project.getProjectName())) {
atsBroadcastService.sendMessageToPlatDoor(buffer);
log.info("给站台门数据发送成功,buffer的长度是:{}",buffer.readableBytes());
}
// 报文不完整 继续接收
if (buffer.readableBytes() < HEAD_LENGTH) {
log.debug("当前收到字节长度不够,为:[{}]", buffer.readableBytes());
return;
}
// 记录开始的读索引0
int beginIndex = buffer.readerIndex();
//读取协议头
buffer.readShort();

//总帧数
int frameCount=buffer.readByte();
//当前帧
int frameIndex=buffer.readByte();

//data数据长度
byte[] dataLen=new byte[2];
buffer.readBytes(dataLen);
int dataLength = ByteUtils.byte2ToShort(dataLen);

// 收到字节长度不满一帧 继续接收
if (buffer.readableBytes() < dataLength) {
buffer.readerIndex(beginIndex);
return;
}
ByteBuf otherByteBuf = buffer.slice(buffer.readerIndex(), dataLength);
cacheBuf.writeBytes(otherByteBuf);

// 读指针设置为当前帧帧尾
buffer.readerIndex(beginIndex + 8 + dataLength);
// 复制消息体到out,不包含消息头

// 收到一帧后 储存消息体并继续接收
if (frameIndex < frameCount) {
return;
}
if (frameIndex == frameCount) {
cacheBuf.retain();
out.add(cacheBuf);
}
}
}

在代码实现的时候,我的原则就是对方怎么进行封装数据,我这边就怎么去解析数据。

总结

这个问题一开始是为难了我很久的,尤其是使用这个指针的读取问题,因为他们消息帧里面又会有多条站台消息,就又得去把这些消息给剔除掉计算正确的下标来。这篇文章主要是记录自己的第一次解决tcp分包的一个问题。