自定义协议之TCP/IP分包问题解决
这是我自己第一个完完全全都是我负责的模块,之前的项目虽然是属于我负责,但是代码一开始并不是我写的,我只是在原来的代码上进行了修修改改,这次记录的也是网络编程中一个最常见的问题–TCP/IP分包问题。
项目背景
我们PIS(乘客信息系统)系统需要去对接ATS(列车自动监控系统)系统,然后就必不可少的需要通信协议去支持,对方是属于服务端,我这是属于客户端,需要去接收对方发送过来的数据,并解析,我这在将解析出来的数据转发给我们的设备显示列车到站信息。
技术实现
介绍下使用了什么技术和解决分包的一个思路。
网络框架搭建—Netty
选用了Netty的框架,去实现网络通信。netty的详细介绍也会在后面有空的时候进行整理出来。
然后简单说下对方系统给我们提供出来的协议吧。首先他会将所有的数据封装到一个大包中去,这么包中有20-30多帧消息,每一帧消息不超过1024个字节,这一帧消息中可能有多个站台的消息,每个站台消息都会有一个消息体去单独的记录,每一帧消息会有消息头消息尾,记录当前是多少帧。
下面这是一张简单的图,帧头帧尾,还有站台消息体就没有详细去记录了。
数据包已经大到这个程度了,很明显就需要我们去解决TCP分包的问题了。
TCP分包代码解决
首先,我们要明确到协议的内容,我们去解析消息,就得完全按照协议来。使用netty框架,在解码中最重要的一点就是要合理的使用index,否则数据解析起来就会很困难的。
代码实现:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
| package com.yuyi.pis.ats.modbus.codec;
import com.yuyi.pis.ats.cache.ProjectCache; import com.yuyi.pis.ats.service.AtsBroadcastService; import com.yuyi.pis.common.utils.ByteUtils; import com.yuyi.pis.signal.common.enums.ProjectName; import io.netty.buffer.ByteBuf; import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import lombok.extern.slf4j.Slf4j;
import java.util.List;
@Slf4j public class FixLengthDecoder extends ByteToMessageDecoder {
private static final int HEAD_LENGTH = 6; AtsBroadcastService atsBroadcastService = new AtsBroadcastService() ;
ByteBuf cacheBuf = Unpooled.buffer();
@Override protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf buffer, List<Object> out) throws Exception { if (ProjectName.LINE12.getValue().equals(ProjectCache.project.getProjectName())) { atsBroadcastService.sendMessageToPlatDoor(buffer); log.info("给站台门数据发送成功,buffer的长度是:{}",buffer.readableBytes()); } if (buffer.readableBytes() < HEAD_LENGTH) { log.debug("当前收到字节长度不够,为:[{}]", buffer.readableBytes()); return; } int beginIndex = buffer.readerIndex(); buffer.readShort();
int frameCount=buffer.readByte(); int frameIndex=buffer.readByte();
byte[] dataLen=new byte[2]; buffer.readBytes(dataLen); int dataLength = ByteUtils.byte2ToShort(dataLen);
if (buffer.readableBytes() < dataLength) { buffer.readerIndex(beginIndex); return; } ByteBuf otherByteBuf = buffer.slice(buffer.readerIndex(), dataLength); cacheBuf.writeBytes(otherByteBuf);
buffer.readerIndex(beginIndex + 8 + dataLength);
if (frameIndex < frameCount) { return; } if (frameIndex == frameCount) { cacheBuf.retain(); out.add(cacheBuf); } } }
|
在代码实现的时候,我的原则就是对方怎么进行封装数据,我这边就怎么去解析数据。
总结
这个问题一开始是为难了我很久的,尤其是使用这个指针的读取问题,因为他们消息帧里面又会有多条站台消息,就又得去把这些消息给剔除掉计算正确的下标来。这篇文章主要是记录自己的第一次解决tcp分包的一个问题。