Azure Service Bus - message from sender is enqueued twice

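I'm fairly new to Azure Service Bus and to message queues in general. While getting familiar with them, I ran into something that I first assumed was a silly mistake on my part, but the more I tried to find the root cause, the more confused I became.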

In a very simple application, I have an endpoint that passes some hard-coded details to a generic "enqueue this message" service class:

public class TestFunction(IEventBusService eventBusService)
{
    [Function("TestFunction")]
    public async Task<HttpResponseData> Run([HttpTrigger(AuthorizationLevel.Function, "get")] HttpRequestData req)
    {
        try
        {
            await eventBusService.SendAsync(new TestObject { Message = "Message for service bus", SentAtDateTimeOffset = DateTimeOffset.UtcNow }, Queues.Test);

            var response = req.CreateResponse(HttpStatusCode.OK);
            await response.WriteAsJsonAsync("Message successfully published to service bus", "application/json");
            return response;
        }
        catch (ServiceBusOperationFailedException)
        {
            var response = req.CreateResponse(HttpStatusCode.InternalServerError);
            await response.WriteAsJsonAsync("Unable to publish message to service bus", "application/json");
            return response;
        }
    }
}

public class TestObject
{
    public string Message { get; set; }
    public DateTimeOffset SentAtDateTimeOffset { get; set; }
}

The method in question looks like this:

public class EventBusService(ServiceBusClient serviceBusClient) : IEventBusService
{
    public async Task SendAsync<T>(T dataToSend, string sendTo, CancellationToken cancellationToken = default)
    {
        ValidateParams(dataToSend, sendTo);

        var serviceBusSender = serviceBusClient.CreateSender(sendTo);
        var messageJson = new ServiceBusMessage(JsonSerializer.Serialize(dataToSend));

        try
        {
            await serviceBusSender.SendMessageAsync(messageJson, cancellationToken);
        }
        catch (Exception ex)
        {
            throw new ServiceBusOperationFailedException($"Unable to publish message to '{sendTo}'.", ex);
        }
        finally
        {
            await serviceBusSender.DisposeAsync();
        }
    }

    private void ValidateParams<T>(T dataToSend, string sendTo)
    {
        if (dataToSend is null)
            throw new ArgumentNullException(nameof(dataToSend));

        if (string.IsNullOrEmpty(sendTo))
            throw new ArgumentNullException(nameof(sendTo));
    }
}

It all seems simple enough.

To verify that it behaves as expected, I wrote a quick test that calls the endpoint...

    [TestMethod]
    public async Task Should_OnSuccessfulPublishToEventBus_ReceiveConfirmationMessage()
    {
        // Arrange
        var testFunction = new TestFunction(EventBusHelper.CreateService());

        var mockFunctionContext = EventBusHelper.GetMockFunctionContext();

        //Act
        var response = await testFunction.Run(new TestHttpRequestData(mockFunctionContext));

        //Assert
        Assert.IsNotNull(response);
        Assert.AreEqual(HttpStatusCode.OK, response.StatusCode);

        response.Body.Position = 0;
        using var streamReader = new StreamReader(response.Body);
        var responseBody = await streamReader.ReadToEndAsync();

        Assert.AreEqual("\"Message successfully published to service bus\"", responseBody);
    }

...and checked the result using Service Bus Explorer.

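However, when I look at the queued messages, I can see that the same message has been added to the queue twice. This has happened every time I have run the test since.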

Before the test run: [image]

After the test run: [image]

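Based on the code above, is there any reason this would happen? I have debugged and confirmed that the endpoint is hit only once, so I can only assume this is some intricacy of message queues that I'm not aware of. Any help or advice is appreciated!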

Thanks, Mark

c# message-queue azureservicebus azure-servicebus-queues
1 answer

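This may happen because the host sends messages concurrently, or because a retry policy is configured, so that the data is sent twice. One way to prevent it is to set the following option when creating the queue: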

[image]

Alternatively, the code below uses duplicate detection (RequiresDuplicateDetection) so that the message is enqueued only once:

Function1.cs:

using System;
using System.Text.Json;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;
using Microsoft.AspNetCore.Mvc;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.Logging;

namespace FunctionApp170
{
    public class Function1
    {
        private readonly ServiceBusClient rith_sbc;
        private readonly ILogger<Function1> r_log;
        public Function1(ServiceBusClient sbc, ILogger<Function1> logger)
        {
            rith_sbc = sbc;
            r_log = logger;
        }
        [FunctionName("Function1")]
        public async Task<IActionResult> Run(
            [HttpTrigger(AuthorizationLevel.Function, "get", "post", Route = null)] HttpRequest req)
        {
            r_log.LogInformation("Hello, code is started executing");
            var rith_obj = new { Message = "Secret", Time = DateTimeOffset.UtcNow };
            // MessageId is the value that the queue's duplicate detection compares.
            var mid = Guid.NewGuid().ToString();
            // Dispose the sender once the send has completed.
            await using var r_sen = rith_sbc.CreateSender("rithqueue");
            var rith_msg = new ServiceBusMessage(JsonSerializer.Serialize(rith_obj))
            {
                MessageId = mid
            };
            await r_sen.SendMessageAsync(rith_msg);
            return new OkObjectResult("Hello Rithwik Bojja, Message is sent to Service Bus Successfully");
        }
    }
}
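
Note that Function1.cs assigns a fresh Guid as the MessageId on every invocation. Duplicate detection compares MessageId values within the detection window, so it can drop a repeated delivery of the same message, but two separate invocations get two different IDs and are both enqueued. If the goal is to collapse logically identical sends, the ID has to be derived from something stable. The following is only a sketch of that idea: StableMessageId and FromPayload are hypothetical names that do not appear in the code above, and it assumes .NET 5 or later (for SHA256.HashData and Convert.ToHexString).

```csharp
using System;
using System.Security.Cryptography;
using System.Text;
using System.Text.Json;

// Hypothetical helper (not part of the original answer or of the Azure SDK).
public static class StableMessageId
{
    // Serializes the payload to JSON and hashes it, so that equal payloads
    // of the same type map to the same ID. The result is 64 hex characters,
    // which fits within the 128-character limit for MessageId.
    public static string FromPayload<T>(T payload)
    {
        if (payload is null)
            throw new ArgumentNullException(nameof(payload));

        string json = JsonSerializer.Serialize(payload);
        byte[] hash = SHA256.HashData(Encoding.UTF8.GetBytes(json));
        return Convert.ToHexString(hash);
    }
}
```

The result would be assigned to ServiceBusMessage.MessageId in place of the Guid. A payload that contains a timestamp such as DateTimeOffset.UtcNow produces a different hash on every call, so in practice the ID would probably be derived from a business key (an order number, for example) rather than from the whole object.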

Startup.cs:

using Azure.Messaging.ServiceBus;
using Azure.Messaging.ServiceBus.Administration;
using Microsoft.Azure.Functions.Extensions.DependencyInjection;
using Microsoft.Extensions.DependencyInjection;
using System;
using System.Threading.Tasks;

[assembly: FunctionsStartup(typeof(FunctionApp170.Startup))]

namespace FunctionApp170
{
    public class Startup : FunctionsStartup
    {
        public override void Configure(IFunctionsHostBuilder builder)
        {
            string rith_cs = Environment.GetEnvironmentVariable("Rith_Service_Bus_Con");
            var rith_ac = new ServiceBusAdministrationClient(rith_cs);
            ConfigureQueueAsync(rith_ac).GetAwaiter().GetResult();
            builder.Services.AddSingleton(new ServiceBusClient(rith_cs));
            builder.Services.AddSingleton<Function1>();
        }
        private async Task ConfigureQueueAsync(ServiceBusAdministrationClient r_ac)
        {
            var r_q_name = "rithqueue";
            if (!await r_ac.QueueExistsAsync(r_q_name))
            {
                var rith_op = new CreateQueueOptions(r_q_name)
                {
                    RequiresDuplicateDetection = true,
                    DuplicateDetectionHistoryTimeWindow = TimeSpan.FromMinutes(10)
                };
                await r_ac.CreateQueueAsync(rith_op);
            }
        }
    }
}

local.settings.json:

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "UseDevelopmentStorage=true",
    "FUNCTIONS_WORKER_RUNTIME": "dotnet",
    "Rith_Service_Bus_Con": "Endpoint=sb://rithwik.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=eu6XXSQ="
  }
}

Output:

[image]

[image]

With the code above, the message is not enqueued twice.
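
To check whether the client's retry policy is what produces the second copy, one option is to disable retries temporarily and see whether the duplicate disappears. The sketch below is a diagnostic under stated assumptions, not a recommended production setting: it reuses the Rith_Service_Bus_Con environment variable and the rithqueue queue from the sample above, and it assumes a C# project that allows top-level statements.

```csharp
using System;
using Azure.Messaging.ServiceBus;

// Assumption: same environment variable and queue name as in the sample above.
string connectionString = Environment.GetEnvironmentVariable("Rith_Service_Bus_Con")
    ?? throw new InvalidOperationException("Service Bus connection string is not configured.");

var options = new ServiceBusClientOptions
{
    RetryOptions = new ServiceBusRetryOptions
    {
        // Diagnostic only: with no retries, a failed or timed-out send
        // surfaces as an exception instead of being attempted again.
        MaxRetries = 0
    }
};

await using var client = new ServiceBusClient(connectionString, options);
await using ServiceBusSender sender = client.CreateSender("rithqueue");

var message = new ServiceBusMessage("retry diagnostic")
{
    MessageId = Guid.NewGuid().ToString()
};

await sender.SendMessageAsync(message);
Console.WriteLine($"Sent message {message.MessageId} with retries disabled.");
```

If two copies still arrive with retries disabled, the retry policy is probably not the cause, and it would be worth checking whether the sending code itself runs more than once.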
