Sfoglia il codice sorgente

柜锁增加 netty 方式,数据重新解析

liubo 2 anni fa
parent
commit
e8db94de17

+ 31 - 0
zd-modules/zd-modules-laboratory/src/main/java/com/zd/laboratory/netty/ChannelMap.java

@@ -0,0 +1,31 @@
+package com.zd.laboratory.netty;
+
+import io.netty.channel.ChannelHandlerContext;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+public class ChannelMap {
+
+    /**
+     * 存放客户端标识ID(消息ID)与channel的对应关系
+     */
+    private static volatile ConcurrentHashMap<String, ChannelHandlerContext> channelMap = null;
+
+    private ChannelMap() {
+    }
+
+    public static ConcurrentHashMap<String, ChannelHandlerContext> getChannelMap() {
+        if (null == channelMap) {
+            synchronized (ChannelMap.class) {
+                if (null == channelMap) {
+                    channelMap = new ConcurrentHashMap<>();
+                }
+            }
+        }
+        return channelMap;
+    }
+
+    public static ChannelHandlerContext getChannel(String id) {
+        return getChannelMap().get(id);
+    }
+}

+ 29 - 0
zd-modules/zd-modules-laboratory/src/main/java/com/zd/laboratory/netty/MessageCodec.java

@@ -0,0 +1,29 @@
+package com.zd.laboratory.netty;
+
+import com.zd.common.core.utils.ReUtil;
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.handler.codec.MessageToMessageCodec;
+import lombok.extern.slf4j.Slf4j;
+
+import java.util.List;
+
+@Slf4j
+public class MessageCodec extends MessageToMessageCodec<ByteBuf, String> {
+
+    @Override
+    protected void encode(ChannelHandlerContext channelHandlerContext, String msg, List<Object> list) throws Exception {
+        log.info("netty消息正在编码 " + msg);
+        list.add(Unpooled.copiedBuffer(ReUtil.hexStringToByteArray(msg)));
+//        list.add(ReUtil.hexStringToByteArray(msg));
+    }
+
+    @Override
+    protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
+        log.info("netty消息正在解码" + byteBuf.readableBytes());
+        byte[] bytes = new byte[byteBuf.readableBytes()];
+        byteBuf.readBytes(bytes);
+        list.add(bytes);
+    }
+}

+ 48 - 0
zd-modules/zd-modules-laboratory/src/main/java/com/zd/laboratory/netty/NettyClient.java

@@ -0,0 +1,48 @@
+package com.zd.laboratory.netty;
+
+import io.netty.bootstrap.Bootstrap;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelInitializer;
+import io.netty.channel.EventLoopGroup;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioSocketChannel;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class NettyClient {
+
+    /**
+     * Netty客户端
+     */
+    public static void main(String[] args) throws InterruptedException {
+        //1. 创建线程组
+        EventLoopGroup group = new NioEventLoopGroup();
+        //2. 创建客户端启动助手
+        Bootstrap bootstrap = new Bootstrap();
+        //3. 设置线程组
+        bootstrap.group(group)
+                .channel(NioSocketChannel.class)//4. 设置客户端通道实现为NIO
+                .handler(new ChannelInitializer<SocketChannel>() {//5. 创建一个通道初始化对象
+                    @Override
+                    protected void initChannel(SocketChannel socketChannel) throws Exception {
+
+                        //6、向pipeline中添加自定的的解码编码handler
+//                        socketChannel.pipeline().addLast(new MessageDecoder());
+//                        socketChannel.pipeline().addLast(new MessageEncoder());
+                        //6、向pipeline中添加编解码器
+                        socketChannel.pipeline().addLast(new com.zd.laboratory.netty.MessageCodec());
+
+                        //6. 向pipeline中添加自定义业务处理handler
+                        socketChannel.pipeline().addLast(new NettyClientHandler());
+                    }
+                });
+
+        //7. 启动客户端,等待连接服务端,同时将异步改为同步
+        ChannelFuture channelFuture = bootstrap.connect("127.0.0.1", NettyServer.inetPort);
+        log.info("netty客户端已启动。。。 端口号: " + NettyServer.inetPort);
+        //8. 关闭通道和关闭连接池
+        channelFuture.channel().closeFuture().sync();
+        group.shutdownGracefully();
+    }
+}

+ 76 - 0
zd-modules/zd-modules-laboratory/src/main/java/com/zd/laboratory/netty/NettyClientHandler.java

@@ -0,0 +1,76 @@
+package com.zd.laboratory.netty;
+
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandler;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
+public class NettyClientHandler implements ChannelInboundHandler {
+    @Override
+    public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {
+
+    }
+
+    @Override
+    public void channelUnregistered(ChannelHandlerContext channelHandlerContext) throws Exception {
+
+    }
+
+    /**
+      * @description: 客户端发送消息
+    */
+    @Override
+    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
+        ChannelFuture channelFuture = channelHandlerContext.writeAndFlush("你好,我是客户端");
+        channelFuture.addListener(new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture channelFuture) throws Exception {
+                if(channelFuture.isSuccess()){
+                    log.info("客户端数据发送成功");
+                }else{
+                    log.info("客户端数据发送失败");
+                }
+            }
+        });
+    }
+    @Override
+    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
+
+    }
+
+    /**
+      * @description: 读取消息
+    */
+    @Override
+    public void channelRead(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
+        String msg = (String) o;
+        log.info("客户端接受消息:"+msg);
+    }
+    @Override
+    public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {
+
+    }
+    @Override
+    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
+
+    }
+    @Override
+    public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {
+
+    }
+    @Override
+    public void handlerAdded(ChannelHandlerContext channelHandlerContext) throws Exception {
+
+    }
+    @Override
+    public void handlerRemoved(ChannelHandlerContext channelHandlerContext) throws Exception {
+
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) throws Exception {
+
+    }
+}

+ 73 - 0
zd-modules/zd-modules-laboratory/src/main/java/com/zd/laboratory/netty/NettyServer.java

@@ -0,0 +1,73 @@
+package com.zd.laboratory.netty;
+
+import io.netty.bootstrap.ServerBootstrap;
+import io.netty.channel.*;
+import io.netty.channel.nio.NioEventLoopGroup;
+import io.netty.channel.socket.SocketChannel;
+import io.netty.channel.socket.nio.NioServerSocketChannel;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.core.annotation.Order;
+import org.springframework.stereotype.Component;
+
+import javax.annotation.PostConstruct;
+
+@Component
+@Slf4j
+@Order(-999)
+public class NettyServer {
+    public static int inetPort = 12323;
+
+    /**
+     * Netty 服务端
+     */
+    @PostConstruct
+    public void start() throws InterruptedException {
+
+        //1. 创建bossGroup线程组: 处理网络事件--连接事件,不设置线程数默认为2*处理器线程数
+        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
+
+        //2. 创建workerGroup线程组: 处理网络事件--读写事件,不设置线程数默认为2*处理器线程数
+        EventLoopGroup workerGroup = new NioEventLoopGroup();
+
+        //3. 创建服务端启动助手
+        ServerBootstrap bootstrap = new ServerBootstrap();
+
+        //4. 设置bossGroup线程组和workerGroup线程组
+        bootstrap.group(bossGroup,workerGroup)
+                .channel(NioServerSocketChannel.class)//5. 设置服务端通道实现为NIO
+                .option(ChannelOption.SO_BACKLOG,128)//6、参数设置-设置线程队列中等待链接个数
+                .childOption(ChannelOption.SO_KEEPALIVE,Boolean.TRUE)//6、参数设置-设置活跃状态,其中child设置的是workerGroup
+                .childHandler(new ChannelInitializer<SocketChannel>() {//7. 创建一个通道初始化对象
+                    @Override
+                    protected void initChannel(SocketChannel socketChannel) throws Exception {
+
+                        //8、向piple中添加编解码器
+                        socketChannel.pipeline().addLast(new MessageCodec());
+                        //8. 向pipeline中添加自定义业务处理handler
+                        socketChannel.pipeline().addLast(new NettyServerHandler());
+                    }
+                });
+
+        //9. 启动服务端并绑定端口
+        ChannelFuture channelFuture = bootstrap.bind(inetPort).sync();
+        channelFuture.addListener(new ChannelFutureListener() {
+            @Override
+            public void operationComplete(ChannelFuture channelFuture) throws Exception {
+                if(channelFuture.isSuccess()){
+                    log.info("netty绑定端口成功");
+                }else{
+                    log.info("netty绑定端口失败");
+                }
+            }
+        });
+        log.info("服务器启动成功");
+        //10. 关闭通道(并不是真正意义上的关闭通道,而是监听通道的关闭状态)和关闭链接
+        /*try {
+            channelFuture.channel().closeFuture().sync();
+            bossGroup.shutdownGracefully();
+            workerGroup.shutdownGracefully();
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }*/
+    }
+}

+ 150 - 0
zd-modules/zd-modules-laboratory/src/main/java/com/zd/laboratory/netty/NettyServerHandler.java

@@ -0,0 +1,150 @@
+package com.zd.laboratory.netty;
+
+import com.zd.common.core.redis.RedisService;
+import com.zd.common.core.utils.ReUtil;
+import com.zd.common.core.utils.SpringUtils;
+import com.zd.laboratory.socket.runner.TCPServer;
+import com.zd.laboratory.utils.CRCCHECK;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandler;
+import lombok.extern.slf4j.Slf4j;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * 服务器端handler
+ */
+@Slf4j
+public class NettyServerHandler implements ChannelInboundHandler {
+
+    /**
+    * 读取客户端发送的消息
+    */
+    @Override
+    public void channelRead(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
+        byte[] msg  = (byte[]) o;
+        String data = TCPServer.bytesToHexString(msg).toUpperCase();
+        log.info("netty服务端接受消息为:" + data);
+        if(data.equals("6862")){
+            return;
+        }
+
+        log.info("netty服务端接受消息转10进制为:" + ByteBuffer.wrap(msg).getLong());
+//        if(data.startsWith(SocketTypes.LOCK_PREFIX)){
+//        if(data.startsWith("33")){
+
+        String relayCode = data.substring(0, 108);
+//        33 31 20 33 30 20 36 33 20 33 36 20 36 34 20 33 37 20 33 36 20 33 33 20 33 31 20 33 35 20 33 31 20 33 35 20 33 30 20 36 33 20 33 33 20 33 35 20 33 32 20 33 37 20
+
+//        05 05 00 01 ee 00 d0 2e
+
+            // 将通道加入ChannelMap
+            ChannelMap.getChannelMap().put(data, channelHandlerContext);
+//        }else {
+//            if(data.length() != 12){
+//                log.info("netty柜锁回调指令非状态指令!" + data);
+//                return;
+//            }
+
+//            AtomicReference<String> relayCode = new AtomicReference<>("");
+//            ChannelMap.getChannelMap().entrySet().forEach(a -> {
+//                if(a.getValue() == channelHandlerContext){
+//                    relayCode.set(a.getKey());
+//                }
+//            });
+
+            if(data.length() > relayCode.length()) {
+                data = data.replaceAll(relayCode, "");
+                // 柜锁bit 位
+                long bit = CRCCHECK.getBitByCommand(data);
+                // 1开锁状态 0关锁状态
+                int status = CRCCHECK.getLockStatus(data);
+
+                log.info("netty柜锁回调:" + relayCode + ":" + bit + ",回调结果" + (status == 1 ? "开启": "关闭") + ",指令:" + data);
+
+                RedisService redisService = SpringUtils.getBean(RedisService.class);
+                redisService.setCacheObject(relayCode + ":" + bit, status, 3 * 60L, TimeUnit.SECONDS);
+            }
+
+    }
+
+    /**
+    * @description: 读取消息后开始的操作
+    */
+    @Override
+    public void channelReadComplete(ChannelHandlerContext channelHandlerContext) throws Exception {
+        String instruct = CRCCHECK.getOpenLockOrder(1);
+
+        byte[] bytes = ReUtil.hexStringToByteArray(instruct);
+//        log.info("netty服务端数据发送数据:" + instruct);
+//        ChannelFuture channelFuture = channelHandlerContext.writeAndFlush(bytes);
+//        ChannelFuture channelFuture = channelHandlerContext.writeAndFlush(instruct);
+//        channelFuture.addListener(new ChannelFutureListener() {
+//            @Override
+//            public void operationComplete(ChannelFuture channelFuture) throws Exception {
+//                if(channelFuture.isSuccess()){
+//                    log.info("服务端数据发送成功");
+//                }else{
+//                    log.info("服务端数据发送失败");
+//                }
+//            }
+//        });
+    }
+
+    /**
+    * @description: 异常发生时
+    */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext channelHandlerContext, Throwable throwable) throws Exception {
+        throwable.printStackTrace();
+        channelHandlerContext.close();
+    }
+
+    @Override
+    public void channelRegistered(ChannelHandlerContext channelHandlerContext) throws Exception {
+
+    }
+
+    @Override
+    public void channelUnregistered(ChannelHandlerContext channelHandlerContext) throws Exception {
+
+    }
+
+    @Override
+    public void channelActive(ChannelHandlerContext channelHandlerContext) throws Exception {
+
+    }
+
+    @Override
+    public void channelInactive(ChannelHandlerContext channelHandlerContext) throws Exception {
+
+    }
+
+    @Override
+    public void userEventTriggered(ChannelHandlerContext channelHandlerContext, Object o) throws Exception {
+
+    }
+
+    @Override
+    public void channelWritabilityChanged(ChannelHandlerContext channelHandlerContext) throws Exception {
+
+    }
+
+    @Override
+    public void handlerAdded(ChannelHandlerContext channelHandlerContext) throws Exception {
+        log.info("netty客户端开始连接" + channelHandlerContext.toString());
+    }
+
+    @Override
+    public void handlerRemoved(ChannelHandlerContext channelHandlerContext) throws Exception {
+        log.info("netty客户端断开连接" + channelHandlerContext.toString());
+        ChannelMap.getChannelMap().entrySet().forEach(a -> {
+            if(a.getValue() == channelHandlerContext.channel()){
+                ChannelMap.getChannelMap().remove(a.getKey());
+            }
+        });
+    }
+
+
+}

+ 18 - 0
zd-modules/zd-modules-laboratory/src/main/java/com/zd/laboratory/netty/PushMsgService.java

@@ -0,0 +1,18 @@
+package com.zd.laboratory.netty;
+
+import io.netty.buffer.Unpooled;
+import io.netty.channel.ChannelHandlerContext;
+import org.springframework.stereotype.Service;
+
+@Service
+public class PushMsgService {
+
+    public void push(String relayCode, byte[] bytes) {
+        // 客户端ID
+        ChannelHandlerContext channelHandlerContext = ChannelMap.getChannel(relayCode);
+        if (null == channelHandlerContext) {
+            throw new RuntimeException("柜锁已离线");
+        }
+        channelHandlerContext.writeAndFlush(Unpooled.copiedBuffer(bytes));
+    }
+}