为什么80%的码农都做不了架构师?>>>
参看此文档此需要先看/sdlvzg/blog/1523360,上一个文件介绍了单生产者单消息者情景,今天本文档介绍其它的使用情景。
=================单生产者-多消费者情景===================
生产者项目:
在生产者消息发送类中增加带参数的发送方法,增加之后的类内容如下:
package org.morecare.hello;import java.util.Date;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.ponent;@Componentpublic class HelloSender1 {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send() {String sendMsg = "hello1 " + new Date();System.out.println("Sender1 : " + sendMsg);this.rabbitTemplate.convertAndSend("helloQueue", sendMsg);}public void send(String msg) {String sendMsg = msg + new Date();System.out.println("Sender1 : " + sendMsg);this.rabbitTemplate.convertAndSend("helloQueue", sendMsg);}}
在消息发送类中增加测试消息发送方法,增加之后的类如下:
package org.morecare.controller;import org.morecare.hello.HelloSender1;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;@RestControllerpublic class RabbitTest {@Autowiredprivate HelloSender1 helloSender1;//单生产者-单消费者@GetMapping("/hello")public void hello() {helloSender1.send();}//单生产者-多消费者@GetMapping("/oneToMany")public void oneToMany() {for(int i=0;i<10;i++){helloSender1.send("hellomsg:"+i);}}}
消费者项目:
增加一个新的消费者:
package org.morecare.hello;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.ponent;@Component@RabbitListener(queues = "helloQueue")public class HelloReceiver2 {@RabbitHandlerpublic void process(String hello) {System.out.println("Receiver2 : " + hello);}}
测试
通过工程启动类启动两个或工程,执行工程启动类中的Main就可以。
启动完成之后,通过浏览器访问以下地址:http://localhost:8081/oneToMany,然后去看Eclipse的控制台
消息发送端控制台会打印以下信息
消息接收端控制台会打印以下信息
通过控制台打印的信息,可以看到,一个消息生产者生产了十条消息,两个消息消费者分别消息了五条消息。
=================多生产者-多消费者情景=================
生产者项目:
在原的消息生产的基础上再增加一个新的消息生产者,内容如下:
package org.morecare.hello;import java.util.Date;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.ponent;@Componentpublic class HelloSender2 {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send(String msg) {String sendMsg = msg + new Date();System.out.println("Sender2 : " + sendMsg);this.rabbitTemplate.convertAndSend("helloQueue", sendMsg);}}
在消息发送类中增加测试消息发送方法,增加之后的类如下:
package org.morecare.controller;import org.morecare.hello.HelloSender1;import org.morecare.hello.HelloSender2;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;@RestControllerpublic class RabbitTest {@Autowiredprivate HelloSender1 helloSender1;@Autowiredprivate HelloSender2 helloSender2;//单生产者-单消费者@GetMapping("/hello")public void hello() {helloSender1.send();}//单生产者-多消费者@GetMapping("/oneToMany")public void oneToMany() {for(int i=0;i<10;i++){helloSender1.send("hellomsg:"+i);}}// 多生产者-多消费者@GetMapping("/manyToMany")public void manyToMany() {for(int i=0;i<10;i++){helloSender1.send("hellomsg:"+i);helloSender2.send("hellomsg:"+i);}}}
消费者项目:
消费者项目不用做任何处理,原本就有两者消息消费者
测试
通过工程启动类启动两个或工程,执行工程启动类中的Main就可以。
启动完成之后,通过浏览器访问以下地址:http://localhost:8081/manyToMany,然后去看Eclipse的控制台
消息发送端控制台会打印以下信息
消息接收端控制台会打印以下信息
通过控制台打印的信息,可以看到,两个消息生产者分别生产了十条消息,两个消息消费者消费了这两十条信息。
=================实体类传输=================
springboot完美的支持对象的发送和接收,不需要格外的配置。
生产者项目:
增加实体类,(必须实现序列化接口):
package org.morecare.user;import java.io.Serializable;public class User implements Serializable {private static final long serialVersionUID = 7446624944864020387L;private String name;private String pass;public String getName() {return name;}public void setName(String name) {this.name = name;}public String getPass() {return pass;}public void setPass(String pass) {this.pass = pass;}}
增加消息发送者
package org.morecare.user;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.ponent;@Componentpublic class UserSender {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send() {User user = new User();user.setName("hzb");user.setPass("123456789");System.out.println("user send : " + user.getName() + "/" + user.getPass());this.rabbitTemplate.convertAndSend("userQueue", user);}}
在配置类中增加一个Queue
在消息发送类中增加测试消息发送方法,增加之后的类如下:
package org.morecare.controller;import org.morecare.hello.HelloSender1;import org.morecare.hello.HelloSender2;import org.morecare.user.UserSender;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;@RestControllerpublic class RabbitTest {@Autowiredprivate HelloSender1 helloSender1;@Autowiredprivate HelloSender2 helloSender2;@Autowiredprivate UserSender userSender;//单生产者-单消费者@GetMapping("/hello")public void hello() {helloSender1.send();}//单生产者-多消费者@GetMapping("/oneToMany")public void oneToMany() {for(int i=0;i<10;i++){helloSender1.send("hellomsg:"+i);}}// 多生产者-多消费者@GetMapping("/manyToMany")public void manyToMany() {for(int i=0;i<10;i++){helloSender1.send("hellomsg:"+i);helloSender2.send("hellomsg:"+i);}}@GetMapping("/userTest")public void userTest() {userSender.send();}}
消费者项目:
增加实体类,(必须实现序列化接口):
package org.morecare.user;import java.io.Serializable;public class User implements Serializable {private static final long serialVersionUID = 7446624944864020387L;private String name;private String pass;public String getName() {return name;}public void setName(String name) {this.name = name;}public String getPass() {return pass;}public void setPass(String pass) {this.pass = pass;}}
增加消息消费者
package org.morecare.user;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.ponent;@Component@RabbitListener(queues = "userQueue")public class UserReceiver {@RabbitHandlerpublic void process(User user) {System.out.println("user receive : " + user.getName() + "/" + user.getPass());}}
在配置类中增加一个Queue
package org.morecare;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class RabitmqConfig {@Beanpublic Queue helloQueue() {return new Queue("helloQueue");}@Beanpublic Queue userQueue() {return new Queue("userQueue");}}
测试
通过工程启动类启动两个或工程,执行工程启动类中的Main就可以。
启动完成之后,通过浏览器访问以下地址:http://localhost:8081/userTest,然后去看Eclipse的控制台
消息发送端控制台会打印以下信息
消息接收端控制台会打印以下信息
通过控制台打印的信息,生成者生产了一个实体对象消息,消费者消费了这个实体对象消息
=================topic ExChange示例=================
topic 是RabbitMQ中最灵活的一种方式,可以根据binding_key自由的绑定不同的队列
首先对topic规则配置,这里使用两个队列来测试(也就是在Application类中创建和绑定的topic.message和topic.messages两个队列),其中topic.message的bindting_key为
“topic.message”,topic.messages的binding_key为“topic.#”;
生产者项目:
增加消息生产者
package org.morecare.topic;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.ponent;@Componentpublic class TopicSender {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send() {String msg1 = "I am topic.mesaage msg======";System.out.println("sender1 : " + msg1);this.rabbitTemplate.convertAndSend("exchange", "topic.message", msg1);String msg2 = "I am topic.mesaages msg########";System.out.println("sender2 : " + msg2);this.rabbitTemplate.convertAndSend("exchange", "topic.messages", msg2);}}
在消息发送类中增加测试消息发送方法,增加之后的类如下:
package org.morecare.controller;import org.morecare.hello.HelloSender1;import org.morecare.hello.HelloSender2;import org.morecare.topic.TopicSender;import org.morecare.user.UserSender;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;@RestControllerpublic class RabbitTest {@Autowiredprivate HelloSender1 helloSender1;@Autowiredprivate HelloSender2 helloSender2;@Autowiredprivate UserSender userSender;@Autowiredprivate TopicSender topicSender;//单生产者-单消费者@GetMapping("/hello")public void hello() {helloSender1.send();}//单生产者-多消费者@GetMapping("/oneToMany")public void oneToMany() {for(int i=0;i<10;i++){helloSender1.send("hellomsg:"+i);}}// 多生产者-多消费者@GetMapping("/manyToMany")public void manyToMany() {for(int i=0;i<10;i++){helloSender1.send("hellomsg:"+i);helloSender2.send("hellomsg:"+i);}}//Object@GetMapping("/userTest")public void userTest() {userSender.send();}//Topic@GetMapping("/topicTest")public void topicTest() {topicSender.send();}}
消费者项目:
增加消息消费者1
package org.morecare.topic;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.ponent;@Component@RabbitListener(queues = "topic.message")public class topicMessageReceiver1 {@RabbitHandlerpublic void process(String msg) {System.out.println("topicMessageReceiver1 : " +msg);}}
增加消息消费者2
package org.morecare.topic;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.ponent;@Component@RabbitListener(queues = "topic.messages")public class topicMessageReceiver2 {@RabbitHandlerpublic void process(String msg) {System.out.println("topicMessageReceiver2 : " +msg);}}
在配置类中增加一个Queue配置
package org.morecare;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.Queue;import org.springframework.amqp.core.TopicExchange;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class RabitmqTopicConfig {@Beanpublic Queue queueMessage() {return new Queue("topic.message");}@Beanpublic Queue queueMessages() {return new Queue("topic.messages");}@BeanTopicExchange exchange() {return new TopicExchange("exchange");}/*** 将队列topic.message与exchange绑定,binding_key为topic.message,就是完全匹配* @param queueMessage* @param exchange* @return*/@BeanBinding bindingExchangeMessage(Queue queueMessage, TopicExchange exchange) {return BindingBuilder.bind(queueMessage).to(exchange).with("topic.message");}/*** 将队列topic.messages与exchange绑定,binding_key为topic.#,模糊匹配* @param queueMessage* @param exchange* @return*/@BeanBinding bindingExchangeMessages(Queue queueMessages, TopicExchange exchange) {return BindingBuilder.bind(queueMessages).to(exchange).with("topic.#");}}
测试
通过工程启动类启动两个或工程,执行工程启动类中的Main就可以。
启动完成之后,通过浏览器访问以下地址:http://localhost:8081/topicTest,然后去看Eclipse的控制台
消息发送端控制台会打印以下信息
消息接收端控制台会打印以下信息
通过控制台打印的信息,可以看出两个消息生产者生产了两条消息,对两个消息消费者消费了三次。
sender1发送的消息,routing_key是“topic.message”,所以exchange里面的绑定的binding_key是“topic.message”,topic.#都符合路由规则;所以sender1发送的消息,两个队列都能接收到;
sender2发送的消息,routing_key是“topic.messages”,所以exchange里面的绑定的binding_key只有topic.#都符合路由规则;所以sender2发送的消息只有队列topic.messages能收到。
=================fanout ExChange示例=================
Fanout 就是我们熟悉的广播模式或者订阅模式,给Fanout转发器发送消息,绑定了这个转发器的所有队列都收到这个消息。
这里使用三个队列来测试(也就是在Application类中创建和绑定的fanout.A、fanout.B、fanout.C)这三个队列都和Application中创建的fanoutExchange转发器绑定。
生产者项目:
增加消息生产者
package org.morecare.fanout;import org.springframework.amqp.core.AmqpTemplate;import org.springframework.beans.factory.annotation.Autowired;import org.ponent;@Componentpublic class FanoutSender {@Autowiredprivate AmqpTemplate rabbitTemplate;public void send() {String msgString="fanoutSender :hello i am lvgang";System.out.println(msgString);this.rabbitTemplate.convertAndSend("fanoutExchange","abcd.ee", msgString);}}
在消息发送类中增加测试消息发送方法,增加之后的类如下:
package org.morecare.controller;import org.morecare.fanout.FanoutSender;import org.morecare.hello.HelloSender1;import org.morecare.hello.HelloSender2;import org.morecare.topic.TopicSender;import org.morecare.user.UserSender;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.web.bind.annotation.GetMapping;import org.springframework.web.bind.annotation.RestController;@RestControllerpublic class RabbitTest {@Autowiredprivate HelloSender1 helloSender1;@Autowiredprivate HelloSender2 helloSender2;@Autowiredprivate UserSender userSender;@Autowiredprivate TopicSender topicSender;@Autowiredprivate FanoutSender fanoutSender;//单生产者-单消费者@GetMapping("/hello")public void hello() {helloSender1.send();}//单生产者-多消费者@GetMapping("/oneToMany")public void oneToMany() {for(int i=0;i<10;i++){helloSender1.send("hellomsg:"+i);}}// 多生产者-多消费者@GetMapping("/manyToMany")public void manyToMany() {for(int i=0;i<10;i++){helloSender1.send("hellomsg:"+i);helloSender2.send("hellomsg:"+i);}}//Object@GetMapping("/userTest")public void userTest() {userSender.send();}//Topic@GetMapping("/topicTest")public void topicTest() {topicSender.send();}//fanout exchange类型rabbitmq测试@GetMapping("/fanoutTest")public void fanoutTest() {fanoutSender.send();}}
消费者项目:
增加消息消费者1
package org.morecare.fanout;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.ponent;@Component@RabbitListener(queues = "fanout.A")public class FanoutReceiverA {@RabbitHandlerpublic void process(String msg) {System.out.println("FanoutReceiverA : " + msg);}}
增加消息消费者2
package org.morecare.fanout;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.ponent;@Component@RabbitListener(queues = "fanout.B")public class FanoutReceiverB {@RabbitHandlerpublic void process(String msg) {System.out.println("FanoutReceiverB : " + msg);}}
增加消息消费者3
package org.morecare.fanout;import org.springframework.amqp.rabbit.annotation.RabbitHandler;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.ponent;@Component@RabbitListener(queues = "fanout.C")public class FanoutReceiverC {@RabbitHandlerpublic void process(String msg) {System.out.println("FanoutReceiverC : " + msg);}}
在配置类中增加一个Queue配置
package org.morecare;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.FanoutExchange;import org.springframework.amqp.core.Queue;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;@Configurationpublic class RabitmqFanoutConfig {@Beanpublic Queue AMessage() {return new Queue("fanout.A");}@Beanpublic Queue BMessage() {return new Queue("fanout.B");}@Beanpublic Queue CMessage() {return new Queue("fanout.C");}@BeanFanoutExchange fanoutExchange() {return new FanoutExchange("fanoutExchange");}@BeanBinding bindingExchangeA(Queue AMessage,FanoutExchange fanoutExchange) {return BindingBuilder.bind(AMessage).to(fanoutExchange);}@BeanBinding bindingExchangeB(Queue BMessage, FanoutExchange fanoutExchange) {return BindingBuilder.bind(BMessage).to(fanoutExchange);}@BeanBinding bindingExchangeC(Queue CMessage, FanoutExchange fanoutExchange) {return BindingBuilder.bind(CMessage).to(fanoutExchange);}}
测试
通过工程启动类启动两个或工程,执行工程启动类中的Main就可以。
启动完成之后,通过浏览器访问以下地址:http://localhost:8081/fanoutTest,然后去看Eclipse的控制台
消息发送端控制台会打印以下信息
消息接收端控制台会打印以下信息
通过控制台打印的信息,可以看出1个消息生产者生产了1条消息,被三个消费者每个消费了一次。增加在消息发送时,指定了routing_key为"abcd.ee"不存在,但是所有接收者都接受到了消息。
注这种广播的方式必须消息消费者先启动,如果后启动消息消费者是收到之前的消息的。