700字范文,内容丰富有趣,生活中的好帮手!
700字范文 > StringBoot与RabbitMQ的简单入门 - 生产者和消费者(Producer/Consumer)

StringBoot与RabbitMQ的简单入门 - 生产者和消费者(Producer/Consumer)

时间:2021-02-17 09:28:14

相关推荐

StringBoot与RabbitMQ的简单入门 - 生产者和消费者(Producer/Consumer)

一. 创建SpringBoot项目

参照Spring官网,可快速创建一个SpringBoot的项目。

pom.xml的文件

<dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-test</artifactId><scope>test</scope></dependency></dependencies>

启动成功后如下:

二, 了解RabbitMQ的模型

Consumer

***************************************************************************************************************************

@Configurationpublic class RabbitConfig {@Bean("firstQueue")public Queue getFirstQueue(){ return new Queue("FIRST_QUEUE");}@Bean("secondQueue")public Queue getSecondQueue(){ return new Queue("SECOND_QUEUE");}@Bean("thirdQueue")public Queue getThirdQueue(){ return new Queue("THIRD_QUEUE");}@Bean("fourthQueue")public Queue getFourthQueue(){ return new Queue("FOURTH_QUEUE");}@Bean("topicExchange")public TopicExchange getTopicExchange(){return new TopicExchange("TOPIC_EXCHANGE");}@Bean("directExchange")public DirectExchange getSecondDirectExchange(){return new DirectExchange("DIRECT_EXCHANGE");}@Bean("fountExchange")public FanoutExchange getHarryFountExchange(){return new FanoutExchange("FOUNT_EXCHANGE");}@Beanpublic Binding bingFirst(@Qualifier("topicExchange") TopicExchange exchange,@Qualifier("firstQueue") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("*.ginny.*");}@Beanpublic Binding bingSecond(@Qualifier("directExchange") DirectExchange exchange,@Qualifier("secondQueue") Queue queue){return BindingBuilder.bind(queue).to(exchange).with("ginny.test");}@Beanpublic Binding bingThird(@Qualifier("fountExchange") FanoutExchange exchange,@Qualifier("thirdQueue") Queue queue){return BindingBuilder.bind(queue).to(exchange);}@Beanpublic Binding bingFourth(@Qualifier("fountExchange") FanoutExchange exchange,@Qualifier("fourthQueue") Queue queue){return BindingBuilder.bind(queue).to(exchange);}/*** 创建监听* @param connectionFactory 如果不创建监听,此处默认RabbitMQ的localhost* @return*/@Bean("rabbitListenerContainerFactory")public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();factory.setConnectionFactory(connectionFactory);factory.setMessageConverter(new Jackson2JsonMessageConverter());factory.setAcknowledgeMode(AcknowledgeMode.NONE);factory.setAutoStartup(true);return factory;}}

***************************************************************************************************************************

@Component@RabbitListener(queues = "FIRST_QUEUE", containerFactory="rabbitListenerContainerFactory")public class FirstConsumer {@RabbitHandlerpublic void process(@Payload String msg){System.out.println("First Queue received msg : " + msg);}}

***************************************************************************************************************************

@Component@RabbitListener(queues = "FOURTH_QUEUE", containerFactory="rabbitListenerContainerFactory")public class FourthConsumer {@RabbitHandlerpublic void process(@Payload String msg){System.out.println("Fourth Queue received msg : " + msg);}}

***************************************************************************************************************************

@Component@RabbitListener(queues = "SECOND_QUEUE", containerFactory="rabbitListenerContainerFactory")public class SecondConsumer {@RabbitHandlerpublic void process(@Payload String msg){System.out.println("Second Queue received msg : " + msg);}}

***************************************************************************************************************************

@Component@RabbitListener(queues = "THIRD_QUEUE", containerFactory="rabbitListenerContainerFactory")public class ThirdConsumer {@RabbitHandlerpublic void process(@Payload String msg){System.out.println("Third Queue received msg : " + msg);}}

***************************************************************************************************************************

Consumer启动后,管理页面:自动创建对应的Exchange和Queue,以及对应的Binging

Producer

***************************************************************************************************************************

@Configurationpublic class RabbitConfig {/*** 所有的消息发送都会转换成JSON格式发到交换机* @param connectionFactory* @return*/@Beanpublic RabbitTemplate ginnyTemplate(final ConnectionFactory connectionFactory) {final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);rabbitTemplate.setMessageConverter(new Jackson2JsonMessageConverter());return rabbitTemplate;}}

***************************************************************************************************************************

@Componentpublic class RabbitSender {@AutowiredRabbitTemplate ginnyTemplate;public void send() {ginnyTemplate.convertAndSend("DIRECT_EXCHANGE","ginny.test", "a direct msg : 中原镖局,汉中省解放路266号");ginnyTemplate.convertAndSend("TOPIC_EXCHANGE","shanghai.ginny.test","a topic msg : shanghai.ginny.test");ginnyTemplate.convertAndSend("TOPIC_EXCHANGE","changsha.ginny.test","a topic msg : changsha.ginny.test");// 发送JSON字符串//ObjectMapper mapper = new ObjectMapper();String json = "a fanoutExchange msg : this message is from fanout";ginnyTemplate.convertAndSend("FOUNT_EXCHANGE","", json);}}

***************************************************************************************************************************

Producer和Consumer启动后,Producer发送消息,运行结果如下:

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。