700字范文,内容丰富有趣,生活中的好帮手!
700字范文 > java实现rabbitmq发布/订阅模型(Publish/Subscribe queues) 生产者 消费者 交换机 消息队列

java实现rabbitmq发布/订阅模型(Publish/Subscribe queues) 生产者 消费者 交换机 消息队列

时间:2022-09-20 00:01:48

相关推荐

java实现rabbitmq发布/订阅模型(Publish/Subscribe queues)  生产者  消费者  交换机  消息队列

发布/订阅模型又称扇出模型,或者是广播模型,可以有多个消费者,每个消费者有自己的队列,每个队列都要绑定到交换机,生产者发送的消息只需要发送到交换机,再由交换机决定要发送到哪些队列,生产者无法自行决定。交换机会把消息发送到绑定过的所有队列,实现一对多,一条消息被多个消费者消费。

可以看到,这种模型需要用到交换机模块,我们在后台管理界面可以看到许多交换机可供使用,当然,也可以自己声明需要的交换机。

每个虚拟主机默认生成了多个类型的交换机,这里我们选择一个fanout类型的,名为amqp.fanout的交换机测试。

生产者

public class Provider {public void send() throws IOException, TimeoutException {Connection connection = null;Channel channel = null;try {connection = ConnectionUtils.getConnection();// 获取连接通道channel = connection.createChannel();// 定义通道对应的交换机 参数一:交换机名称 参数二:教会及类型 fanout 广播模型channel.exchangeDeclare("amqp.fanout","fanout");// 发送消息channel.basicPublish("amqp.fanout","",null,("fanout message " + System.currentTimeMillis()).getBytes());}finally {if (channel !=null && channel.isOpen()) {channel.close();}if (connection != null && connection.isOpen()) {connection.close();}}}public static void main(String[] args) throws IOException, TimeoutException {Provider provider = new Provider();provider.send();}}

通过连接获取通道后,使用channel.exchangeDeclare(“amqp.fanout”,“fanout”);声明交换机,如果交换机不存在,rabbitmq会自动创建,发送消息时指定exchange为amqp.fanout,第二个参数为空,使用临时队列。

消费者

public class Consumer01 {public void consume() throws IOException, TimeoutException {Connection connection = ConnectionUtils.getConnection();// 获取连接通道final Channel channel = connection.createChannel();// 绑定交换机channel.exchangeDeclare("amqp.fanout","fanout");//创建临时队列String queueName = channel.queueDeclare().getQueue();// 绑定交换机和队列channel.queueBind(queueName,"amqp.fanout","");// 每次只能消费一个消息channel.basicQos(1);// 消费消息channel.basicConsume(queueName, false, new DefaultConsumer(channel) {@Overridepublic void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {System.out.println("消费消息:" + new String(body));try {Thread.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}//参数一:确认队列中的那个消息 参数二:是否开启多个消息同时确认channel.basicAck(envelope.getDeliveryTag(),false);}});}public static void main(String[] args) throws IOException, TimeoutException {Consumer01 consumer = new Consumer01();consumer.consume();}}

此处为了减少篇幅,读者可将Consumer01的代码复制生成Consumer02或多个消费者类

测试

启动所有消费者,通过临时队列监听交换机。此时,运行生产者发布消息,多个消费者可同时接收到交换机发送的消息。

消费者一:

消费者二:

多个消费者同时接收到了相同的消息,再运行一次生产者也一样,消息已广播的形式发送至绑定到交换机的多个消费者,其中,消费者与交换机通过临时队列连接。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。