700字范文,内容丰富有趣,生活中的好帮手!
700字范文 > Spring Boot笔记-接收RabbitMQ队列中的消息

Spring Boot笔记-接收RabbitMQ队列中的消息

时间:2023-03-23 14:44:32

相关推荐

Spring Boot笔记-接收RabbitMQ队列中的消息

目录

基本概念

代码与实例

基本概念

首先有个关键:此处实验接收的数据类型为Order,这里要求发送和接收要一模一样。

包括包名和类名都要一模一样:

如下,consumerDemo

下面是productorDemo

这里,包名和类都一模一样否则接收端监听会失败!

在消费(接收订阅)端要配置一些数据:

spring.rabbitmq.listener.simple.concurrency=5spring.rabbitmq.listener.simple.max-concurrency=10spring.rabbitmq.listener.simple.acknowledge-mode=manual

这里指的是目前并发为5个,最大并发数为10个,监听确认为手动,也就是接收了数据,要给RabbitMQ给一个反馈信息

如下

这里有2个注解,是简单使用RabbitMQ的关键!

@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "order-queue", durable = "true"),exchange = @Exchange(name = "order-exchange", type = "topic"),key = "order.#"))@RabbitHandlerpublic void onOrderMessage(@Payload Order order,@Headers Map<String, Object> headers,Channel channel //手工签收需要rabbitMQ的通道) throws Exception{............}

这里如果没有对应的交换机和队列,那么此处就会自动新建

代码与实例

发送端不停的发送消息!接收端如下:

发送端关键代码:

OderSender.java

package SpringBoot.demo.produce;import SpringBoot.demo.entity.Order;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.amqp.rabbit.support.CorrelationData;import org.springframework.beans.factory.annotation.Autowired;import org.ponent;@Componentpublic class OderSender {@Autowiredprivate RabbitTemplate rabbitTemplate;public void send(Order order) throws Exception{CorrelationData correlationData = new CorrelationData();correlationData.setId(order.getMessageId());rabbitTemplate.convertAndSend("order-exchange",//exchange"order.abcd", //routingKeyorder,//消息体correlationData);//correlationData消息唯一ID}}

DemoApplicationTests.java

package SpringBoot.demo;import SpringBoot.demo.entity.Order;import SpringBoot.demo.produce.OderSender;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.test.context.junit4.SpringRunner;import java.util.UUID;@RunWith(SpringRunner.class)@SpringBootTestpublic class DemoApplicationTests {@Autowiredprivate OderSender oderSender;@Testpublic void contextLoads() {}@Testpublic void testSend1()throws Exception{Order order = new Order();order.setId("0618000000000003");order.setName("测试订单3");order.setMessageId(System.currentTimeMillis() + "$" + UUID.randomUUID().toString());oderSender.send(order);}}

application.properties

server.port=8001spring.rabbitmq.addresses=192.168.164.141:5672spring.rabbitmq.username=guestspring.rabbitmq.password=guestspring.rabbitmq.virtual-host=/spring.rabbitmq.connection-timeout=15000server.servlet.context-path=/spring.http.encoding.charset=UTF-8spring.jackson.data-format=yyyy-MM-dd HH:mm:ssspring.jackson.time-zone=GMT+8spring.jackson.default-property-inclusion=NON_NULL

接收端关键如下:

OrderReceiver.java

package SpringBoot.demo.consumer;import SpringBoot.demo.entity.Order;import com.rabbitmq.client.Channel;import org.springframework.amqp.rabbit.annotation.*;import org.springframework.amqp.support.AmqpHeaders;import org.springframework.messaging.handler.annotation.Headers;import org.springframework.messaging.handler.annotation.Payload;import org.ponent;import java.util.Map;@Componentpublic class OrderReceiver {@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "order-queue", durable = "true"),exchange = @Exchange(name = "order-exchange", type = "topic"),key = "order.#"))@RabbitHandlerpublic void onOrderMessage(@Payload Order order,@Headers Map<String, Object> headers,Channel channel //手工签收需要rabbitMQ的通道) throws Exception{//消费者操作System.out.println("--------------收到消息,开始消费--------------");System.out.println("订单ID:" + order.getId());//告诉RabbitMQ我已经签收了long deliveryTag = (long)headers.get(AmqpHeaders.DELIVERY_TAG);channel.basicAck(deliveryTag, false); //false为不支持批量签收}}

application.properties

server.port=8002spring.rabbitmq.addresses=192.168.164.141:5672spring.rabbitmq.username=guestspring.rabbitmq.password=guestspring.rabbitmq.virtual-host=/spring.rabbitmq.connection-timeout=15000server.servlet.context-path=/spring.http.encoding.charset=UTF-8spring.jackson.data-format=yyyy-MM-dd HH:mm:ssspring.jackson.time-zone=GMT+8spring.jackson.default-property-inclusion=NON_NULL#配置关于consumer相关的spring.rabbitmq.listener.simple.concurrency=5spring.rabbitmq.listener.simple.max-concurrency=10spring.rabbitmq.listener.simple.acknowledge-mode=manual#限流,同一时间只有一条消息消费spring.rabbitmq.listener.simple.prefetch=1

源码下载地址:

/fengfanchen/Java/tree/master/ProductorDemo

/fengfanchen/Java/tree/master/consumerDemo

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。