700字范文,内容丰富有趣,生活中的好帮手!
700字范文 > RabbitMQ之消息确认机制ACK

RabbitMQ之消息确认机制ACK

时间:2023-03-09 10:20:05

相关推荐

RabbitMQ之消息确认机制ACK

消息确认机制(ack)

队列分配消息给监听消费者时,该消息处于未确认状态,不会被删除;当接收到消费者的确认回复才会将消息移除。

RabbitMQ默认的消息确认机制是:自动确认的 。

修改为手动确认模式,然后不手动确认看看结果

在application.yml中

spring:rabbitmq:port: 5672host: 127.0.0.1username: guestpassword: guestlistener:simple:prefetch: 1acknowledge-mode: manual # 开启手动确认,自动是auto

package com.yzm.rabbitmq_02.config;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.QueueBuilder;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class RabbitConfig {public static final String ACK_QUEUE = "ack_queue";/*** 消息队列* durable:设置是否持久化。持久化的队列会存盘,在服务器重启的时候可以保证不丢失相关信息* exclusive:设置是否排他。如果一个队列被声明为排他队列,该队列仅对首次声明它的连接可见,并在连接断开时自动删除* autoDelete:设置是否自动删除。自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除*/@Beanpublic Queue queue() {return QueueBuilder.durable(ACK_QUEUE).build();}}

package com.yzm.rabbitmq_02.sender;import com.yzm.rabbitmq_02.config.RabbitConfig;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RequestParam;import org.springframework.web.bind.annotation.RestController;/*** 消息发布*/@RestControllerpublic class AckSender {private final AmqpTemplate template;public AckSender(AmqpTemplate template) {this.template = template;}@GetMapping("/send")public void send(@RequestParam(value = "message", required = false, defaultValue = "Hello World") String message) {for (int i = 1; i <= 10; i++) {String msg = message + " ..." + i;System.out.println(" [ 生产者 ] Sent ==> '" + msg + "'");template.convertAndSend(RabbitConfig.ACK_QUEUE, msg);}}}

package com.yzm.rabbitmq_02.receiver;import com.yzm.rabbitmq_02.config.RabbitConfig;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.ponent;/*** 消息监听*/@Componentpublic class AckReceiver {private int count1 = 1;private int count2 = 1;@RabbitListener(queues = RabbitConfig.ACK_QUEUE)public void receive1(Message message) throws InterruptedException {Thread.sleep(200);System.out.println(" [ 消费者@1号 ] Received ==> '" + new String(message.getBody()) + "'");System.out.println(" [ 消费者@1号 ] 处理消息数:" + count1++);}@RabbitListener(queues = RabbitConfig.ACK_QUEUE)public void receive2(Message message) throws InterruptedException {Thread.sleep(1000);System.out.println(" [ 消费者@2号 ] Received ==> '" + new String(message.getBody()) + "'");System.out.println(" [ 消费者@2号 ] 处理消息数:" + count2++);}}

运行结果:

消费者1号、2号分别拿到一条消息进行消费,但没有确认,处于阻塞状态,所以队列不会移除这两条消息,同时设置了prefetch=1,在消费者未确认之前不会重新推送消息给消费者

停止程序,发现2条未确认的消息会回到Ready里面等待重新消费

再次重启,再次消费2条消息,但仍未确认

访问/send,再次发布消息,消息堆积

好了,来看看如何手动确认吧。修改消费者

package com.yzm.rabbitmq_02.receiver;import com.rabbitmq.client.Channel;import com.yzm.rabbitmq_02.config.RabbitConfig;import org.springframework.amqp.core.Message;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.amqp.support.AmqpHeaders;import org.springframework.messaging.handler.annotation.Header;import org.springframework.messaging.handler.annotation.Headers;import org.ponent;import java.io.IOException;import java.util.Map;/*** 消息监听*/@Componentpublic class AckReceiver {private int count1 = 1;private int count2 = 1;private int count3 = 1;@RabbitListener(queues = RabbitConfig.ACK_QUEUE)public void receive1(Message message, Channel channel) throws IOException, InterruptedException {Thread.sleep(200);System.out.println(" [ 消费者@1号 ] Received ==> '" + new String(message.getBody()) + "'");System.out.println(" [ 消费者@1号 ] 处理消息数:" + count1++);// 确认消息// 第一个参数,交付标签,相当于消息ID 64位的长整数(从1开始递增)// 第二个参数,false表示仅确认提供的交付标签;true表示批量确认所有消息(消息ID小于自身的ID),包括提供的交付标签channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}@RabbitListener(queues = RabbitConfig.ACK_QUEUE)public void receive2(Message message, Channel channel,@Headers Map<String, Object> map) throws IOException, InterruptedException {Thread.sleep(600);System.out.println(" [ 消费者@2号 ] Received ==> '" + new String(message.getBody()) + "'");System.out.println(" [ 消费者@2号 ] 处理消息数:" + count2++);// 确认消息channel.basicAck((Long) map.get(AmqpHeaders.DELIVERY_TAG), false);}@RabbitListener(queues = RabbitConfig.ACK_QUEUE)public void receive3(Message message, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException, InterruptedException {Thread.sleep(1000);System.out.println(" [ 消费者@3号 ] Received ==> '" + new String(message.getBody()) + "'");System.out.println(" [ 消费者@3号 ] 处理消息数:" + count3++);// 确认消息channel.basicAck(deliveryTag, false);}}

刚启动,就把前两次积累的消息先被消费完

接着发布消息

手动确认通过调用方法实现

basicAck(long deliveryTag, boolean multiple)

deliveryTag:交付标签,相当于消息ID 64位的长整数(从1开始递增)

multiple:false表示仅确认提供的交付标签;true表示批量确认所有消息(消息ID小于自身的ID),包括提供的交付标签

能手动确认,同样也可以手动拒绝,修改消费者

@Componentpublic class AckReceiver {private int count1 = 1;private int count2 = 1;private int count3 = 1;// @RabbitListener(queues = RabbitConfig.ACK_QUEUE)public void receive1(Message message, Channel channel) throws IOException, InterruptedException {Thread.sleep(200);System.out.println(" [ 消费者@1号 ] Received ==> '" + new String(message.getBody()) + "'");System.out.println(" [ 消费者@1号 ] 处理消息数:" + count1++);// 确认消息// 第一个参数,交付标签,相当于消息ID 64位的长整数(从1开始递增)// 第二个参数,false表示仅确认提供的交付标签;true表示批量确认所有消息(消息ID小于自身的ID),包括提供的交付标签channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);}// @RabbitListener(queues = RabbitConfig.ACK_QUEUE)public void receive2(Message message, Channel channel,@Headers Map<String, Object> map) throws IOException, InterruptedException {Thread.sleep(600);System.out.println(" [ 消费者@2号 ] Received ==> '" + new String(message.getBody()) + "'");System.out.println(" [ 消费者@2号 ] 处理消息数:" + count2++);// 确认消息channel.basicAck((Long) map.get(AmqpHeaders.DELIVERY_TAG), false);}// @RabbitListener(queues = RabbitConfig.ACK_QUEUE)public void receive3(Message message, Channel channel,@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws IOException, InterruptedException {Thread.sleep(1000);System.out.println(" [ 消费者@3号 ] Received ==> '" + new String(message.getBody()) + "'");System.out.println(" [ 消费者@3号 ] 处理消息数:" + count3++);// 确认消息channel.basicAck(deliveryTag, false);}@RabbitListener(queues = RabbitConfig.ACK_QUEUE)public void receive4(Message message, Channel channel) throws IOException, InterruptedException {Thread.sleep(200);System.out.println(" [ 消费者@4号 ] Received ==> '" + new String(message.getBody()) + "'");System.out.println(" [ 消费者@4号 ] 消息被我拒绝了:" + count1++);// 拒绝消息方式一// 第一个参数,交付标签// 第二个参数,false表示仅拒绝提供的交付标签;true表示批量拒绝所有消息,包括提供的交付标签// 第三个参数,false表示直接丢弃消息,true表示重新排队//channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);// 拒绝消息方式二// 第一个参数,交付标签// 第二个参数,false表示直接丢弃消息,true表示重新排队// 跟basicNack的区别就是始终只拒绝提供的交付标签channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);}}

运行结果:

channel.basicReject(message.getMessageProperties().getDeliveryTag(), true);

这里是拒绝后,重新进入队列,所以消费的总是第一条消息并且循环不停

停止程序后,队列仍然是10条消息

channel.basicReject(message.getMessageProperties().getDeliveryTag(), false);

改成false,拒绝后直接丢弃

重启后:

总结一下 手动确认模式的各种情况

未确认:什么也不用写,消息不会移除,重复消费,积攒越来越多

确认:channel.basicAck();确认后,消息从队列中移除

拒绝:channel.basicNack()或channel.basicReject();拒绝后,消息先从队列中移除,然后可以选择重新排队,或者直接丢弃(丢弃还有一种选择,就是加入到死信队列中,用于追踪问题)

相关链接

首页

上一篇:快速入门

下一篇:交换机

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。