소스 검색

2023-12-27 修改netty 报文丢失问题

donggaosheng 2 년 전
부모
커밋
75fc3a75b8
1개의 변경된 파일44개의 추가작업 그리고 4개의 파일을 삭제
  1. 44 4
      zd-modules/zd-modules-laboratory/src/main/java/com/zd/laboratory/netty/MessageCodec.java

+ 44 - 4
zd-modules/zd-modules-laboratory/src/main/java/com/zd/laboratory/netty/MessageCodec.java

@@ -10,12 +10,52 @@ import lombok.extern.slf4j.Slf4j;
 import java.util.List;
 
 @Slf4j
-public class MessageCodec extends MessageToMessageCodec<ByteBuf, String> {
+public class MessageCodec extends MessageToMessageCodec<ByteBuf, Object> {
+
+    ByteBuf tempMsg = Unpooled.buffer();
+
+    static final int PACKET_SIZE = 160;
 
     @Override
-    protected void encode(ChannelHandlerContext channelHandlerContext, String msg, List<Object> list) throws Exception {
-        log.info("netty消息正在编码 " + msg);
-        list.add(Unpooled.copiedBuffer(ReUtil.hexStringToByteArray(msg)));
+    protected void encode(ChannelHandlerContext channelHandlerContext, Object msg, List<Object> list) throws Exception {
+        ByteBuf in=(ByteBuf)msg;
+        // 1、 合并报文
+        ByteBuf message = null;
+        int tmpMsgSize = tempMsg.readableBytes();
+        // 如果暂存有上一次余下的请求报文,则合并
+        if (tmpMsgSize > 0) {
+            message = Unpooled.buffer();
+            message.writeBytes(tempMsg);
+            message.writeBytes(in);
+            System.out.println("合并:上一数据包余下的长度为:" + tmpMsgSize + ",合并后长度为:" + message.readableBytes());
+        } else {
+            message = in;
+        }
+        // 2、 拆分报文
+        // 这个场景下,一个请求固定长度为3,可以根据长度来拆分
+        // i+1 i+1 i+1 i+1 i+1
+        // 不固定长度,需要应用层协议来约定 如何计算长度
+        // 在应用层中,根据单个报文的长度及特殊标记,来将报文进行拆分或合并
+        // dubbo rpc协议 = header(16) + body(不固定)
+        // header最后四个字节来标识body
+        // 长度 = 16 + body长度
+        // 0xda, 0xbb 魔数
+        int size = message.readableBytes();
+        int counter = size / PACKET_SIZE;
+        for (int i = 0; i < counter; i++) {
+            byte[] request = new byte[PACKET_SIZE];
+            // 每次从总的消息中读取3个字节的数据
+            message.readBytes(request);
+            // 将拆分后的结果放入out列表中,交由后面的业务逻辑去处理
+            list.add(Unpooled.copiedBuffer(request));
+        }
+
+        size = message.readableBytes();
+        if (size != 0) {
+            // 剩下来的数据放到tempMsg暂存
+            tempMsg.clear();
+            tempMsg.writeBytes(message.readBytes(size));
+        }
     }
 
     @Override