多次传递持久功能的队列触发消息

问题描述 投票:0回答:1

我有一个由队列触发器调用的持久函数

这个持久函数需要时间来运行,我认为这意味着消息会多次从队列重新发送到函数

public async Task RunOrchestratorAsync(
[ServiceBusTrigger("my-queue", Connection = "event-bus-connection")] string queueItem,
            [DurableClient] IDurableOrchestrationClient starter)
{
    var dto = JsonConvert.DeserializeObject<MyObject>(queueItem);
    await starter.StartNewAsync("OrchestratorFunction", dto);
}

如您所见,消息有多次传递

我怎样才能阻止这个?

保罗

azure azure-functions azureservicebus azure-durable-functions
1个回答
0
投票

我同意@Stephen Cleary

如果发送消息未被函数处理并且函数由于某些错误或问题而退出。 当下次函数成功运行时,它将处理所有未处理的消息。

我还创建了一个持久函数,触发器是服务总线队列触发器, 并使用了您在问题中提供的代码。

function1.cs
:

    using System.Collections.Generic;
    using System.Threading.Tasks;
    using Microsoft.Azure.WebJobs;
    using Microsoft.Azure.WebJobs.Extensions.DurableTask;
    using Microsoft.Extensions.Logging;
    using Newtonsoft.Json;
    using Newtonsoft.Json.Linq;
    using System;
    
    
    namespace FunctionApp1
    {
        public class MyObject
        { 
            public string Id { get; set; }
        }
        public static class Function1
        {
            [FunctionName("QueueTrigger")]
            public static void Run(
                [ServiceBusTrigger("test", Connection = "connectionstring")] string myQueueItem,
                [DurableClient] IDurableOrchestrationClient starter,
                ILogger log)
            {
                try
                {
                    var dto = JsonConvert.DeserializeObject<MyObject>(myQueueItem);
                    string instanceId = starter.StartNewAsync("Orchestration", dto).Result;
    
                    log.LogInformation("Started orchestration with ID = '{instanceId}'.", instanceId);
                }
                catch (Exception ex)
                {
                    log.LogError($"Error processing message: {ex.Message}");
                }
            }
    
    
            [FunctionName("Orchestration")]
            public static async Task<List<string>> RunOrchestrator(
                [OrchestrationTrigger] IDurableOrchestrationContext context)
            {
                MyObject myQueueItem = context.GetInput<MyObject>();
    
                var outputs = new List<string>();
    
                outputs.Add(await context.CallActivityAsync<string>(nameof(SayHello),myQueueItem.Id));
    
                return outputs;
            }
    
            [FunctionName(nameof(SayHello))]
            public static void SayHello([ActivityTrigger] string name, ILogger log)
            {
                log.LogInformation("Saying hello to {name}.", name);
            }
    
            
        }
    }

local.settings.json
:

    {
      "IsEncrypted": false,
      "Values": {
        "AzureWebJobsStorage": "UseDevelopmentStorage=true",
        "FUNCTIONS_WORKER_RUNTIME": "dotnet",
        "connectionstring": "xxxxxxxxxxxxx"
      }
    }

如您所见,我只收到一次消息。

enter image description here

enter image description here

© www.soinside.com 2019 - 2024. All rights reserved.