700字范文,内容丰富有趣,生活中的好帮手!
700字范文 > 使用Spring Cloud Stream与RabbitMQ集成

使用Spring Cloud Stream与RabbitMQ集成

时间:2023-07-24 05:13:16

相关推荐

使用Spring Cloud Stream与RabbitMQ集成

在我以前的文章中,我写了两个系统之间非常简单的集成场景-一个生成一个工作单元,另一个处理该工作单元,以及Spring Integration如何使这种集成非常容易。

在这里,我将演示如何使用Spring Cloud Stream进一步简化此集成方案

我在这里有示例代码– pom.xml中提供了适用于Spring Cloud Stream的正确maven依赖关系。

制片人

因此,再次从负责生成工作单元的生产者开始。 将消息发送到RabbitMQ所需的代码明智的全部工作就是按照以下方式进行Java配置:

@Configuration@EnableBinding(WorkUnitsSource.class)@IntegrationComponentScanpublic class IntegrationConfiguration {}

从表面上看,这看似简单,但在幕后做了很多事情,据我了解并从文档中了解到,这些是该配置触发的:

1.创建基于绑定到@EnableBinding批注的类的Spring Integration消息通道。 上面的WorkUnitsSource类是一个称为“ worksChannel”的自定义通道的定义,如下所示:

import org.springframework.cloud.stream.annotation.Output;import org.springframework.messaging.MessageChannel;public interface WorkUnitsSource {String CHANNEL_NAME = "worksChannel";@OutputMessageChannel worksChannel();}

2.根据运行时可用的“绑定程序”实现(例如RabbitMQ,Kaffka,Redis,Gemfire),上一步中的通道将连接到系统中的适当结构–因此,例如,我希望我的“ worksChannel”依次发送消息到RabbitMQ,Spring Cloud Stream将负责在RabbitMQ中自动创建主题交换

我希望就数据如何发送到RabbitMQ进行一些进一步的自定义-特别是我希望域对象在发送之前先序列化为json,并且我想指定将有效负载发送到的RabbitMQ交换的名称。由某些配置控制,这些配置可以使用yaml文件以以下方式附加到通道:

spring:cloud:stream:bindings:worksChannel:destination: work.exchangecontentType: application/jsongroup: testgroup

最后一个细节是应用程序其余部分与Spring Cloud Stream交互的方式,可以通过定义消息网关直接在Spring Integration中完成:

import org.springframework.integration.annotation.Gateway;import org.springframework.integration.annotation.MessagingGateway;import works.service.domain.WorkUnit;@MessagingGatewaypublic interface WorkUnitGateway {@Gateway(requestChannel = WorkUnitsSource.CHANNEL_NAME)void generate(WorkUnit workUnit);}

基本上就是这样,Spring Cloud Stream现在将连接整个Spring集成流程,并在RabbitMQ中创建适当的结构。

消费者

与生产者类似,首先我想定义一个名为“ worksChannel”的通道,该通道将处理来自RabbitMQ的传入消息:

import org.springframework.cloud.stream.annotation.Input;import org.springframework.messaging.SubscribableChannel;public interface WorkUnitsSink {String CHANNEL_NAME = "worksChannel";@InputSubscribableChannel worksChannel();}

然后让Spring Cloud Stream根据此定义创建通道和RabbitMQ绑定:

import org.springframework.cloud.stream.annotation.EnableBinding;import org.springframework.context.annotation.Configuration;@Configuration@EnableBinding(WorkUnitsSink.class)public class IntegrationConfiguration {}

为了处理消息,Spring Cloud Stream提供了一个侦听器,可以通过以下方式创建它:

@Servicepublic class WorkHandler {private static final Logger LOGGER = LoggerFactory.getLogger(WorkHandler.class);@StreamListener(WorkUnitsSink.CHANNEL_NAME)public void process(WorkUnit workUnit) {LOGGER.info("Handling work unit - id: {}, definition: {}", workUnit.getId(), workUnit.getDefinition());}}

最后是将这个通道连接到yaml文件中表示的RabbitMQ基础结构的配置:

spring:cloud:stream:bindings:worksChannel:destination: work.exchangegroup: testgroup

现在,如果启动了生产者和任何数量的使用者,则通过生产者发送的消息将作为json发送到Rabbit MQ主题交换,由使用者检索,反序列化为对象并传递给工作处理器。

现在,纯粹由Spring Cloud Stream库按照惯例处理创建RabbitMQ基础结构所涉及的大量样板。 尽管Spring Cloud Stream尝试提供原始Spring Integration的基础,但是掌握Spring Integration的基本知识以有效使用Spring Cloud Stream还是很有用的。

此处描述的示例可在我的github存储库中找到

翻译自: //08/integrating-rabbitmq-using-spring-cloud-stream.html

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。