700字范文,内容丰富有趣,生活中的好帮手!
700字范文 > java实现rabbitmq任务模型(work queues) 生产者 消费者 消息队列 能者多劳

java实现rabbitmq任务模型(work queues) 生产者 消费者 消息队列 能者多劳

时间:2022-12-26 19:11:37

相关推荐

java实现rabbitmq任务模型(work queues)  生产者  消费者  消息队列  能者多劳

work queues也成为task queues,任务模型。当消息处理比较耗时的时候,可能生产消息的速度远远大于消费速度,长此以往,消息就会堆积,无法及时处理。此时,就恶意使用work模型,让多个消费者绑定到一个队列,共同消费队列的消息。消息队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。

通过官方文档我们可以很直观的认识到这种模型结构,一个消费者发送多条消息至消息队列,对应的多个消费者同时工作,消费消息。

这种模型和我们之前提到的hello word直连简单模型非常相似,只是消费者从一个变成了多个,以此提高消息消费的效率。

此处省略虚拟主机用户等信息的创建、依赖以及工具类。

可参考:java实现rabbitmq简单队列模型,生产者 消费者 消息队列

创建生产者

较之前的生产者类唯一变化的是同时发布多条消息,以便观察多个消费者消费消息的情况。

public class Provider {public void send() throws IOException, TimeoutException {Connection connection = null;Channel channel = null;try {connection = ConnectionUtils.getConnection();// 获取连接通道channel = connection.createChannel();channel.queueDeclare("wuwl",true,false,false,null);for (int i = 0; i < 20; i++) {channel.basicPublish("","wuwl",null,("info index :" + i).getBytes());}}finally {if (channel !=null && channel.isOpen()) {channel.close();}if (connection != null && connection.isOpen()) {connection.close();}}}public static void main(String[] args) throws IOException, TimeoutException {Provider provider = new Provider();provider.send();}}

创建多个消费者

消费者一:

public class Consumer01 {public void consume() throws IOException, TimeoutException {Connection connection = null;Channel channel = null;connection = ConnectionUtils.getConnection();// 获取连接通道channel = connection.createChannel();channel.queueDeclare("wuwl", true, false, false, null);channel.basicConsume("wuwl", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费消息:" + new String(body));}});}public static void main(String[] args) throws IOException, TimeoutException {Consumer01 consumer = new Consumer01();consumer.consume();}}

消费者二:

public class Consumer02 {public void consume() throws IOException, TimeoutException {Connection connection = null;Channel channel = null;connection = ConnectionUtils.getConnection();// 获取连接通道channel = connection.createChannel();channel.queueDeclare("wuwl", true, false, false, null);channel.basicConsume("wuwl", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费消息:" + new String(body));}});}public static void main(String[] args) throws IOException, TimeoutException {Consumer02 consumer = new Consumer02();consumer.consume();}}

此时的消费者一和消费者二完全一致

启动消费者

将两个消费者先于生产者启动,等待消息的发送

启动生产者

启动生产者,发送20条消息至消息队列

分析消费者

消费者一:

消费者二:

消费者一与消费者二均匀地消费消息队列中的消息,即使两个消费者在消费消息的效率不一样,也是均匀消费。将消费者一加上线程睡眠延迟,可以发现两个消费者消费的信息数量不变,依旧是循环交替。这一点从官方的文档得到了证实。

所以,work queues无法根据消费者的能力来分配消息,只能平均分派消息,如果因为某一消费者效率低下导致消息堆积,就会比较麻烦。这个主要是有消息确认机制决定的,默认地,消费者接收到消息即确认,队列中就会把消息移除,此时,消费者是否真的消费完成了消息是未知的。比如,其中一个消费者拿到了5条待处理的消息,只处理了其中2条,服务器即发生故障,丢失的3条消息无法找回。

修改消费者消息确认机制。

消费者一:

public class Consumer01 {public void consume() throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();// 获取连接通道final Channel channel = connection.createChannel();// 每次只能消费一个消息channel.basicQos(1);channel.queueDeclare("wuwl", true, false, false, null);channel.basicConsume("wuwl", false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费消息:" + new String(body));try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}//参数一:确认队列中的那个消息 参数二:是否开启多个消息同时确认channel.basicAck(envelope.getDeliveryTag(),false);}});}public static void main(String[] args) throws IOException, TimeoutException {Consumer01 consumer = new Consumer01();consumer.consume();}}

消费者二:

public class Consumer02 {public void consume() throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();// 获取连接通道final Channel channel = connection.createChannel();// 每次只能消费一个消息channel.basicQos(1);channel.queueDeclare("wuwl", true, false, false, null);channel.basicConsume("wuwl", false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费消息:" + new String(body));//参数一:确认队列中的那个消息 参数二:是否开启多个消息同时确认channel.basicAck(envelope.getDeliveryTag(),false);}});}public static void main(String[] args) throws IOException, TimeoutException {Consumer02 consumer = new Consumer02();consumer.consume();}}

改动一:channel.basicQos(1),限制队列向消费者最大消息发送量为1

改动二:channel.basicConsume("wuwl", false, new DefaultConsumer(channel),第二个参数改成false,取消消息自动确认

改动三:channel.basicAck(envelope.getDeliveryTag(),false)消息手动确认

改动四:Thread.sleep(10)消费者一消费一条休息休眠10毫秒,太长看不出来效果,被其它消费者抢先全部消费掉

改动五:将生产者消息发送数量改成100

运行效果:

消费者一消费了100条消息中的6条

消费者消费了其余的94条

此时实现了能者多劳,队列中的消息也不会出现未被真正消费而丢失,数据安全。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。