我正在努力使用在 Azure Function Http 端点中执行的 Azure ServiceBusSender。 大约一天左右效果很好。然后突然会抛出以下错误,我将不得不重新启动azure功能:
System.Exception: A connection attempt failed because the connected party did not properly respond after a period of time, or established connection failed because connected host has failed to respond. ErrorCode: TimedOut (ServiceCommunicationProblem). For troubleshooting information, see https://aka.ms/azsdk/net/servicebus/exceptions/troubleshoot.
如果我重新启动azure功能,它将继续工作大约一天。
利用 Azure.Messaging.ServiceBus 7.16.1
依赖注入:
builder.Services.AddAzureClients(clientsBuilder =>
{
clientsBuilder.AddServiceBusClient(appConfig.GetValue<string>("CRMSBActivityConnectionString"))
.ConfigureOptions(options =>
{
options.RetryOptions.Delay = TimeSpan.FromMilliseconds(50);
options.RetryOptions.MaxDelay = TimeSpan.FromSeconds(5);
options.RetryOptions.MaxRetries = 3;
});
clientsBuilder.AddClient<ServiceBusSender, ServiceBusClientOptions>((_, _, provider) =>
provider.GetService<ServiceBusClient>().CreateSender(appConfig.GetValue<string>("CRMSBActivityTopicName")
)).WithName("SBSender");
});
HTTP 端点:
private IServiceLogger _logger;
private const string _commonMessage = "SMSActivityStopGetHttpListenerV1:";
private readonly ServiceBusSender _sender;
public SMSActivityStopGetHttpListener(IServiceLogger logger, IAzureClientFactory<ServiceBusSender> serviceBusSenderFactory)
{
_logger = logger;
_sender = serviceBusSenderFactory.CreateClient("SBSender");
}
[ProducesResponseType((int)HttpStatusCode.Accepted)]
[ProducesResponseType((int)HttpStatusCode.InternalServerError)]
[ProducesResponseType((int)HttpStatusCode.BadRequest)]
[FunctionName("SMSActivityStopGetHttpListenerV1")]
public async Task<HttpResponseMessage> Run(
[HttpTrigger(AuthorizationLevel.Function, "get", Route = "v1/smsactivity/stop")] HttpRequest req,
ILogger log)
{
//Simple logic
try
{
await _sender.SendMessageAsync(servicebusMessage);
}
catch (Exception ex)
{
response = new HttpResponseMessage(HttpStatusCode.InternalServerError) { Content = new StringContent(errorMessage, Encoding.UTF8, "application/json") };
SplunkLogger.LogExceptionToSplunk(_logger, _commonMessage, null, correlationId, "SMSActivityStop API failed", ex.Message);
}
return response;
}
尝试更新软件包。从函数中删除了各种额外的细节。但问题仍然存在。 即使请求很少,一天后也会停止工作。不到二十。
我在我的环境中尝试了你的代码,并得到了下面相同的错误。
我对您的代码进行了一些更改,并收到了发送到 Azure 门户中服务总线队列的消息。
代码:
using System;
using System.Net;
using System.Net.Http;
using System.Text;
using Microsoft.Azure.Functions.Extensions.DependencyInjection;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using Azure.Messaging.ServiceBus;
using Microsoft.Azure.WebJobs;
using Microsoft.Azure.WebJobs.Extensions.Http;
using Microsoft.Azure.WebJobs.ServiceBus;
using System.Threading.Tasks;
[assembly: FunctionsStartup(typeof(ServiceBusFunc.Startup))]
namespace ServiceBusFunc
{
public class Startup : FunctionsStartup
{
public override void ConfigureAppConfiguration(IFunctionsConfigurationBuilder builder)
{
builder.ConfigurationBuilder
.SetBasePath(Environment.CurrentDirectory)
.AddJsonFile("local.settings.json", optional: true, reloadOnChange: true)
.AddEnvironmentVariables()
.Build();
}
public override void Configure(IFunctionsHostBuilder builder)
{
IConfiguration config = builder.GetContext().Configuration;
builder.Services.AddSingleton(config);
string serviceBusConnectionString = config.GetValue<string>("ServiceBusConnectionString");
builder.Services.AddSingleton(x => new ServiceBusClient(serviceBusConnectionString));
builder.Services.AddSingleton<ServiceBusSender>(x =>
{
var client = x.GetRequiredService<ServiceBusClient>();
string queueName = config.GetValue<string>("QueueName");
return client.CreateSender(queueName);
});
}
}
public static class SMSActivityStopGetHttpListener
{
[FunctionName("SMSActivityStopGetHttpListenerV1")]
public static async Task<HttpResponseMessage> Run(
[HttpTrigger(AuthorizationLevel.Function, "get", Route = "v1/smsactivity/stop")] HttpRequestMessage req,
[ServiceBus("<your_queue_name>", Connection = "ServiceBusConnectionString")] IAsyncCollector<ServiceBusMessage> outputMessages,
ILogger log)
{
string messageContent = "hello kamali, how are you?"; // Replace with your message content
try
{
var message = new ServiceBusMessage(messageContent);
var sender = outputMessages as IAsyncCollector<ServiceBusMessage>;
int maxRetries = 3;
TimeSpan delayBetweenRetries = TimeSpan.FromSeconds(5);
for (int retryCount = 0; retryCount < maxRetries; retryCount++)
{
try
{
await sender.AddAsync(message);
return new HttpResponseMessage(HttpStatusCode.Accepted);
}
catch (ServiceBusException ex) when (ex.Reason == ServiceBusFailureReason.ServiceCommunicationProblem)
{
log.LogWarning($"Service Bus communication problem. Retrying in {delayBetweenRetries.TotalSeconds} seconds.");
await Task.Delay(delayBetweenRetries);
}
catch (Exception ex)
{
log.LogError(ex, "Error sending message to Service Bus");
return new HttpResponseMessage(HttpStatusCode.InternalServerError)
{
Content = new StringContent("Internal Server Error", Encoding.UTF8, "application/json")
};
}
}
log.LogError("Failed to send message after multiple retries.");
return new HttpResponseMessage(HttpStatusCode.InternalServerError)
{
Content = new StringContent("Internal Server Error - Message delivery failed", Encoding.UTF8, "application/json")
};
}
catch (Exception ex)
{
log.LogError(ex, "Error processing message");
return new HttpResponseMessage(HttpStatusCode.InternalServerError)
{
Content = new StringContent("Internal Server Error", Encoding.UTF8, "application/json")
};
}
}
}
}
local.settings.json:
{
"IsEncrypted": false,
"Values": {
"AzureWebJobsStorage": "UseDevelopmentStorage=true",
"FUNCTIONS_WORKER_RUNTIME": "dotnet",
"ServiceBusConnectionString": "<ServiceBus_Queue_Connection_String>",
"QueueName": "<queue_name>"
}
}
注意:在运行上述代码之前,请确保您有稳定的互联网连接。
输出:
它运行成功并向服务总线队列发送消息。
以下是浏览器的输出:
Azure 门户:
我在 Azure 门户的服务总线队列中收到消息,如下所示:
所有发送者和接收者都可以使用注入函数
IAzureClientFactory<ServicBusCient>
工厂对象来创建。该对象充当单例,以避免重新建立与命名空间的连接,因为这是一项昂贵的操作。发送方和接收方使用由 ServiceBusClient
维护的连接,并且按需创建和处置的成本低廉。