使用netty不是一天两天了,但是使用netty和DTU通讯还是第一次,接下来要做DTU的通讯协议,对接工作还没有正式开始,只收到一个简单的DTU协议文档,里面的内容大概是下面表格中的样子。
位数
内容
取值
0
字头
0XCC
1
数据长度
低八位
2
数据长度
高八位
3
类型
0:心跳,1:登录
4
机器编码
0X00-0XFF
5
校验位
前面字节相加的低八位
这里把通讯协议简化了一下,仅剩下心跳和登录,如果有其他参数可以在此基础上进行扩展;从表格中可以看出来,每个字段数据位数还不一样,有的一位,有的两位,数据长度占两位,其他各占用一位。
只有这么一个文档,只有这么一丁点信息,其他就什么也不知道了,这可如何是好?不知道从哪里下手,这个疑问多多的项目就这样放了一段时间。
但是也不能总是这样放着呀,如果对接的人来了,我这边什么也没有,一下子也建不起一个项目呀?转念又想,曾经使用 **netty + google protobuf ** 开发过IM项目,也有些相似之处。这个DTU可否使用google protobuf呢?
于是,写了一个简单的客户端,一个服务端,来进行收发信息,google protobuf是通过对象编码成二进制进行数据通讯的,但文档中是字节数组,压根没有对象一说呀?写完了demo,测试一遍,但是和字节数组对应不起来,最后还是删掉了。
在网上找了很多资料,找来找去只找到这么两篇Java采用Netty实现基于DTU的TCP服务器 + 多端口 + 多协议
Java 使用Socket 实现基于DTU的TCP服务器 + 数据解析 + 心跳检测可以参考,试着把上面的demo扒拉了好几遍,通过这两篇文章,可以获得一些信息:
第一,可以使用netty和dtu进行通信,选择使用netty框架没有错;
第二,和dtu对接,接收到的是字节数组,不能使用google的protobuf框架,需要另做处理。
把参考文档中带netty的demo也试着在本地拷贝了一份,大概知道了对接收到的字节数组如何处理,但是demo中只有接收,没有发送,这是不够完美的;这也是个问题,单方面的,不好运行呀。
另外在处理字节数组的时候,在流程上还不是太标准,后面有可能会遇到半包、粘包的问题,这些都是需要面对的问题。结合自己曾经开发过IM的经验,把编解码处理和数据处理也分离出来,半包、粘包的问题一并考过进去,这样后面再完善就方便多了。
下面就开始SpringBoot2.1.4 + netty + DTU的客户端编码,难点就在于字节数组的解码和编码,一进一出,搞定了这一步,其他的业务逻辑就好处理了。这里的客户端编码是为了测试DTU的请求,以便和服务端互动起来,可以进行测试。
第一步,pom文件引入netty架包,就这两个架包足够用的了;
org.springframework.boot
spring-boot-starter
ty
netty-all
第二步,对字节数组的编解码,包括半包、粘包处理;
字节数据解码类ByteArrayDecoder:
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.example.instant.ProtoInstant;
import com.example.util.CharacterConvert;
import ty.buffer.ByteBuf;
import ty.channel.ChannelHandlerContext;
import ty.handler.codec.ByteToMessageDecoder;
/**
* byte 1字节 (8位) -27~27-1 0 Byte 255
* short 2字节 (16位) -215~215-1 0 Short
* int 4字节 (32位) -231~ 231-1 0 Integer
* long 8字节 (64位) -263~263-1 0 Long
* char 2字节 (C语言中是1字节)可以存储一个汉字
* float 4字节 (32位) -3.4e+38 ~ 3.4e+38 0.0f Float
* double 8字节 (64位) -1.7e+308 ~ 1.7e+308 0 Double
* char 2字节(16位) u0000~uFFFF(‘’~‘?’) ‘0’ Character (0~216-1(65535))
* 布尔 boolean 1/8字节(1位) true, false FALSE Boolean
* C语言中,short、int、float、long、double,分别为:1个、2个、4个、8个、16个
* 对字节数组进行解码
* @author 程就人生
* @date 8月3日
* @Description
*
*/
public class ByteArrayDecoder extends ByteToMessageDecoder{
private static Logger log = LoggerFactory.getLogger(ByteArrayDecoder.class);
@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List out) throws Exception {
// 标记一下当前的readIndex的位置
in.markReaderIndex();
//判断获取到的数据是否够字头,不沟通字头继续往下读
//字头:1位,数据串总长度:2位
if(in.readableBytes() < ProtoInstant.FILED_LEN){
log.info("不够包头,继续读!");
return;
}
//读取字头1位
int fieldHead = CharacterConvert.byteToInt(in.readByte());
if(fieldHead != ProtoInstant.FIELD_HEAD){
String error = "字头不对:" + ctx.channel().remoteAddress();
log.info(error);
ctx.close();
return;
}
//长度2位,读取传送过来的消息的长度。
int length = CharacterConvert.shortToInt(in.readShort());
// 长度如果小于0
if (length < 0) {// 非法数据,关闭连接
log.info("数据长度为0,非法数据,关闭连接!");
ctx.close();
return;
}
// 读到的消息体长度如果小于传送过来的消息长度,减去字头1位,数据长度2位
int dataLength = length - ProtoInstant.FILED_LEN;
if (dataLength > in.readableBytes()) {
// 重置读取位置
in.resetReaderIndex();
return;
}
byte[] array;
if (in.hasArray()) {
log.info("堆缓冲");
// 堆缓冲
ByteBuf slice = in.slice();
array = slice.array();
} else {
log.info("直接缓冲");
// 直接缓冲
array = new byte[dataLength];
in.readBytes(array, 0, dataLength);
}
if(array.length > 0){
in.retain();
out.add(array);
}
}
}
字节数组编码类ByteArrayEncoder:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.example.instant.ProtoInstant;
import ty.buffer.ByteBuf;
import ty.channel.ChannelHandlerContext;
import ty.handler.codec.MessageToByteEncoder;
/**
* 对字节数组进行编码
* @author 程就人生
* @date 8月3日
* @Description
*
*/
public class ByteArrayEncoder extends MessageToByteEncoder{
private static Logger log = LoggerFactory.getLogger(ByteArrayEncoder.class);
@Override
protected void encode(ChannelHandlerContext ctx, byte[] msg, ByteBuf out) throws Exception {
log.info(".....经过ByteArrayEncoder编码.....");
//字头(1位)
out.writeByte(ProtoInstant.FIELD_HEAD);
//数据长度(2位),字头1位+数据长度2位+数据位(包含校验1位)
out.writeShort(ProtoInstant.FILED_LEN + msg.length);
//消息体,包含我们要发送的数据
out.writeBytes(msg);
}
}
在编解码的时候,考虑到DTU那边对接的有可能是C语音,C语言和Java的数据类型不一样,所占用的位数也不一样,这个需要保持一致。
第三步,客户端启动类;
客户端启动类NettyClient,这里在头部加了@Component,只要项目一启动就去建立与服务端的链接,建立链接后登录,登录后保持心跳;
import java.util.Date;
import java.util.concurrent.TimeUnit;
import javax.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.ponent;
import com.example.im.codec.ByteArrayDecoder;
import com.example.im.codec.ByteArrayEncoder;
import com.example.im.handler.ExceptionHandler;
import com.example.im.handler.LoginResponseHandler;
import com.example.instant.ProtoInstant;
import com.example.util.CharacterConvert;
import ty.bootstrap.Bootstrap;
import ty.buffer.ByteBuf;
import ty.buffer.PooledByteBufAllocator;
import ty.buffer.Unpooled;
import ty.channel.Channel;
import ty.channel.ChannelFuture;
import ty.channel.ChannelInitializer;
import ty.channel.ChannelOption;
import ty.channel.EventLoop;
import ty.channel.EventLoopGroup;
import ty.channel.nio.NioEventLoopGroup;
import ty.channel.socket.SocketChannel;
import ty.channel.socket.nio.NioSocketChannel;
import ty.util.concurrent.Future;
import ty.util.concurrent.GenericFutureListener;
/**
* netty客户端连接类
* @author 程就人生
* @date 8月6日
* @Description
*
*/
@Component
public class NettyClient {
private static Logger log = LoggerFactory.getLogger(NettyClient.class);
// 服务器ip地址
@Value("${munication.host}")
private String host;
// 服务器端口
@Value("${munication.port}")
private int port;
private Channel channel;
@Autowired
private LoginResponseHandler loginResponseHandler;
@Autowired
private ExceptionHandler exceptionHandler;
private Bootstrap bootstrap;
private EventLoopGroup eventLoopGroup = new NioEventLoopGroup();
@PostConstruct
public void start() throws Exception {
//启动客户端
doConnect();
}
/**
* 连接操作
*/
private void doConnect() {
try {
bootstrap = new Bootstrap();
bootstrap.group(eventLoopGroup);
bootstrap.channel(NioSocketChannel.class);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
bootstrap.remoteAddress(host, port);
// 设置通道初始化
bootstrap.handler(new ChannelInitializer() {
public void initChannel(SocketChannel ch) {
//编解码处理
ch.pipeline().addLast("decoder", new ByteArrayDecoder());
ch.pipeline().addLast("encoder", new ByteArrayEncoder());
//登录返回处理
ch.pipeline().addLast("loginHandler", loginResponseHandler);
//异常处理
ch.pipeline().addLast("exception", exceptionHandler);
}
}
);
log.info("客户端开始连接");
ChannelFuture f = bootstrap.connect();
f.addListener(connectedListener);
} catch (Exception e) {
e.printStackTrace();
log.info("客户端连接失败!" + e.getMessage());
}
}
//连接关闭监听
GenericFutureListener closeListener = (ChannelFuture f) -> {
log.info(new Date() + ": 连接已经断开……");
channel = f.channel();
};
//连接监听
GenericFutureListener connectedListener = (ChannelFuture f) -> {
final EventLoop eventLoop = f.channel().eventLoop();
if (!f.isSuccess()) {
log.info("连接失败!在10s之后准备尝试重连!");
eventLoop.schedule(() -> doConnect(), 10, TimeUnit.SECONDS);
} else {
log.info("服务器 连接成功!" + f.channel().remoteAddress() + ":" + f.channel().localAddress());
channel = f.channel();
login();
}
};
/**
* 登录操作
*/
private void login(){
//构建登录请求
ByteBuf buf = Unpooled.buffer(3);
//登录
buf.writeByte(ProtoInstant.LOGIN);
buf.writeByte(ProtoInstant.DEVICE_ID);
//校验位
int sum = CharacterConvert.sum(ProtoInstant.FIELD_HEAD,6,ProtoInstant.LOGIN,ProtoInstant.DEVICE_ID);
int verify = CharacterConvert.getLow8(sum);
buf.writeByte(verify);
writeAndFlush(buf.array());
}
/**
* 发送消息
* @param msg
*/
public void writeAndFlush(Object msg){
this.channel.writeAndFlush(msg).addListener(new GenericFutureListener>() {
@Override
public void operationComplete(Future super Void> future)
throws Exception {
// 回调
if (future.isSuccess()) {
log.info("请求netty服务器,消息发送成功!");
} else {
log.info("请求netty服务器,消息发送失败!");
}
}
});
}
/**
* 重新建立连接
* @throws Exception
*/
public void reconnect() throws Exception {
if (channel != null && channel.isActive()) {
return;
}
log.info("reconnect....");
start();
log.info("reconnect success");
}
/**
* 关闭连接
*/
public void close() {
eventLoopGroup.shutdownGracefully();
}
}
别忘了application.properties文件中的配置:
munication.host=127.0.0.1
munication.port=8500
第四步,handler处理类;
异常处理类ExceptionHandler:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import com.tyClient;
import ty.channel.ChannelHandler;
import ty.channel.ChannelHandlerContext;
import ty.channel.ChannelInboundHandlerAdapter;
/**
* 客户端异常处理handler
* @author 程就人生
* @date 8月3日
* @Description
*
*/
@ChannelHandler.Sharable
@Service("exceptionHandler")
public class ExceptionHandler extends ChannelInboundHandlerAdapter {
private static Logger log = LoggerFactory.getLogger(ExceptionHandler.class);
@Autowired
private NettyClient nettyClient;
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
if (cause instanceof Exception) {
//捕捉异常信息
cause.printStackTrace();
log.error(cause.getMessage());
ctx.close();
} else {
//捕捉异常信息
cause.printStackTrace();
log.error(cause.getMessage());
ctx.close();
}
//出现异常时,定时重连;比如上位机服务器重启服务器
nettyClient.reconnect();
}
/**
* 通道 Read 读取 Complete 完成
* 做刷新操作 ctx.flush()
*/
public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
ctx.flush();
}
}
登录处理LoginResponseHandler:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import com.example.instant.ProtoInstant;
import com.example.util.CharacterConvert;
import ty.buffer.ByteBuf;
import ty.buffer.Unpooled;
import ty.channel.ChannelHandler;
import ty.channel.ChannelHandlerContext;
import ty.channel.ChannelInboundHandlerAdapter;
import ty.channel.ChannelPipeline;
/**
* netty客户端登录处理
* @author 程就人生
* @date 8月3日
* @Description
*
*/
@Service("loginResponseHandler")
@ChannelHandler.Sharable
public class LoginResponseHandler extends ChannelInboundHandlerAdapter {
private static Logger log = LoggerFactory.getLogger(LoginResponseHandler.class);
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (null == msg || !(msg instanceof byte[])) {
super.channelRead(ctx, msg);
return;
}
//对接收到的数据进行处理
byte[] data = (byte[]) msg;
int dataLength = data.length;
ByteBuf buf = Unpooled.buffer(dataLength);
buf.writeBytes(data);
int type = CharacterConvert.byteToInt(buf.readByte());
//机器编码
int deviceId = CharacterConvert.byteToInt(buf.readByte());
//校验位
int verify = CharacterConvert.byteToInt(buf.readByte());
//如果是登录操作时
if(type == ProtoInstant.LOGIN){
//计算字头 + 数据长度 + 类型 + 参数的总和
int sum = CharacterConvert.sum(ProtoInstant.FIELD_HEAD, dataLength + ProtoInstant.FILED_LEN, type, deviceId);
if(verify != CharacterConvert.getLow8(sum)){
log.error("登录返回,校验位错误!");
}else{
ChannelPipeline channelPipeline = ctx.pipeline();
channelPipeline.addAfter("encoder", "heartbeat", new HeartBeatHandler());
// 移除登录响应处理器
channelPipeline.remove(this);
log.info("服务器机登录返回了!");
}
return;
}else{
super.channelRead(ctx, msg);
return;
}
}
}
心跳处理HeartBeatHandler:
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;
import com.example.instant.ProtoInstant;
import com.example.util.CharacterConvert;
import ty.buffer.ByteBuf;
import ty.buffer.Unpooled;
import ty.channel.ChannelHandler;
import ty.channel.ChannelHandlerContext;
import ty.channel.ChannelInboundHandlerAdapter;
/**
* netty客户端心跳处理
* @author 程就人生
* @date 8月1日
* @Description
*
*/
@ChannelHandler.Sharable
@Service("heartHandler")
public class HeartBeatHandler extends ChannelInboundHandlerAdapter {
private static Logger log = LoggerFactory.getLogger(HeartBeatHandler.class);
// 心跳的时间间隔,单位为s
private static final int HEARTBEAT_INTERVAL = 100;
// 在Handler被加入到Pipeline时,开始发送心跳
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
ByteBuf buf = Unpooled.buffer(3);
//心跳
buf.writeByte(ProtoInstant.HEART_BEAT);
//机器编码
buf.writeByte(ProtoInstant.DEVICE_ID);
//校验位
int sum = CharacterConvert.sum(ProtoInstant.FIELD_HEAD, 6, ProtoInstant.HEART_BEAT, ProtoInstant.DEVICE_ID);
int verify = CharacterConvert.getLow8(sum);
buf.writeByte(verify);
// 发送心跳
heartBeat(ctx, buf.array());
}
// 使用定时器,发送心跳报文
public void heartBeat(ChannelHandlerContext ctx, byte[] heartbeatMsg) {
ctx.executor().schedule(() -> {
if (ctx.channel().isActive()) {
log.info(" 发送心跳 消息 to netty服务器系统");
ctx.writeAndFlush(heartbeatMsg);
// 递归调用,发送下一次的心跳
heartBeat(ctx, heartbeatMsg);
}
}, HEARTBEAT_INTERVAL, TimeUnit.SECONDS);
}
/**
* 接受到服务器的心跳回写
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
// 判断消息实例
if (null == msg || !(msg instanceof byte[])) {
super.channelRead(ctx, msg);
return;
}
//对接收到的数据进行处理
byte[] data = (byte[]) msg;
int dataLength = data.length;
ByteBuf buf = Unpooled.buffer(dataLength);
buf.writeBytes(data);
int type = CharacterConvert.byteToInt(buf.readByte());
int deviceId = CharacterConvert.byteToInt(buf.readByte());
//如果是心跳信息时
if(type == ProtoInstant.HEART_BEAT){
int verify = CharacterConvert.byteToInt(buf.readByte());
//计算字头 + 数据长度 + 类型 + 参数的总和
int sum = CharacterConvert.sum(ProtoInstant.FIELD_HEAD, dataLength + ProtoInstant.FILED_LEN, type, deviceId);
if(verify != CharacterConvert.getLow8(sum)){
log.error("心跳包,校验位错误!");
}else{
log.info("收到回写的心跳 消息 from netty服务器系统");
}
return;
}else{
super.channelRead(ctx, msg);
}
}
}
客户端的核心编码大抵就是这些,看完这些编码是不是有些期待服务端的编码呢,服务端的编码敬请期待下一篇文章。