700字范文,内容丰富有趣,生活中的好帮手!
700字范文 > 关于SpringBoot整合Netty客户端和服务端实现JT808协议

关于SpringBoot整合Netty客户端和服务端实现JT808协议

时间:2022-10-07 19:48:18

相关推荐

关于SpringBoot整合Netty客户端和服务端实现JT808协议

关于SpringBoot整合Netty客户端和服务端实现JT808协议

最近做了一个使用netty实现交通部JT808协议的项目,对比了mina和netty两种框架的使用,先整理一下netty的实现过程,并在后续对比mina的实现。

什么是netty?

Netty 是一个基于 NIO 的客户端/服务器网络编程框架,最初由 JBoss 提供并开源。它利用 Java 的高级网络能力,隐藏了底层实现的复杂性,对外提供一套易于使用的 API。

开发环境:

JDK:1.8 SpringBoot:2.2.11.RELEASE

服务端实现------终端传输消息到服务端并收到服务端应答消息

加入依赖

<!--netty-->
<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.6.Final</version>
</dependency>

netty消息包数据格式——PackageData

package com.carshow.xcb.biz.tcp.vo;

import io.netty.channel.Channel;
import lombok.Data;

import java.util.Arrays;

/**
 * In-memory representation of one received message: header, raw body bytes,
 * checksum, and the channel it arrived on.
 *
 * <p>Accessors are generated by Lombok's {@code @Data}; {@code toString()} is
 * written by hand and therefore takes precedence over the generated one.
 */
@Data
public class PackageData {

    /** Message header (described by the original author as 16 bytes). */
    protected MsgHeader msgHeader;

    /** Raw message body bytes. */
    protected byte[] msgBodyBytes;

    /** Check code (described by the original author as 1 byte). */
    protected int checkSum;

    /** Channel the message was received on; kept so a reply can be written back. */
    protected Channel channel;

    @Override
    public String toString() {
        return "PackageData{"
                + "msgHeader=" + msgHeader
                + ", msgBodyBytes=" + Arrays.toString(msgBodyBytes)
                + ", checkSum=" + checkSum
                + ", channel=" + channel
                + '}';
    }

    /**
     * Header fields of a message, including the optional sub-package section.
     */
    @Data
    public static class MsgHeader {

        /** Version number. */
        protected int version;

        /** Message id. */
        protected int msgId;

        // ---- message body properties ----

        /** Raw message-body-properties field from which the values below are derived. */
        protected int msgBodyPropsField;

        /** Message body length. */
        protected int msgBodyLength;

        /** Data encryption type. */
        protected int encryptionType;

        /** Whether the message is split into sub-packages; {@code true} means the sub-package section is present. */
        protected boolean hasSubPackage;

        /** Reserved bits [14-15]. */
        protected String reservedBit;

        // ---- sub-package section ----

        /** Terminal phone number. */
        protected String terminalPhone;

        /** Flow (serial) number. */
        protected int flowId;

        /** Raw sub-package info field, bytes [12-15]. */
        protected int packageInfoField;

        /** Total number of sub-packages (16-bit word on the wire). */
        protected long totalSubPackage;

        /** 1-based index of this sub-package within the split message (16-bit word on the wire). */
        protected long subPackageSeq;

        @Override
        public String toString() {
            return "MsgHeader{"
                    + "version=" + version
                    + ",msgId=" + msgId
                    + ", msgBodyPropsField=" + msgBodyPropsField
                    + ", msgBodyLength=" + msgBodyLength
                    + ", encryptionType=" + encryptionType
                    + ", hasSubPackage=" + hasSubPackage
                    + ", reservedBit='" + reservedBit + '\''
                    + ", terminalPhone='" + terminalPhone + '\''
                    + ", flowId=" + flowId
                    + ", packageInfoField=" + packageInfoField
                    + ", totalSubPackage=" + totalSubPackage
                    + ", subPackageSeq=" + subPackageSeq
                    + '}';
        }
    }
}

1. netty服务端启动–NettyTcpServer

package com.carshow.xcb.biz.tcp.server;import ty.bootstrap.ServerBootstrap;import ty.channel.ChannelFuture;import ty.channel.ChannelOption;import ty.channel.nio.NioEventLoopGroup;import ty.channel.socket.nio.NioServerSocketChannel;import ty.util.ResourceLeakDetector;import ty.util.concurrent.EventExecutorGroup;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.beans.factory.annotation.Qualifier;import org.springframework.beans.factory.annotation.Value;import org.ponent;import javax.annotation.PostConstruct;import javax.annotation.PreDestroy;//@Componentpublic class NettyTcpServer {private Logger logger = LoggerFactory.getLogger(NettyTcpServer.class);@Value("${netty.port}")private int port;@Autowired@Qualifier("bossGroup")private NioEventLoopGroup bossGroup;@Autowired@Qualifier("workerGroup")private NioEventLoopGroup workerGroup;@Autowired@Qualifier("businessGroup")private EventExecutorGroup businessGroup;@Autowiredprivate JT808ChannelInitializer jt808ChannelInitializer;/*** @PostConstruct* 该注解被用来修饰一个非静态的void()方法。 被@PostConstruct修饰的方法会在服务器加载Servlet的时候运行,* 并且只会被服务器执行一次。PostConstruct在构造函数之后执行,init()方法之前执行*//*** 启动server*/// @PostConstruct //启动项目就执行public void start() throws InterruptedException {try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.group(bossGroup, workerGroup) // 绑定线程池.channel(NioServerSocketChannel.class).childHandler(jt808ChannelInitializer) //编码解码.option(ChannelOption.SO_BACKLOG, 1024)//服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数 服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝.childOption(ChannelOption.TCP_NODELAY, true)//立即写出.childOption(ChannelOption.SO_KEEPALIVE, true);//内存泄漏检测 开发推荐PARANOID 线上SIMPLE 保持长连接,2小时无数据激活心跳机制ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.SIMPLE);//内存泄漏检测 开发推荐PARANOID 线上SIMPLEChannelFuture channelFuture = serverBootstrap.bind(port).sync();if (channelFuture.isSuccess()) {logger.info("TCP服务启动完毕,port={}", 
this.port);}//关闭channel和块,直到它被关闭// channelFuture.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}/*** 被@PreDestroy修饰的方法会在服务器卸载Servlet的时候运行,* 并且只会被服务器调用一次,类似于Servlet的destroy()方法。*//*** 销毁资源*/@PreDestroypublic void destroy() {bossGroup.shutdownGracefully().syncUninterruptibly();workerGroup.shutdownGracefully().syncUninterruptibly();businessGroup.shutdownGracefully().syncUninterruptibly();logger.info("关闭成功");}}

2. netty服务端编码解码方式—JT808ChannelInitializer

package com.carshow.xcb.biz.tcp.server;

// NOTE(review): the TPMSConsts import path below looks truncated by text extraction
// (a package segment seems to be missing) — restore it from the real project.
import com.carshow.xcb.mon.TPMSConsts;
import com.carshow.xcb.biz.tcp.handle.TCPServerHandler;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.bytes.ByteArrayDecoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import io.netty.util.concurrent.EventExecutorGroup;
import org.apache.mina.filter.codec.ProtocolCodecFilter;
import org.apache.mina.filter.codec.textline.LineDelimiter;
import org.apache.mina.filter.codec.textline.TextLineCodecFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;

/**
 * Builds the server-side pipeline for every accepted connection:
 * read-idle detection, UTF-8 string decoding/encoding, then the shared
 * {@link TCPServerHandler} running on the business executor group.
 */
@Component
public class JT808ChannelInitializer extends ChannelInitializer<SocketChannel> {

    /** Read-idle timeout, interpreted in minutes by the {@link IdleStateHandler} below. */
    @Value("${netty.read-timeout}")
    private int readTimeOut;

    /**
     * Executor group the business handler runs on. The original author notes it is
     * sized at 50 threads; that configuration is not visible in this class.
     */
    @Autowired
    @Qualifier("businessGroup")
    private EventExecutorGroup businessGroup;

    /** Singleton handler shared by all channels. */
    @Autowired
    private TCPServerHandler tcpServerHandler;

    /**
     * Installs the handlers on a newly accepted channel.
     *
     * @param socketChannel the channel being initialised
     */
    @Override
    protected void initChannel(SocketChannel socketChannel) throws Exception {
        ChannelPipeline pipeline = socketChannel.pipeline();

        // Raise a READER_IDLE event after readTimeOut minutes without inbound data.
        pipeline.addLast(new IdleStateHandler(readTimeOut, 0, 0, TimeUnit.MINUTES));

        // Disabled by the original author: binary JT808 framing on the package delimiter.
        // Size rationale from the original note: header max 16 + body max 1023
        // + 2 delimiters + roughly 60 escape bytes = 1100.
        // pipeline.addLast(new DelimiterBasedFrameDecoder(1100,
        //         Unpooled.copiedBuffer(new byte[]{TPMSConsts.pkg_delimiter}),
        //         Unpooled.copiedBuffer(new byte[]{TPMSConsts.pkg_delimiter, TPMSConsts.pkg_delimiter})));
        // Also disabled: newline framing.
        // pipeline.addLast(new DelimiterBasedFrameDecoder(1024, Unpooled.copiedBuffer("\n".getBytes())));

        // NOTE(review): no frame decoder precedes StringDecoder, so one read may carry
        // a partial message or several messages — confirm whether framing is needed.
        pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));

        // The handler touches the database, so it runs on businessGroup rather than the I/O threads.
        pipeline.addLast(businessGroup, tcpServerHandler);
    }
}

3. netty服务端handler—TCPServerHandler

package com.carshow.xcb.biz.tcp.handle;

import com.carshow.xcb.biz.tcp.codec.MsgDecoder;
// NOTE(review): the CommonConstant / TPMSConsts / com.mon.* import paths below look
// truncated by text extraction — restore them from the real project.
import com.carshow.xcb.monConstant;
import com.carshow.xcb.mon.TPMSConsts;
import com.carshow.xcb.biz.tcp.server.SessionManager;
import com.carshow.xcb.biz.tcp.vo.PackageData;
import com.carshow.xcb.biz.tcp.vo.Session;
import com.carshow.xcb.biz.tcp.vo.req.LocationInfoUploadMsg;
import com.carshow.xcb.biz.tcp.vo.req.TerminalAuthenticationMsg;
import com.carshow.xcb.biz.tcp.vo.req.TerminalRegisterMsg;
import com.mon.utils.Jt808.HexStringUtils;
import com.mon.utils.Jt808.JT808ProtocolUtils;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.*;
import io.netty.handler.timeout.IdleState;
import io.netty.handler.timeout.IdleStateEvent;
import io.netty.util.ReferenceCountUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.math.BigDecimal;
import java.util.Date;
import java.util.List;

/**
 * Server-side business handler for terminal messages.
 *
 * <p>The handler is a Spring singleton that is added to every channel's pipeline,
 * so it must be marked {@code @Sharable}; without it Netty rejects adding the same
 * instance to a second pipeline.
 */
@Component
@ChannelHandler.Sharable
public class TCPServerHandler extends ChannelInboundHandlerAdapter {

    private final Logger logger = LoggerFactory.getLogger(TCPServerHandler.class);

    private final SessionManager sessionManager;
    private final MsgDecoder decoder;
    private TerminalMsgProcessService msgProcessService;
    private JT808ProtocolUtils protocolUtils = new JT808ProtocolUtils();

    public TCPServerHandler() {
        this.sessionManager = SessionManager.getInstance();
        this.decoder = new MsgDecoder();
        this.msgProcessService = new TerminalMsgProcessService();
    }

    /**
     * Decodes one inbound message and dispatches it by message id.
     *
     * <p>The terminal sends the frame as a hex string (a private convention noted by
     * the original author), so the text is stripped of CR/LF, converted to bytes,
     * un-escaped and parsed into a {@link PackageData}. Any failure is logged and
     * swallowed so one bad message does not tear down the channel.
     *
     * @param ctx handler context of the receiving channel
     * @param msg the decoded inbound object; its {@code toString()} is used as the hex text
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        try {
            String str = msg.toString().replaceAll("\r|\n", "");
            logger.info("msg:{}", str);

            byte[] bs = HexStringUtils.hexStringToByte(str);
            bs = this.protocolUtils.doEscape4Receive(bs, 0, bs.length);

            PackageData pkg = this.decoder.bytes2PackageData(bs);
            // keep the channel so a response can be written back to the device
            pkg.setChannel(ctx.channel());
            this.processPackageData(pkg, ctx);
        } catch (Exception e) {
            logger.error("", e);
        } finally {
            release(msg);
        }
    }

    /**
     * Routes a parsed package to the handler for its message id; unknown ids are logged.
     *
     * @param packageData parsed message
     * @param ctx         handler context of the receiving channel
     */
    private void processPackageData(PackageData packageData, ChannelHandlerContext ctx) {
        final PackageData.MsgHeader header = packageData.getMsgHeader();
        final int msgId = header.getMsgId();

        if (TPMSConsts.msg_id_terminal_heart_beat == msgId) {
            handleHeartBeat(packageData, header, ctx);
        } else if (TPMSConsts.msg_id_terminal_authentication == msgId) {
            handleAuthentication(packageData, header);
        } else if (TPMSConsts.msg_id_terminal_register == msgId) {
            handleRegister(packageData, header);
        } else if (TPMSConsts.msg_id_terminal_log_out == msgId) {
            handleLogout(packageData, header);
        } else if (TPMSConsts.msg_id_terminal_location_info_upload == msgId) {
            handleLocationUpload(packageData, header);
        } else {
            logger.error(">>>>>>[未知消息类型],phone={},msgId={},package={}", header.getTerminalPhone(),
                    header.getMsgId(), packageData);
        }
    }

    /** Terminal heartbeat (empty body): records a first-seen phone, then delegates to the service. */
    private void handleHeartBeat(PackageData packageData, PackageData.MsgHeader header, ChannelHandlerContext ctx) {
        logger.info(">>>>>[终端心跳],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
        // NOTE(review): an existing entry is never refreshed, so a terminal that reconnects
        // keeps its old channel id in VEHICLE_MAP — confirm whether that is intended.
        if (!CommonConstant.VEHICLE_MAP.containsKey(header.getTerminalPhone())) {
            logger.info("当前有新连接接入:{}", header.getTerminalPhone());
            CommonConstant.VEHICLE_MAP.put(header.getTerminalPhone(), ctx.channel().id());
            logger.info("当前连接总数:{}", CommonConstant.VEHICLE_MAP.size());
        }
        try {
            this.msgProcessService.processTerminalHeartBeatMsg(packageData);
        } catch (Exception e) {
            // trailing Throwable argument makes SLF4J log the stack trace as well
            logger.error("<<<<<[终端心跳]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(),
                    header.getFlowId(), e.getMessage(), e);
        }
    }

    /** Terminal authentication: wraps the package and delegates to the service. */
    private void handleAuthentication(PackageData packageData, PackageData.MsgHeader header) {
        logger.info(">>>>>[终端鉴权],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
        try {
            TerminalAuthenticationMsg authenticationMsg = new TerminalAuthenticationMsg(packageData);
            this.msgProcessService.processAuthMsg(authenticationMsg);
        } catch (Exception e) {
            logger.error("<<<<<[终端鉴权]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(),
                    header.getFlowId(), e.getMessage(), e);
        }
    }

    /** Terminal registration: decodes the register message and delegates to the service. */
    private void handleRegister(PackageData packageData, PackageData.MsgHeader header) {
        logger.info(">>>>>[终端注册],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
        try {
            TerminalRegisterMsg msg = this.decoder.toTerminalRegisterMsg(packageData);
            this.msgProcessService.processRegisterMsg(msg);
            logger.info("<<<<<[终端注册],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
        } catch (Exception e) {
            logger.error("<<<<<[终端注册]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(),
                    header.getFlowId(), e.getMessage(), e);
        }
    }

    /** Terminal logout (empty body): delegates to the service. */
    private void handleLogout(PackageData packageData, PackageData.MsgHeader header) {
        try {
            this.msgProcessService.processTerminalLogoutMsg(packageData);
            logger.info("<<<<<[终端注销],phone={},flowid={}", header.getTerminalPhone(), header.getFlowId());
        } catch (Exception e) {
            logger.error("<<<<<[终端注销]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(),
                    header.getFlowId(), e.getMessage(), e);
        }
    }

    /** Location report: decodes the location message and delegates to the service. */
    private void handleLocationUpload(PackageData packageData, PackageData.MsgHeader header) {
        try {
            LocationInfoUploadMsg locationInfoUploadMsg = this.decoder.toLocationInfoUploadMsg(packageData);
            this.msgProcessService.processLocationInfoUploadMsg(locationInfoUploadMsg);
            logger.info("拿到了-位置信息汇报");
        } catch (Exception e) {
            logger.error("<<<<<[位置信息]处理错误,phone={},flowid={},err={}", header.getTerminalPhone(),
                    header.getFlowId(), e.getMessage(), e);
        }
    }

    /** Registers a session for the newly connected channel. */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        Session session = Session.buildSession(ctx.channel());
        sessionManager.put(session.getId(), session);
        logger.debug("终端连接:{}", session);
    }

    /** Removes the session of the disconnected channel and closes the channel. */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        final String sessionId = ctx.channel().id().asLongText();
        this.sessionManager.removeBySessionId(sessionId);
        logger.debug("终端断开连接:{}", sessionId);
        ctx.channel().close();
    }

    /**
     * Closes the connection when the read-idle timeout fires; any other user
     * event is passed on down the pipeline.
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        if (evt instanceof IdleStateEvent) {
            IdleStateEvent event = (IdleStateEvent) evt;
            if (event.state() == IdleState.READER_IDLE) {
                Session session = this.sessionManager.removeBySessionId(Session.buildId(ctx.channel()));
                logger.error("服务器主动断开连接:{}", session);
                ctx.close();
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    /** Logs a pipeline exception together with its stack trace; the channel is left open. */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        logger.error("发生异常:{}", cause.getMessage(), cause);
    }

    /** Releases {@code msg} if it is reference counted; a failure to release is logged, not thrown. */
    private void release(Object msg) {
        try {
            ReferenceCountUtil.release(msg);
        } catch (Exception e) {
            logger.warn("release failed", e);
        }
    }
}

服务端的实现暂时到此。文中用到的工具类暂未贴出;由于终端是与自己的平台对接,具体的规则和协议可以自行定义。

客户端实现------客户端传输消息解析JT808消息并收到应答消息

1.客户端启动-------NettyClient

package com.carshow.xcb.biz.tcp.client;

import com.carshow.xcb.biz.tcp.handle.NettyClientHandler;
// NOTE(review): the com.mon.* import path below looks truncated by text extraction
// — restore it from the real project.
import com.mon.utils.DateTimeUtils;
import com.sun.corba.se.impl.protocol.giopmsgheaders.MessageBase;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.apache.http.client.utils.DateUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

import javax.annotation.PostConstruct;
import java.util.concurrent.TimeUnit;

/**
 * Netty client that connects to the configured host/port and retries every
 * 10 seconds while the connection attempt fails.
 *
 * @Author xw
 * @Date /3/31 15:26
 */
@Service(value = "nettyClient")
@Slf4j
public class NettyClient {

    // The original also created an unused per-instance NioEventLoopGroup ("group");
    // it was never referenced or shut down, so it has been removed.

    @Value("${netty.client.port}")
    private int port;

    @Value("${netty.client.host}")
    private String host;

    private SocketChannel socketChannel;

    /** Event loop group used for the initial connection. */
    private static EventLoopGroup eventLoopGroup = new NioEventLoopGroup();

    /** True until the start-up message has been logged once. */
    private boolean initFlag = true;

    @Autowired
    private NettyClientChannelInitializer nettyClientInitializer;

    /**
     * Starts the client on the shared event loop group.
     *
     * <p>Blocks the calling thread until the channel created by this first attempt
     * is closed (see {@link #doConnect}). {@code @PostConstruct} is left commented
     * out, as in the original.
     */
    // @PostConstruct
    public void run() {
        doConnect(new Bootstrap(), eventLoopGroup);
    }

    /**
     * Configures {@code bootstrap} and starts a connection attempt; a failed attempt
     * schedules another one 10 seconds later on the channel's event loop.
     *
     * <p>When called from an ordinary thread the method blocks until the channel
     * closes, as before. When called from an event loop thread (the scheduled retry)
     * it returns right after starting the attempt: Netty forbids blocking waits on
     * the event loop and would throw {@code BlockingOperationException}, which the
     * original code caught and misreported as a connection failure.
     *
     * @param bootstrap      bootstrap to configure; a {@code null} value makes the call a no-op
     * @param eventLoopGroup group (or single event loop) the channel is registered with
     */
    public void doConnect(Bootstrap bootstrap, EventLoopGroup eventLoopGroup) {
        if (bootstrap == null) {
            return;
        }
        try {
            bootstrap.group(eventLoopGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .handler(nettyClientInitializer)
                    .remoteAddress(host, port);

            ChannelFuture channelFuture = bootstrap.connect().addListener((ChannelFuture futureListener) -> {
                final EventLoop eventLoop = futureListener.channel().eventLoop();
                if (!futureListener.isSuccess()) {
                    log.warn("与服务器断开连接!10s后准备尝试重连");
                    eventLoop.schedule(() -> doConnect(new Bootstrap(), eventLoop), 10, TimeUnit.SECONDS);
                } else {
                    log.info("连接Netty服务端成功");
                }
            });

            if (initFlag) {
                // Logged once, right after the first attempt is started (not after it succeeds).
                log.info("netty客户端启动成功");
                initFlag = false;
            }

            boolean onEventLoop = eventLoopGroup instanceof EventLoop
                    && ((EventLoop) eventLoopGroup).inEventLoop();
            if (!onEventLoop) {
                channelFuture.channel().closeFuture().sync();
            }
        } catch (InterruptedException e) {
            // Preserve the interrupt for whoever owns this thread.
            Thread.currentThread().interrupt();
            log.error("客户端连接失败!{}", e.getMessage(), e);
        } catch (Exception e) {
            log.error("客户端连接失败!{}", e.getMessage(), e);
        }
    }
}

2.客户端初始化 设置出站编码器和入站解码器-------NettyClientChannelInitializer

package com.carshow.xcb.biz.tcp.client;

// NOTE(review): the TPMSConsts import path below looks truncated by text extraction
// — restore it from the real project.
import com.carshow.xcb.mon.TPMSConsts;
import com.carshow.xcb.biz.tcp.handle.NettyClientHandler;
import com.carshow.xcb.biz.tcp.handle.TCPServerHandler;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.DelimiterBasedFrameDecoder;
import io.netty.handler.codec.bytes.ByteArrayEncoder;
import io.netty.handler.codec.protobuf.ProtobufDecoder;
import io.netty.handler.codec.protobuf.ProtobufEncoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32FrameDecoder;
import io.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldPrepender;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.handler.timeout.IdleStateHandler;
import io.netty.util.CharsetUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.nio.charset.Charset;
import java.util.concurrent.TimeUnit;

/**
 * Client-side pipeline initialiser, invoked for each channel the client creates.
 * Installs write-idle detection, the business handler and the codecs.
 *
 * @Author xw
 * @Date /3/31 15:41
 */
@Component
public class NettyClientChannelInitializer extends ChannelInitializer<SocketChannel> {

    @Autowired
    private NettyClientHandler nettyClientHandler;

    /**
     * Adds the client handlers to the channel's pipeline.
     *
     * @param channel the channel being initialised
     */
    @Override
    protected void initChannel(SocketChannel channel) throws Exception {
        ChannelPipeline pipeline = channel.pipeline();

        // Raise a WRITER_IDLE event after 5 seconds without an outbound write.
        pipeline.addLast(new IdleStateHandler(0, 5, 0, TimeUnit.SECONDS));

        // NOTE(review): inbound data travels head -> tail, so nettyClientHandler sees the
        // raw bytes BEFORE the frame decoder and StringDecoder registered after it; since
        // the handler does not forward reads, those two decoders receive no inbound data.
        // The order is kept as in the original because the handler casts to ByteBuf and its
        // parser may expect the delimiter bytes — confirm before reordering.
        pipeline.addLast("nettyClientHandler", nettyClientHandler);
        pipeline.addLast(new DelimiterBasedFrameDecoder(1100,
                Unpooled.copiedBuffer(new byte[]{TPMSConsts.pkg_delimiter}),
                Unpooled.copiedBuffer(new byte[]{TPMSConsts.pkg_delimiter, TPMSConsts.pkg_delimiter})));
        pipeline.addLast("decoder", new StringDecoder(CharsetUtil.UTF_8));
        pipeline.addLast("encoder", new StringEncoder(CharsetUtil.UTF_8));
    }
}

3.客户端处理类-------NettyClientHandler

package com.carshow.xcb.biz.tcp.handle;import com.carshow.xcb.biz.service.ITcpLogService;import com.carshow.xcb.biz.tcp.client.ClientSendMessage;import com.carshow.xcb.biz.tyClient;import com.carshow.xcb.biz.tcp.codec.MsgDecoder;import com.carshow.xcb.monConstant;import com.carshow.xcb.mon.TPMSConsts;import com.carshow.xcb.biz.tcp.server.SessionManager;import com.carshow.xcb.biz.tcp.vo.PackageData;import com.carshow.xcb.biz.tcp.vo.Session;import com.carshow.xcb.biz.tcp.vo.req.LocationInfoUploadMsg;import com.carshow.xcb.biz.tcp.vo.req.TerminalAuthenticationMsg;import com.carshow.xcb.biz.tcp.vo.req.TerminalRegisterMsg;import com.mon.constants.tcp.ParamContent;import com.mon.constants.tcp.PlatformMessageParse;import com.mon.constants.tcp.PlatformReciveMessage;import com.mon.model.TCountryPlatormTcpLog;import com.mon.utils.ApiUtil;import com.mon.utils.DateTimeUtils;import com.mon.utils.Jt808.HexStringUtils;import com.mon.utils.Jt808.JT808ProtocolUtils;import com.mon.utils.RedisUtil;import ty.bootstrap.Bootstrap;import ty.buffer.ByteBuf;import ty.buffer.Unpooled;import ty.channel.*;import ty.handler.timeout.IdleState;import ty.handler.timeout.IdleStateEvent;import ty.util.ReferenceCountUtil;import lombok.extern.slf4j.Slf4j;import org.apache.mina.core.buffer.IoBuffer;import org.slf4j.Logger;import org.slf4j.LoggerFactory;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.stereotype.Service;import javax.annotation.Resource;import static ty.util.ReferenceCountUtil.release;/*** @Author xw* @Description 客户端处理类* @Date /3/31 15:44*/@Slf4j@Service(value = "nettyClientHandler")@ChannelHandler.Sharablepublic class NettyClientHandler extends ChannelInboundHandlerAdapter {@Autowiredprivate NettyClient nettyClient;@Autowiredprivate RedisUtil redisUtil;private final SessionManager sessionManager;private final MsgDecoder decoder=new MsgDecoder();private TerminalMsgProcessService msgProcessService;private Logger logger = 
LoggerFactory.getLogger(TCPServerHandler.class);private JT808ProtocolUtils protocolUtils = new JT808ProtocolUtils();@Resourceprivate ITcpLogService countryPlatormTcpLogService;public NettyClientHandler() {this.sessionManager = SessionManager.getInstance();}/*** 处理业务逻辑*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {logger.info("==========服务端给我发消息了,好激动呀==============="+msg);try {ByteBuf buf = (ByteBuf) msg;if (buf.readableBytes() <= 0) {//ReferenceCountUtil.safeRelease(msg);return;}byte[] bs = new byte[buf.readableBytes()];buf.readBytes(bs);String str=HexStringUtils.toHexString(bs);logger.info("客户端接收到信息:" + str);bs = this.protocolUtils.doEscape4Receive(bs, 0, bs.length);// 字节数据转换为针对于808消息结构的实体类//PackageData pkg = this.decoder.bytes2PackageData(bs);PlatformReciveMessage pkg = PlatformMessageParse.parsMessage(bs);// 引用channel,以便回送数据给硬件pkg.setChannel(ctx.channel());this.processPackageData(pkg, ctx);} catch (Exception e) {logger.error("", e);} finally {release(msg);}}/*** 处理业务逻辑**/private void processPackageData(PlatformReciveMessage m, ChannelHandlerContext ctx) {//平台登录应答if(m.getMessageId() == 33264){int rep = m.getResult();if(rep == 0){System.out.println("登录平台成功");countryPlatormTcpLogService.save( new TCountryPlatormTcpLog(ctx.channel().remoteAddress().toString(),"平台登陆应答","消息id:"+ParamContent.getIntHexStr(m.getMessageId(),4)+" 消息序列号:"+m.getSerialNo()+" 结果:成功","",1));}else if(rep == 1){System.out.println("IP地址不正确");countryPlatormTcpLogService.save( new TCountryPlatormTcpLog(ctx.channel().remoteAddress().toString(),"平台登陆应答","消息id:"+ParamContent.getIntHexStr(m.getMessageId(),4)+" 消息序列号:"+m.getSerialNo()+" 结果:IP地址不正确","",1));}else if(rep == 2){System.out.println("接入码不正确");countryPlatormTcpLogService.save( new TCountryPlatormTcpLog(ctx.channel().remoteAddress().toString(),"平台登陆应答","消息id:"+ParamContent.getIntHexStr(m.getMessageId(),4)+" 消息序列号:"+m.getSerialNo()+" 结果:接入码不正确","",1));}else if(rep == 
3){System.out.println("该平台没有注册");countryPlatormTcpLogService.save( new TCountryPlatormTcpLog(ctx.channel().remoteAddress().toString(),"平台登陆应答","消息id:"+ParamContent.getIntHexStr(m.getMessageId(),4)+" 消息序列号:"+m.getSerialNo()+" 结果:该平台没有注册","",1));}else if(rep == 4){System.out.println("密码错误");countryPlatormTcpLogService.save( new TCountryPlatormTcpLog(ctx.channel().remoteAddress().toString(),"平台登陆应答","消息id:"+ParamContent.getIntHexStr(m.getMessageId(),4)+" 消息序列号:"+m.getSerialNo()+" 结果:密码错误","",1));}else if(rep == 5){System.out.println("资源紧张,稍后再连");countryPlatormTcpLogService.save( new TCountryPlatormTcpLog(ctx.channel().remoteAddress().toString(),"平台登陆应答","消息id:"+ParamContent.getIntHexStr(m.getMessageId(),4)+" 消息序列号:"+m.getSerialNo()+" 结果:资源紧张,稍后再连","",1));}else if(rep == 9){System.out.println("其他");countryPlatormTcpLogService.save( new TCountryPlatormTcpLog(ctx.channel().remoteAddress().toString(),"平台登陆应答","消息id:"+ParamContent.getIntHexStr(m.getMessageId(),4)+" 消息序列号:"+m.getSerialNo()+" 结果:其他","",1));}}//平台通用应答if(m.getMessageId() == 32769){StringBuffer sb = new StringBuffer();System.out.println("应答流水号:"+m.getReciveSerNo());sb.append("应答流水号:"+m.getReciveSerNo()+"\n");System.out.println("应答消息id:"+ ParamContent.getIntHexStr(m.getReciveMsgId(), 4));sb.append("应答消息id:"+ ParamContent.getIntHexStr(m.getReciveMsgId(), 4)+"\n");if(m.getResult() == 0){System.out.println("结果:成功");sb.append("结果:成功\n");if (!"0x0002".equals(ParamContent.getIntHexStr(m.getReciveMsgId(),4))) {countryPlatormTcpLogService.save(new TCountryPlatormTcpLog(ctx.channel().remoteAddress().toString(),"平台通用应答","应答消息id:" + ParamContent.getIntHexStr(m.getReciveMsgId(), 4) + " 应答消息序列号:" + m.getReciveSerNo() + " 应答结果:成功","",1));}}if(m.getResult() == 1){System.out.println("结果:失败");sb.append("结果:失败\n");countryPlatormTcpLogService.save( new TCountryPlatormTcpLog(ctx.channel().remoteAddress().toString(),"平台通用应答","应答消息id:"+ParamContent.getIntHexStr(m.getReciveMsgId(),4)+" 应答消息序列号:"+m.getReciveSerNo()+" 
应答结果:失败","",1));}if(m.getResult() == 2){System.out.println("结果:消息有误");sb.append("结果:消息有误\n");countryPlatormTcpLogService.save( new TCountryPlatormTcpLog(ctx.channel().remoteAddress().toString(),"平台通用应答","应答消息id:"+ParamContent.getIntHexStr(m.getReciveMsgId(),4)+" 应答消息序列号:"+m.getReciveSerNo()+" 应答结果:消息有误","",1));}if(m.getResult() == 3){System.out.println("结果:不支持");sb.append("结果:不支持\n");countryPlatormTcpLogService.save( new TCountryPlatormTcpLog(ctx.channel().remoteAddress().toString(),"平台通用应答","应答消息id:"+ParamContent.getIntHexStr(m.getReciveMsgId(),4)+" 应答消息序列号:"+m.getReciveSerNo()+" 应答结果:不支持","",1));}}}/*** 建立连接*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {logger.info("建立连接时:" + DateTimeUtils.getNowDateString1());Session session = Session.buildSession(ctx.channel());sessionManager.put(ApiUtil.SIM, session);ctx.fireChannelActive();}/*** 关闭连接*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {logger.info("关闭连接时:" + DateTimeUtils.getNowDateString1());// final String sessionId = ctx.channel().id().asLongText();this.sessionManager.removeBySessionId(ApiUtil.SIM);// final EventLoop eventLoop = ctx.channel().eventLoop();// nettyClient.doConnect(new Bootstrap(), eventLoop);// super.channelInactive(ctx);}/*** 心跳处理,每秒发送一次心跳请求*/@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object obj) throws Exception {if (obj instanceof IdleStateEvent) {IdleStateEvent idleStateEvent = (IdleStateEvent) obj;if (idleStateEvent.state() == IdleState.WRITER_IDLE) {log.info("已经5s没有发送消息给服务端");//向服务端送心跳包//发送心跳消息,并在发送失败时关闭该连接ctx.channel().writeAndFlush(Unpooled.copiedBuffer(ClientSendMessage.heartbeatMessage())).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);}} else {super.userEventTriggered(ctx, obj);}}}

接下来就是需要发送的消息,ok

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。