RabbitMQ的消息可靠性处理
1.消息不可靠的情况
如果consumer未启动,而publisher发送了消息,则消息会丢失,这种情况下,消息不可靠
consumer先启动,创建queue后,publisher发送消息后consumer正常接收消费,但consumer在消费时发生异常或宕机,如果RabbitMQ一旦发送给consumer后自动删除消息,则消息同样丢失,消息也不可靠
消息被错误消费的情况,消息也不可靠
2.解决方案
1.消息丢失
消息持久化处理
在@Queue中设置autoDelete属性为false(当consumer链接断开时,不自动删除队列)在@Exchange中设置autoDelete属性为false(当交换器绑定的队列不再使用时,不自动删除交换器)
2.消息被错误消费
确认机制处理ACK【acknowledge】 - consumer从RabbitMQ接收消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到确认反馈后才将消息从队列中删除,如果consumer在处理消息时出现网络不稳定、服务器异常等现象,就不会反馈给RabbitMQ,RabbitMQ会认为消息没有被正常消费,就会将消息重新放入队列中
在consumer非集群环境下,RabbitMQ没有收到确认消息时,RabbitMQ将会永久保存消息
在consumer集群环境下,RabbitMQ没有收到确认消息时,会立即将消息推送给集群中的其他consumer
消息确认机制默认处于开启状态,且不推荐关闭
在消息确认机制开启状态下,如果consumer没有处理消息确认,那么所有的consumer都没有正常反馈确认消息,并退出监听状态,消息会永久保存,并处于锁定状态,直到消息都被正常消费为止。此时若publisher持续发送消息,消息就会堆积,持续占用RabbitMQ所在服务器的内存,导致内存泄漏问题
编码异常处理:通过编码处理异常的方式,保证消息确认机制的正常执行,同样可以有效避免消息的重复消费
在consumer中catch异常后,不直接丢弃消息,不直接反馈ACK确认消息,而是做异常处理。针对该抛的异常,还得抛,以此保证ACK机制的正常执行;对于未处理成功的消息,在重新发送给MQ(如:在catch代码中,本地逻辑的重试,使用定时线程池重复执行3次任务,3次未成功,可能是consumer出现了严重问题,需要修改代码,提升版本或打补丁之类的处理)
RabbitMQ支持通过配置文件配置重试,当重试次数使用完毕,无论是否收到确认反馈,都会删除消息,避免内存泄漏的可能
# 开启重试机制spring.rabbitmq.listener.retry.enabled=true# 指定重试次数(默认3次)spring.rabbitmq.listener.retry.max_attempts=5
常用MQ对比
社区活跃度:RabbitMQ > ActiveMQ = RocketMQ > Kafka
消息持久化:除ZeroMQ外都支持
高并发:RabbitMQ = Kafka > RocketMQ > ActiveMQ
吞吐量:RabbitMQ = Kafka > RocketMQ > ActiveMQ
综合技术(可靠性、路由、集群、事务、高可用队列、消息可靠排序、持久化、可视化管理工具等):RabbitMQ = Kafka > RocketMQ > ActiveMQ
建议Kafka针对日志处理,其他使用RabbitMQ
Spring Cloud Bus
用于消息广播,集成了RabbitMQ和Kafka等消息代理,当有数据变更时,会通过消息中间件使用消息广播的方式通知所有的微服务节点同步更新数据
1.创建gitee仓库
创建consumer与provider的配置文件
2.创建父工程
引入依赖
创建实体类
创建服务接口
3.创建config子工程
创建配置文件
创建启动类
4.创建provider子工程
5.创建consumer子工程
1.基于config client实现全局热刷新
1.引入依赖
spring-cloud-starter-bus-amqpspring-cloud-starter-netflix-hystrixspring-cloud-starter-actuator
2.添加配置
# 配置RabbitMQspring.rabbitmq.host=192.168.54.121spring.rabbitmq.port=5672spring.rabbitmq.username=adminspring.rabbitmq.password=123spring.rabbitmq.virtual-host=/# 配置busspring.cloud.bus.enabled=truespring.cloud.bus.trace.enabled=true# 开启访问权限management.endpoints.web,exposure.include=*
3.启动类修改
添加@EnableCircuitBreaker
4.修改刷新路径
/actuator/bus-refresh
2.基于config server实现全局热刷新
这种实现方案在设计上更加完美,将热刷新逻辑与具体的服务应用解耦
Spring Cloud Stream
Spring Cloud Stream是Spring数据集成的一个组成部件,为开发人员提供更加简易的与外部系统连接的方式,可以让微服务开发进一步解耦,让服务开发人员将注意力集中在业务逻辑的处理上
Spring Cloud Stream可以针对MQ做读写操作,对消息中间件提供了进一步的封装,可以做到代码层面无感知的与中间件交互,甚至可以做到动态切换中间件组件(类似于JDBC对于DBMS,代码统一,不同数据库只需修改配置文件即可)
Spring Cloud Stream支持的中间件组件:暂时只支持RabbitMQ(使用topic交换器)和Kafka;不支持ActiveMQ(高并发性能较差)、RocketMQ(闭源)
1.Spring Cloud Stream的使用
1.创建父工程并引入依赖
spring-cloud-starter-stream-rabbitspring-boot-starter-webspring-cloud-starter-netflix-erueka-client
2.创建commons子模块
1.创建消费类
public class MyMessage implements Serializable{}
3.创建publisher子模块
1.引入依赖
commons
2.创建一个消息提供接口
public interface BasePublisher{//获取RabbitMQ通讯信道的方法@Output("base-ex")//信道对应的交换器名称SubscribableChannel getSendChannel();}
3.创建一个消息发送服务接口
public interface BasePublisherService{void sendMessage(Integer id,String remark,Object Data);}
4.创建消息发送服务的实现类
public class BasePublisherServiceImpl implements BasePublisherService{@Autowiredprivate BasePublisher publisher;@Overridepublic void sendMessage(Integer id,String remark,Object data){//封装自定义消息对象MyMessage myMessage=new MyMessage(id,remark,data);//加载消息体,构建消息对象Message message=MessageBuilder.withPayload(myMessage).build();//获取channelSubscribableChannel channel = this.publisher.getSendChannel();//发送消息channel.send(message);}}
5.创建一个控制器
@RestControllerpublic class PublisherController{@Autowiredprivate BasePublisherService service;@RequestMapping("/base")public String basePublisher(Integer id,String remark,Object data){this.service.sendMessage(id,remark,data);return "ok";}}
6.创建启动类
@SpringBootApplication@EnableBinding(value={BasePublisher.class})public class PublisherApp{public static void main(String[] args){SpringApplication.run(PublisherApp.class,args);}}
7.创建配置文件
server.port=8081spring.application.name=my-rabbitmq-publishererueka.client.serviceUrl.defaultZone=spring.rabbitmq.host=http://192.168.54.121spring.rabbitmq.port=5671spring.rabbitmq.virtual-host=/spring.rabbitmq.username=adminspring.rabbitmq.password=123
4.创建consumer子模块
引入依赖
commons
创建一个信道接口
public interface BaseConsumer{//获取RabbitMQ的链接工厂//通过链接工厂获取channel信道@Input("base-ex")//交换器名称SubscribableChannel getChannel();//依托信道建立监听机制//自定义处理消息的具体业务逻辑}
创建一个消费者接口
public interface BaseMessageService{//消费消息的方法void recive(MyMessage message);}
创建消费者接口的实现类
@Service@EnableBinding(value={BaseConsumer.class})public class BaseMessageServiceImpl implements BaseMessageService{//消费消息的方法@Ovrride@StreamListener("base-ex")//监听的交换器名称public void recive(MyMessage message){}}
创建配置文件
# 开放端口server.port=8080# 引用名称spring.application.name=my-rabbitmq-consumer# 注册中心地址eureka.client.serviceUrl.defaultZone=http://192.168.54.100:8761/eureka/,http://192.168.54.100:8762/eureka/# rabbitmq相关spring.rabbitmq.host=http://192.168.54.121spring.rabbitmq.port=5671spring.rabbitmq.virtual-host=/spring.rabbitmq.username=adminspring.rabbitmq.password=123
创建启动类
@SpringBottApplication@EnableBinding(value={BaseConsumer.class})public class ConsumerApp{public static void main(String[] args){SpringApplication.run(ConsumerApp.class,args);}}
Spring Cloud Stream实现消息分组
consumer集群分组后,轮询消费消息
不同队列,广播发送消息
1.publisher子模块
1.修改配置
spring.cloud.steam.binding.group-publisher.destination=exchange-group
2.创建消息发送信道接口
public interface GroupPublisher{@Output("group-publisher")Subscr}
3.创建消息发送接口
public interface GroupPublisherService{}
4.创建消息发送实现类
@Service@EnableBinding(value={GroupPubliser.class})public class GroupPu{private GroupPublisher publisher;@Overridepublic void sendMessage(Integer id,String remark,Object data){MyMessage myMessage=new MyMessage(id,remark,data);Message message = MessageBuilder.withPayload(myMessage).build();publisher.getChannel().send(message);}}
5.添加控制方法
2.在consumer中添加配置
1.添加配置
# 配置目的地(交换器的名称)# group-ex为@Input注解中的keyspring.cloud.stream.bindings.group-ex.destination=exchange-group# 配置分组(队列的名称)spring.cloud.stream.bindings.group-ex.group=queue-group-1# 创建的队列为持久化队列,队列名称为destination.group
2.创建分组接口
public interface GroupConsumer{@Input("group-ex")SubscribableChannel getInputChannel();}
3.创建消费接口
public interface GroupMessageService{void onMessage();}
4.创建消费接口的实现类
@Service@EnableBinding(value={GroupConsumer.class})public class GroupMessageServiceImpl implements GroupMessageService{@Override@StreamListner("group-ex")public void onMessage(MyMessage message){}}
Spring Cloud Stream实现消息分区
消息分区是指相同消息发送给同一个consumer节点处理,避免重复消息被不同consumer处理的情况
创建的是持久化队列,队列名称为destination.group-instance-index
路由键是destination-instance-index
1.Publisher
修改配置文件
# 目的地spring.cloud.stream.bindings.partition-ex-publisher.destination=exchange-partition-publisherspring.cloud.stream.bindings.partition-ex.group=partition-group# 分区数量(队列有几个consumer)spring.cloud.stream.bindingspartition-ex-publisher.publisher-count=2# 配置分区的检查方式,根据什么数据实现分区控制,payload代表根据消息内容做分区控制# payload判断依据是消息体对象的hashCode()和equals(),记录payload是否重复的方式是ConcurrentHashMap,如果消息几多,且服务运行为7*24长效服务,不推荐分区,因为ConcurrentHashMap会占用内存,占用时间过长,可能造成内存泄漏spring.cloud.stream.bindings.partition-ex.producer.partition-ey-expression=payload
2.consumer
修改配置文件
# 目的地spring.cloud.stream.bindings.partition-ex.destination=exchange-partitionspring.cloud.stream.bindings.partition-ex.group=partition-groupspring.cloud.stream.bindings.partition-ex.consumer.partitioned=true# 分区数量(队列有几个consumer)spring.cloud.stream.instance-count=2# 分区标识spring.cloud.stream.instance-index=0