700字范文,内容丰富有趣,生活中的好帮手!
700字范文 > Spring Boot 集成 RabbitMQ 升级

Spring Boot 集成 RabbitMQ 升级

时间:2020-04-18 04:02:32

相关推荐

Spring Boot 集成 RabbitMQ 升级

为什么80%的码农都做不了架构师?>>>

参看此文档此需要先看/sdlvzg/blog/1523360,上一个文件介绍了单生产者单消息者情景,今天本文档介绍其它的使用情景。

=================单生产者-多消费者情景===================

生产者项目:

在生产者消息发送类中增加带参数的发送方法,增加之后的类内容如下:

package org.morecare.hello;import java.util.Date;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.ponent;@Componentpublic class HelloSender1 {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send() {String sendMsg = "hello1 " + new Date();System.out.println("Sender1 : " + sendMsg);this.rabbitTemplate.convertAndSend("helloQueue", sendMsg);}public void send(String msg) {String sendMsg = msg + new Date();System.out.println("Sender1 : " + sendMsg);this.rabbitTemplate.convertAndSend("helloQueue", sendMsg);}}

在消息发送类中增加测试消息发送方法,增加之后的类如下:

package org.morecare.controller;import org.morecare.hello.HelloSender1;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;@RestControllerpublic class RabbitTest {@Autowiredprivate HelloSender1 helloSender1;//单生产者-单消费者@GetMapping("/hello")public void hello() {helloSender1.send();}//单生产者-多消费者@GetMapping("/oneToMany")public void oneToMany() {for(int i=0;i<10;i++){helloSender1.send("hellomsg:"+i);}}}

消费者项目:

增加一个新的消费者:

package org.morecare.hello;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.ponent;@Component@RabbitListener(queues = "helloQueue")public class HelloReceiver2 {@RabbitHandlerpublic void process(String hello) {System.out.println("Receiver2 : " + hello);}}

测试

通过工程启动类启动两个或工程,执行工程启动类中的Main就可以。

启动完成之后,通过浏览器访问以下地址:http://localhost:8081/oneToMany,然后去看Eclipse的控制台

消息发送端控制台会打印以下信息

消息接收端控制台会打印以下信息

通过控制台打印的信息,可以看到,一个消息生产者生产了十条消息,两个消息消费者分别消息了五条消息。

=================多生产者-多消费者情景=================

生产者项目:

在原的消息生产的基础上再增加一个新的消息生产者,内容如下:

package org.morecare.hello;import java.util.Date;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.ponent;@Componentpublic class HelloSender2 {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send(String msg) {String sendMsg = msg + new Date();System.out.println("Sender2 : " + sendMsg);this.rabbitTemplate.convertAndSend("helloQueue", sendMsg);}}

在消息发送类中增加测试消息发送方法,增加之后的类如下:

package org.morecare.controller;import org.morecare.hello.HelloSender1;import org.morecare.hello.HelloSender2;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;@RestControllerpublic class RabbitTest {@Autowiredprivate HelloSender1 helloSender1;@Autowiredprivate HelloSender2 helloSender2;//单生产者-单消费者@GetMapping("/hello")public void hello() {helloSender1.send();}//单生产者-多消费者@GetMapping("/oneToMany")public void oneToMany() {for(int i=0;i<10;i++){helloSender1.send("hellomsg:"+i);}}// 多生产者-多消费者@GetMapping("/manyToMany")public void manyToMany() {for(int i=0;i<10;i++){helloSender1.send("hellomsg:"+i);helloSender2.send("hellomsg:"+i);}}}

消费者项目:

消费者项目不用做任何处理,原本就有两者消息消费者

测试

通过工程启动类启动两个或工程,执行工程启动类中的Main就可以。

启动完成之后,通过浏览器访问以下地址:http://localhost:8081/manyToMany,然后去看Eclipse的控制台

消息发送端控制台会打印以下信息

消息接收端控制台会打印以下信息

通过控制台打印的信息,可以看到,两个消息生产者分别生产了十条消息,两个消息消费者消费了这两十条信息。

=================实体类传输=================

springboot完美的支持对象的发送和接收,不需要格外的配置。

生产者项目:

增加实体类,(必须实现序列化接口):

package org.morecare.user;import java.io.Serializable;public class User implements Serializable {private static final long serialVersionUID = 7446624944864020387L;private String name;private String pass;public String getName() {return name;}public void setName(String name) {this.name = name;}public String getPass() {return pass;}public void setPass(String pass) {this.pass = pass;}}

增加消息发送者

package org.morecare.user;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.ponent;@Componentpublic class UserSender {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send() {User user = new User();user.setName("hzb");user.setPass("123456789");System.out.println("user send : " + user.getName() + "/" + user.getPass());this.rabbitTemplate.convertAndSend("userQueue", user);}}

在配置类中增加一个Queue

在消息发送类中增加测试消息发送方法,增加之后的类如下:

package org.morecare.controller;import org.morecare.hello.HelloSender1;import org.morecare.hello.HelloSender2;import org.morecare.user.UserSender;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;@RestControllerpublic class RabbitTest {@Autowiredprivate HelloSender1 helloSender1;@Autowiredprivate HelloSender2 helloSender2;@Autowiredprivate UserSender userSender;//单生产者-单消费者@GetMapping("/hello")public void hello() {helloSender1.send();}//单生产者-多消费者@GetMapping("/oneToMany")public void oneToMany() {for(int i=0;i<10;i++){helloSender1.send("hellomsg:"+i);}}// 多生产者-多消费者@GetMapping("/manyToMany")public void manyToMany() {for(int i=0;i<10;i++){helloSender1.send("hellomsg:"+i);helloSender2.send("hellomsg:"+i);}}@GetMapping("/userTest")public void userTest() {userSender.send();}}

消费者项目:

增加实体类,(必须实现序列化接口):

package org.morecare.user;import java.io.Serializable;public class User implements Serializable {private static final long serialVersionUID = 7446624944864020387L;private String name;private String pass;public String getName() {return name;}public void setName(String name) {this.name = name;}public String getPass() {return pass;}public void setPass(String pass) {this.pass = pass;}}

增加消息消费者

package org.morecare.user;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.ponent;@Component@RabbitListener(queues = "userQueue")public class UserReceiver {@RabbitHandlerpublic void process(User user) {System.out.println("user receive : " + user.getName() + "/" + user.getPass());}}

在配置类中增加一个Queue

package org.morecare;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class RabitmqConfig {@Beanpublic Queue helloQueue() {return new Queue("helloQueue");}@Beanpublic Queue userQueue() {return new Queue("userQueue");}}

测试

通过工程启动类启动两个或工程,执行工程启动类中的Main就可以。

启动完成之后,通过浏览器访问以下地址:http://localhost:8081/userTest,然后去看Eclipse的控制台

消息发送端控制台会打印以下信息

消息接收端控制台会打印以下信息

通过控制台打印的信息,生成者生产了一个实体对象消息,消费者消费了这个实体对象消息

=================topic ExChange示例=================

topic 是RabbitMQ中最灵活的一种方式,可以根据binding_key自由的绑定不同的队列

首先对topic规则配置,这里使用两个队列来测试(也就是在Application类中创建和绑定的topic.message和topic.messages两个队列),其中topic.message的bindting_key为

“topic.message”,topic.messages的binding_key为“topic.#”;

生产者项目:

增加消息生产者

package org.morecare.topic;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.ponent;@Componentpublic class TopicSender {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send() {String msg1 = "I am topic.mesaage msg======";System.out.println("sender1 : " + msg1);this.rabbitTemplate.convertAndSend("exchange", "topic.message", msg1);String msg2 = "I am topic.mesaages msg########";System.out.println("sender2 : " + msg2);this.rabbitTemplate.convertAndSend("exchange", "topic.messages", msg2);}}

在消息发送类中增加测试消息发送方法,增加之后的类如下:

package org.morecare.controller;import org.morecare.hello.HelloSender1;import org.morecare.hello.HelloSender2;import org.morecare.topic.TopicSender;import org.morecare.user.UserSender;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;@RestControllerpublic class RabbitTest {@Autowiredprivate HelloSender1 helloSender1;@Autowiredprivate HelloSender2 helloSender2;@Autowiredprivate UserSender userSender;@Autowiredprivate TopicSender topicSender;//单生产者-单消费者@GetMapping("/hello")public void hello() {helloSender1.send();}//单生产者-多消费者@GetMapping("/oneToMany")public void oneToMany() {for(int i=0;i<10;i++){helloSender1.send("hellomsg:"+i);}}// 多生产者-多消费者@GetMapping("/manyToMany")public void manyToMany() {for(int i=0;i<10;i++){helloSender1.send("hellomsg:"+i);helloSender2.send("hellomsg:"+i);}}//Object@GetMapping("/userTest")public void userTest() {userSender.send();}//Topic@GetMapping("/topicTest")public void topicTest() {topicSender.send();}}

消费者项目:

增加消息消费者1

package org.morecare.topic;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.ponent;@Component@RabbitListener(queues = "topic.message")public class topicMessageReceiver1 {@RabbitHandlerpublic void process(String msg) {System.out.println("topicMessageReceiver1 : " +msg);}}

增加消息消费者2

package org.morecare.topic;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.ponent;@Component@RabbitListener(queues = "topic.messages")public class topicMessageReceiver2 {@RabbitHandlerpublic void process(String msg) {System.out.println("topicMessageReceiver2 : " +msg);}}

在配置类中增加一个Queue配置

package org.morecare;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.TopicExchange;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class RabitmqTopicConfig {@Beanpublic Queue queueMessage() {return new Queue("topic.message");}@Beanpublic Queue queueMessages() {return new Queue("topic.messages");}@BeanTopicExchange exchange() {return new TopicExchange("exchange");}/*** 将队列topic.message与exchange绑定,binding_key为topic.message,就是完全匹配* @param queueMessage* @param exchange* @return*/@BeanBinding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");}/*** 将队列topic.messages与exchange绑定,binding_key为topic.#,模糊匹配* @param queueMessage* @param exchange* @return*/@BeanBinding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");}}

测试

通过工程启动类启动两个或工程,执行工程启动类中的Main就可以。

启动完成之后,通过浏览器访问以下地址:http://localhost:8081/topicTest,然后去看Eclipse的控制台

消息发送端控制台会打印以下信息

消息接收端控制台会打印以下信息

通过控制台打印的信息,可以看出两个消息生产者生产了两条消息,对两个消息消费者消费了三次。

sender1发送的消息,routing_key是“topic.message”,所以exchange里面的绑定的binding_key是“topic.message”,topic.#都符合路由规则;所以sender1发送的消息,两个队列都能接收到;

sender2发送的消息,routing_key是“topic.messages”,所以exchange里面的绑定的binding_key只有topic.#都符合路由规则;所以sender2发送的消息只有队列topic.messages能收到。

=================fanout ExChange示例=================

Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout转发器发送消息,绑定了这个转发器的所有队列都收到这个消息。

这里使用三个队列来测试(也就是在Application类中创建和绑定的fanout.A、fanout.B、fanout.C)这三个队列都和Application中创建的fanoutExchange转发器绑定。

生产者项目:

增加消息生产者

package org.morecare.fanout;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.ponent;@Componentpublic class FanoutSender {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send() {String msgString="fanoutSender :hello i am lvgang";System.out.println(msgString);this.rabbitTemplate.convertAndSend("fanoutExchange","abcd.ee", msgString);}}

在消息发送类中增加测试消息发送方法,增加之后的类如下:

package org.morecare.controller;import org.morecare.fanout.FanoutSender;import org.morecare.hello.HelloSender1;import org.morecare.hello.HelloSender2;import org.morecare.topic.TopicSender;import org.morecare.user.UserSender;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;@RestControllerpublic class RabbitTest {@Autowiredprivate HelloSender1 helloSender1;@Autowiredprivate HelloSender2 helloSender2;@Autowiredprivate UserSender userSender;@Autowiredprivate TopicSender topicSender;@Autowiredprivate FanoutSender fanoutSender;//单生产者-单消费者@GetMapping("/hello")public void hello() {helloSender1.send();}//单生产者-多消费者@GetMapping("/oneToMany")public void oneToMany() {for(int i=0;i<10;i++){helloSender1.send("hellomsg:"+i);}}// 多生产者-多消费者@GetMapping("/manyToMany")public void manyToMany() {for(int i=0;i<10;i++){helloSender1.send("hellomsg:"+i);helloSender2.send("hellomsg:"+i);}}//Object@GetMapping("/userTest")public void userTest() {userSender.send();}//Topic@GetMapping("/topicTest")public void topicTest() {topicSender.send();}//fanout exchange类型rabbitmq测试@GetMapping("/fanoutTest")public void fanoutTest() {fanoutSender.send();}}

消费者项目:

增加消息消费者1

package org.morecare.fanout;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.ponent;@Component@RabbitListener(queues = "fanout.A")public class FanoutReceiverA {@RabbitHandlerpublic void process(String msg) {System.out.println("FanoutReceiverA : " + msg);}}

增加消息消费者2

package org.morecare.fanout;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.ponent;@Component@RabbitListener(queues = "fanout.B")public class FanoutReceiverB {@RabbitHandlerpublic void process(String msg) {System.out.println("FanoutReceiverB : " + msg);}}

增加消息消费者3

package org.morecare.fanout;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.ponent;@Component@RabbitListener(queues = "fanout.C")public class FanoutReceiverC {@RabbitHandlerpublic void process(String msg) {System.out.println("FanoutReceiverC : " + msg);}}

在配置类中增加一个Queue配置

package org.morecare;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.FanoutExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class RabitmqFanoutConfig {@Beanpublic Queue AMessage() {return new Queue("fanout.A");}@Beanpublic Queue BMessage() {return new Queue("fanout.B");}@Beanpublic Queue CMessage() {return new Queue("fanout.C");}@BeanFanoutExchange fanoutExchange() {return new FanoutExchange("fanoutExchange");}@BeanBinding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) {return BindingBuilder.bind(AMessage).to(fanoutExchange);}@BeanBinding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {return BindingBuilder.bind(BMessage).to(fanoutExchange);}@BeanBinding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {return BindingBuilder.bind(CMessage).to(fanoutExchange);}}

测试

通过工程启动类启动两个或工程,执行工程启动类中的Main就可以。

启动完成之后,通过浏览器访问以下地址:http://localhost:8081/fanoutTest,然后去看Eclipse的控制台

消息发送端控制台会打印以下信息

消息接收端控制台会打印以下信息

通过控制台打印的信息,可以看出1个消息生产者生产了1条消息,被三个消费者每个消费了一次。增加在消息发送时,指定了routing_key为"abcd.ee"不存在,但是所有接收者都接受到了消息。

注这种广播的方式必须消息消费者先启动,如果后启动消息消费者是收到之前的消息的。

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。