work queues也成为task queues,任务模型。当消息处理比较耗时的时候,可能生产消息的速度远远大于消费速度,长此以往,消息就会堆积,无法及时处理。此时,就恶意使用work模型,让多个消费者绑定到一个队列,共同消费队列的消息。消息队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
通过官方文档我们可以很直观的认识到这种模型结构,一个消费者发送多条消息至消息队列,对应的多个消费者同时工作,消费消息。
这种模型和我们之前提到的hello word直连简单模型非常相似,只是消费者从一个变成了多个,以此提高消息消费的效率。
此处省略虚拟主机用户等信息的创建、依赖以及工具类。
可参考:java实现rabbitmq简单队列模型,生产者 消费者 消息队列
创建生产者
较之前的生产者类唯一变化的是同时发布多条消息,以便观察多个消费者消费消息的情况。
public class Provider {public void send() throws IOException, TimeoutException {Connection connection = null;Channel channel = null;try {connection = ConnectionUtils.getConnection();// 获取连接通道channel = connection.createChannel();channel.queueDeclare("wuwl",true,false,false,null);for (int i = 0; i < 20; i++) {channel.basicPublish("","wuwl",null,("info index :" + i).getBytes());}}finally {if (channel !=null && channel.isOpen()) {channel.close();}if (connection != null && connection.isOpen()) {connection.close();}}}public static void main(String[] args) throws IOException, TimeoutException {Provider provider = new Provider();provider.send();}}
创建多个消费者
消费者一:
public class Consumer01 {public void consume() throws IOException, TimeoutException {Connection connection = null;Channel channel = null;connection = ConnectionUtils.getConnection();// 获取连接通道channel = connection.createChannel();channel.queueDeclare("wuwl", true, false, false, null);channel.basicConsume("wuwl", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费消息:" + new String(body));}});}public static void main(String[] args) throws IOException, TimeoutException {Consumer01 consumer = new Consumer01();consumer.consume();}}
消费者二:
public class Consumer02 {public void consume() throws IOException, TimeoutException {Connection connection = null;Channel channel = null;connection = ConnectionUtils.getConnection();// 获取连接通道channel = connection.createChannel();channel.queueDeclare("wuwl", true, false, false, null);channel.basicConsume("wuwl", true, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费消息:" + new String(body));}});}public static void main(String[] args) throws IOException, TimeoutException {Consumer02 consumer = new Consumer02();consumer.consume();}}
此时的消费者一和消费者二完全一致
启动消费者
将两个消费者先于生产者启动,等待消息的发送
启动生产者
启动生产者,发送20条消息至消息队列
分析消费者
消费者一:
消费者二:
消费者一与消费者二均匀地消费消息队列中的消息,即使两个消费者在消费消息的效率不一样,也是均匀消费。将消费者一加上线程睡眠延迟,可以发现两个消费者消费的信息数量不变,依旧是循环交替。这一点从官方的文档得到了证实。
所以,work queues无法根据消费者的能力来分配消息,只能平均分派消息,如果因为某一消费者效率低下导致消息堆积,就会比较麻烦。这个主要是有消息确认机制决定的,默认地,消费者接收到消息即确认,队列中就会把消息移除,此时,消费者是否真的消费完成了消息是未知的。比如,其中一个消费者拿到了5条待处理的消息,只处理了其中2条,服务器即发生故障,丢失的3条消息无法找回。
修改消费者消息确认机制。
消费者一:
public class Consumer01 {public void consume() throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();// 获取连接通道final Channel channel = connection.createChannel();// 每次只能消费一个消息channel.basicQos(1);channel.queueDeclare("wuwl", true, false, false, null);channel.basicConsume("wuwl", false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费消息:" + new String(body));try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}//参数一:确认队列中的那个消息 参数二:是否开启多个消息同时确认channel.basicAck(envelope.getDeliveryTag(),false);}});}public static void main(String[] args) throws IOException, TimeoutException {Consumer01 consumer = new Consumer01();consumer.consume();}}
消费者二:
public class Consumer02 {public void consume() throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();// 获取连接通道final Channel channel = connection.createChannel();// 每次只能消费一个消息channel.basicQos(1);channel.queueDeclare("wuwl", true, false, false, null);channel.basicConsume("wuwl", false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费消息:" + new String(body));//参数一:确认队列中的那个消息 参数二:是否开启多个消息同时确认channel.basicAck(envelope.getDeliveryTag(),false);}});}public static void main(String[] args) throws IOException, TimeoutException {Consumer02 consumer = new Consumer02();consumer.consume();}}
改动一:channel.basicQos(1)
,限制队列向消费者最大消息发送量为1
改动二:channel.basicConsume("wuwl", false, new DefaultConsumer(channel)
,第二个参数改成false,取消消息自动确认
改动三:channel.basicAck(envelope.getDeliveryTag(),false)
消息手动确认
改动四:Thread.sleep(10)
消费者一消费一条休息休眠10毫秒,太长看不出来效果,被其它消费者抢先全部消费掉
改动五:将生产者消息发送数量改成100
运行效果:
消费者一消费了100条消息中的6条
消费者消费了其余的94条
此时实现了能者多劳,队列中的消息也不会出现未被真正消费而丢失,数据安全。