Azure App Service - Web App - Azure Service Bus

Question

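I want to consume Azure Service Bus queue messages from a .NET Core REST API.

I can do this locally on my laptop, because I can run the API manually to consume the messages.

Once this REST API is deployed as a web app on Azure App Service, how does my web app know that a message has arrived in the Azure Service Bus queue and needs to be consumed?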

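Notes:

  1. I know this can be done with an Azure Function using a Service Bus trigger.
  2. I am more interested in the microservice producer/consumer perspective.
  3. The queueing system could be Azure Service Bus, RabbitMQ, Kafka, etc.

I searched online but did not find anything useful.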

azure apache-kafka rabbitmq azureservicebus azure-webapps
1 Answer

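You can implement a peek-style reader in your .NET Core API that reads messages without removing them from the Service Bus queue. In PeekLock mode a received message is only locked; it stays in the queue until it is explicitly completed:-

Peek message queue example:-

Program.cs:-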

using Azure.Messaging.ServiceBus;
using Newtonsoft.Json;
using ServiceBusQueue;

// Placeholder key: never publish a real SharedAccessKey; load it from configuration or a secret store.
string connectionString = "Endpoint=sb://siliconsb54.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<your-shared-access-key>";
string queueName = "myqueue";

List<Order> orders = new List<Order>()
{
        new Order(){OrderID="01",Quantity=100,UnitPrice=9.99F},
        new Order(){OrderID="02",Quantity=200,UnitPrice=10.99F},
        new Order(){OrderID="03",Quantity=300,UnitPrice=8.99F}

};

// await SendMessage(orders);

await PeekMessages();


async Task SendMessage(List<Order> orders)
{
    // Dispose the client and sender when done so the underlying connection is closed.
    await using ServiceBusClient serviceBusClient = new ServiceBusClient(connectionString);
    await using ServiceBusSender serviceBusSender = serviceBusClient.CreateSender(queueName);

    // A batch groups several messages into a single send operation.
    using ServiceBusMessageBatch serviceBusMessageBatch = await serviceBusSender.CreateMessageBatchAsync();
    foreach (Order order in orders)
    {
        ServiceBusMessage serviceBusMessage = new ServiceBusMessage(JsonConvert.SerializeObject(order));
        serviceBusMessage.ContentType = "application/json";
        // TryAddMessage returns false when the message does not fit in the batch.
        if (!serviceBusMessageBatch.TryAddMessage(serviceBusMessage))
        {
            throw new Exception("Message is too large to fit in the batch.");
        }
    }
    Console.WriteLine("Sending messages");
    await serviceBusSender.SendMessagesAsync(serviceBusMessageBatch);
}

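async Task PeekMessages()
{
    await using ServiceBusClient serviceBusClient = new ServiceBusClient(connectionString);
    // PeekLock locks each received message but leaves it in the queue. Because this
    // method never calls CompleteMessageAsync, a message becomes visible again once
    // its lock expires.
    await using ServiceBusReceiver serviceBusReceiver = serviceBusClient.CreateReceiver(queueName,
        new ServiceBusReceiverOptions() { ReceiveMode = ServiceBusReceiveMode.PeekLock });

    // Streams messages as they arrive; the loop keeps waiting for new messages.
    IAsyncEnumerable<ServiceBusReceivedMessage> messages = serviceBusReceiver.ReceiveMessagesAsync();

    await foreach (ServiceBusReceivedMessage message in messages)
    {
        var order = JsonConvert.DeserializeObject<Order>(message.Body.ToString());
        if (order == null)
        {
            continue; // Skip bodies that do not deserialize to an Order.
        }
        Console.WriteLine("Order Id {0}", order.OrderID);
        Console.WriteLine("Quantity {0}", order.Quantity);
        Console.WriteLine("Unit Price {0}", order.UnitPrice);
    }
}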

Order.cs is defined as follows:-

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace ServiceBusQueue
{
    public class Order
    {
        public string OrderID { get; set; }
        public int Quantity { get; set; }
        public float UnitPrice { get; set; }

    }
}
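
The receiver in Program.cs uses PeekLock, which locks each message for that receiver. If the goal is only to browse the queue without locking anything, the SDK's PeekMessagesAsync may be a closer fit. The following is a minimal sketch under stated assumptions: the connection string is a placeholder, the helper name PeekWithoutLocking and the batch size of 10 are illustrative, and the Order class is repeated only so the file stands on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Newtonsoft.Json;

// Assumption: placeholder values; substitute your own namespace and queue.
string connectionString = "<your-service-bus-connection-string>";
string queueName = "myqueue";

await PeekWithoutLocking(maxMessages: 10);

// Hypothetical helper: browses up to maxMessages without locking or removing them.
async Task PeekWithoutLocking(int maxMessages)
{
    await using var client = new ServiceBusClient(connectionString);
    await using ServiceBusReceiver receiver = client.CreateReceiver(queueName);

    // PeekMessagesAsync returns the messages currently in the queue and leaves
    // them untouched: nothing is locked, completed, or deleted.
    IReadOnlyList<ServiceBusReceivedMessage> messages =
        await receiver.PeekMessagesAsync(maxMessages);

    foreach (ServiceBusReceivedMessage message in messages)
    {
        var order = JsonConvert.DeserializeObject<Order>(message.Body.ToString());
        if (order == null)
        {
            continue; // Body was not a JSON-encoded Order.
        }
        Console.WriteLine("Order Id {0}, Quantity {1}", order.OrderID, order.Quantity);
    }
}

// Same shape as Order.cs, repeated here so the sketch compiles by itself.
public class Order
{
    public string OrderID { get; set; } = "";
    public int Quantity { get; set; }
    public float UnitPrice { get; set; }
}
```

Unlike the streaming loop in Program.cs, this returns once the current batch has been read, so it suits a diagnostic endpoint better than a long-running consumer.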

You can also use Azure Functions directly; the function is triggered as soon as your Service Bus queue receives a new message:-

Function1.cs:-

using System;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Host;
using Microsoft.Extensions.Logging;

namespace FunctionApp10
{
    public class Function1
    {
        [FunctionName("Function1")]
        public void Run([ServiceBusTrigger("testsidqueue", Connection = "ConnectionStringsid")]string myQueueItem, ILogger log)
        {
            log.LogInformation($"C# ServiceBus queue trigger function processed message: {myQueueItem}");
        }
    }
}
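
If you would rather keep the consumer inside the web app than move it to a Function, one common option is a hosted background service built on the SDK's ServiceBusProcessor, which holds an open connection and invokes a handler for each message as it arrives. This is a rough sketch, not a definitive implementation. Assumptions: the class name QueueConsumerService, the configuration key "ServiceBus:ConnectionString", the queue name "myqueue", and the Summarize helper are all illustrative and do not come from the original answer.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Microsoft.AspNetCore.Builder;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

var builder = WebApplication.CreateBuilder(args);

// Assumption: the connection string is stored under this hypothetical key.
string connectionString = builder.Configuration["ServiceBus:ConnectionString"]
    ?? throw new InvalidOperationException("Service Bus connection string is not configured.");

builder.Services.AddSingleton(new ServiceBusClient(connectionString));
builder.Services.AddHostedService<QueueConsumerService>();

var app = builder.Build();
app.MapGet("/", () => "API is running");
app.Run();

// Hypothetical consumer that runs alongside the REST endpoints.
public class QueueConsumerService : BackgroundService
{
    private readonly ServiceBusProcessor _processor;
    private readonly ILogger<QueueConsumerService> _logger;

    public QueueConsumerService(ServiceBusClient client, ILogger<QueueConsumerService> logger)
    {
        _logger = logger;
        // With AutoCompleteMessages off, a message is removed only after CompleteMessageAsync.
        _processor = client.CreateProcessor("myqueue", new ServiceBusProcessorOptions
        {
            AutoCompleteMessages = false,
            MaxConcurrentCalls = 1
        });
    }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _processor.ProcessMessageAsync += OnMessageAsync;
        _processor.ProcessErrorAsync += OnErrorAsync;
        await _processor.StartProcessingAsync(stoppingToken);
        try
        {
            // Keep the service alive until the host shuts down.
            await Task.Delay(Timeout.Infinite, stoppingToken);
        }
        catch (OperationCanceledException)
        {
            // Expected on shutdown.
        }
        await _processor.StopProcessingAsync();
        await _processor.DisposeAsync();
    }

    private async Task OnMessageAsync(ProcessMessageEventArgs args)
    {
        _logger.LogInformation("Received: {Body}", Summarize(args.Message.Body.ToString()));
        // Completing the message removes it from the queue.
        await args.CompleteMessageAsync(args.Message);
    }

    private Task OnErrorAsync(ProcessErrorEventArgs args)
    {
        _logger.LogError(args.Exception, "Service Bus error from {Source}", args.ErrorSource);
        return Task.CompletedTask;
    }

    // Truncates long bodies so log entries stay small.
    public static string Summarize(string body) =>
        body.Length <= 200 ? body : body.Substring(0, 200) + " [truncated]";
}
```

On App Service, an idle web app may be unloaded, which would also stop this consumer, so the "Always On" setting typically needs to be enabled for this approach. The same structure should carry over to RabbitMQ or Kafka by swapping the processor for that broker's client.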

