700字范文,内容丰富有趣,生活中的好帮手!
700字范文 > 如何在ASP.NET Core中使用Azure Service Bus Queue

如何在ASP.NET Core中使用Azure Service Bus Queue

时间:2018-08-04 17:27:29

相关推荐

如何在ASP.NET Core中使用Azure Service Bus Queue

原文:USING AZURE SERVICE BUS QUEUES WITH CORE SERVICES

作者:damienbod[1]

译文:如何在 Core中使用Azure Service Bus Queue

地址:/lwqlun/p/10760227.html

作者:Lamond Lu

源代码:/lamondlu/AzureServiceBusMessaging

本文展示了如何使用Azure Service Bus Queue, 实现2个 Core Api应用之间的消息传输。

配置Azure Service Bus Queue

你可以从官网文档中了解到如何配置一个Azure Service Bus Queue.

/en-us/azure/service-bus-messaging/service-bus-create-namespace-portal

这里我们使用Queue或者Topic来实现消息传输。Queue是一种消息传输类型,一旦一个消息被一个消费者接收了,该消息就会从Queue中被移除。

与Queue不同,Topic提供的是一对多的通讯方式。

架构图

整个应用的实现如下:

•Api 1负责发送消息•Api 2负责监听Azure Service Bus,并处理接收到的消息

实现一个Service Bus Queue

这里我们首先需要引入Microsoft.Azure.ServiceBus[2]程序集。Microsoft.Azure.ServiceBus[3]是Azure Service Bus的客户端库。针对Service Bus的连接字符串我们保存在项目的User Secret中。当部署项目的时候,我们可以使用Azure Key Valut来设置这个Secret值。

在Visual Studio中,右键点击API1, API2项目属性,选择Manage User Secrets就可以管理当前项目使用的所有私密信息。

为了发送向Azure Service Bus Queue发送消息,我们需要创建一个SendMessage方法,并接收一个消息参数。这里我们创建了一个我们自己的消息内容类型MyPayload, 将当前该MyPayload对象序列化成Json字符串, 添加到一个Message对象中。

using Microsoft.Azure.ServiceBus;using Microsoft.Extensions.Configuration;using Newtonsoft.Json;using System.Text;using System.Threading.Tasks;

namespace ServiceBusMessaging{public class ServiceBusSender{private readonly QueueClient _queueClient;private readonly IConfiguration _configuration;private const string QUEUE_NAME = "simplequeue";

public ServiceBusSender(IConfiguration configuration){_configuration = configuration;_queueClient = new QueueClient(_configuration.GetConnectionString("ServiceBusConnectionString"),QUEUE_NAME);}

public async Task SendMessage(MyPayload payload){string data = JsonConvert.SerializeObject(payload);Message message = new Message(Encoding.UTF8.GetBytes(data));

await _queueClient.SendAsync(message);}}}

在API 1和API 2中,我们需要将ServiceBusSender注册到应用程序的IOC容器中。这里为了测试方便,我们同时注册Swagger服务。

public void ConfigureServices(IServiceCollection services){services.AddMvc();

services.AddScoped<ServiceBusSender>();

services.AddSwaggerGen(c =>{c.SwaggerDoc("v1", new Info{Version = "v1",Title = "Payload View API",});});}

接下来,我们就可以在控制器中通过构造函数注入的方式使用这个服务了。

在API1中,我们创建一个POST方法,这个方法会将API接收到Payload对象发送到Azure Service Bus Queue中。

[HttpPost][ProducesResponseType(typeof(Payload), StatusCodes.Status200OK)][ProducesResponseType(typeof(Payload), StatusCodes.Status409Conflict)]public async Task<IActionResult> Create([FromBody][Required]Payload request){if (data.Any(d => d.Id == request.Id)){return Conflict($"data with id {request.Id} already exists");}

data.Add(request);

// Send this to the bus for the other servicesawait _serviceBusSender.SendMessage(new MyPayload{Goals = request.Goals,Name = request.Name,Delete = false});

return Ok(request);}

从Queue中获取消息

为了监听Azure Service Bus Queue, 并处理接收到的消息,我们创建了一个新类ServiceBusConsumerServiceBusConsumer实现了IServiceBusConsumer接口。

Queue的连接字符串是使用IConfiguration读取的。RegisterOnMessageHandlerAndReceiveMessages方法负责注册消息处理程序ProcessMessagesAsync处理消息。ProcessMessagesAsync方法会将得到的消息转换成对象,并调用IProcessData接口完成最终的消息处理。

using Microsoft.Azure.ServiceBus;using Microsoft.Extensions.Configuration;using Microsoft.Extensions.Logging;using Newtonsoft.Json;using System.Text;using System.Threading;using System.Threading.Tasks;

namespace ServiceBusMessaging{public interface IServiceBusConsumer{void RegisterOnMessageHandlerAndReceiveMessages();Task CloseQueueAsync();}

public class ServiceBusConsumer : IServiceBusConsumer{private readonly IProcessData _processData;private readonly IConfiguration _configuration;private readonly QueueClient _queueClient;private const string QUEUE_NAME = "simplequeue";private readonly ILogger _logger;

public ServiceBusConsumer(IProcessData processData,IConfiguration configuration,ILogger<ServiceBusConsumer> logger){_processData = processData;_configuration = configuration;_logger = logger;_queueClient = new QueueClient(_configuration.GetConnectionString("ServiceBusConnectionString"), QUEUE_NAME);}

public void RegisterOnMessageHandlerAndReceiveMessages(){var messageHandlerOptions = new MessageHandlerOptions(ExceptionReceivedHandler){MaxConcurrentCalls = 1,AutoComplete = false};

_queueClient.RegisterMessageHandler(ProcessMessagesAsync, messageHandlerOptions);}

private async Task ProcessMessagesAsync(Message message, CancellationToken token){var myPayload = JsonConvert.DeserializeObject<MyPayload>(Encoding.UTF8.GetString(message.Body));_processData.Process(myPayload);await pleteAsync(message.SystemProperties.LockToken);}

private Task ExceptionReceivedHandler(ExceptionReceivedEventArgs exceptionReceivedEventArgs){_logger.LogError(exceptionReceivedEventArgs.Exception, "Message handler encountered an exception");var context = exceptionReceivedEventArgs.ExceptionReceivedContext;

_logger.LogDebug($"- Endpoint: {context.Endpoint}");_logger.LogDebug($"- Entity Path: {context.EntityPath}");_logger.LogDebug($"- Executing Action: {context.Action}");

return pletedTask;}

public async Task CloseQueueAsync(){await _queueClient.CloseAsync();}}}

其中IProcessData接口存在于类库项目ServiceBusMessaging中,它是用来处理消息的。

public interface IProcessData{void Process(MyPayload myPayload);}

在Api 2中,我们创建一个ProcessData类,它实现了IProcessData接口。

public class ProcessData : IProcessData{public void Process(MyPayload myPayload){DataServiceSimi.Data.Add(new Payload{Name = myPayload.Name,Goals = myPayload.Goals});}}

这里为了简单测试,我们创建了一个静态类DataServiceSimi,其中存放了API2中所有保存Payload对象。同时,我们还创建了一个新的控制器ViewPayloadMessagesController,在其中添加了一个GET Action,并返回了静态类DataServiceSimi中的所有数据。

[Route("api/[controller]")][ApiController]public class ViewPayloadMessagesController : ControllerBase{[HttpGet][ProducesResponseType(StatusCodes.Status200OK)]public ActionResult<List<Payload>> Get(){return Ok(DataServiceSimi.Data);}}

最后我们还需要将ProcessData注册到API2的IOC容器中。

public void ConfigureServices(IServiceCollection services){services.AddMvc();

services.AddSingleton<IServiceBusConsumer, ServiceBusConsumer>();services.AddTransient<IProcessData, ProcessData>();}

最终效果

现在我们分别启用2个Api项目,并在Api 1的Swagger文档界面,调用POST请求,添加一个Payload

操作完成之后,我们访问Api 2的/api/ViewPayloadMessages, 获得结果如下,Api 1发出的消息出现在了Api 2的结果集中,这说明Api 2从Azure Service Bus Queue中获取了消息,并保存在了自己的静态类DataServiceSimi中。

References

[1]damienbod:/author/damienbod/

[2]Microsoft.Azure.ServiceBus:/packages/Microsoft.Azure.ServiceBus

[3]Microsoft.Azure.ServiceBus:/packages/Microsoft.Azure.ServiceBus

.NET社区新闻,深度好文,欢迎访问公众号文章汇总

本内容不代表本网观点和政治立场,如有侵犯你的权益请联系我们处理。
网友评论
网友评论仅供其表达个人看法,并不表明网站立场。