在 .NET Core 8 Web API 应用程序中实现服务总线的通用使用者时,我遇到了一个问题:在尝试解析使用者内的依赖项时,发现 IServiceProvider 实例已被释放或为 null。如何确保 IServiceProvider 在使用者中保持有效且可访问,以正确解决依赖项(例如事件处理器策略)而不被释放?
背景: 我想要实现的是一种可重用且通用的方法,用于跨多个微服务订阅服务总线主题。每个微服务应该能够轻松实现这个通用类,并提供自己的策略来处理接收到的事件。
我使用的是HostedService,特别是PostConsumerHostedService,它负责触发消费者逻辑。
尝试在消费者中使用 IServiceProvider 时会出现问题,因为它被发现在运行时被释放。
首先,这是触发消费者的PostConsumerHostedService:
public class PostConsumerHostedService : CronJobServiceBase
{
private readonly IServiceProvider _service;
public PostConsumerHostedService(
IOptions<PostConsumerHostedServiceSettings> postConsumerHostedServiceSettings,
ILogger<CronJobServiceBase> log,
IServiceProvider service)
: base(postConsumerHostedServiceSettings, log)
{
_service = service;
}
protected override async Task ExecuteTaskAsync(CancellationToken cancellationToken)
{
using var scope = _service.CreateAsyncScope();
var postTaskService = scope.ServiceProvider.GetRequiredService<IPostConsumerTaskService>();
await postTaskService.StartAsync(cancellationToken);
}
}
接下来,PostConsumerTaskService 调用 IServiceBusConsumer:
public interface IPostConsumerTaskService : ITaskService, IHandleServiceBusMessage
{
}
public class PostConsumerTaskService : IPostConsumerTaskService
{
private readonly IServiceBusConsumer _serviceBusConsumer;
private readonly IServiceProvider _serviceProvider;
public PostConsumerTaskService(
IServiceBusConsumer serviceBusConsumer,
IServiceProvider serviceProvider)
{
_serviceBusConsumer = serviceBusConsumer;
_serviceProvider = serviceProvider;
}
public async Task HandleStringMessageAsync(string message)
{
var jsonDocument = JsonDocument.Parse(message);
// Extract the value of the "type" field
if (jsonDocument.RootElement.TryGetProperty("EventType", out JsonElement typeElement))
{
string type = typeElement.GetString();
Console.WriteLine("Type: " + type);
Enum.TryParse(type, out EventType myEvent);
var eventStrategy = await _eventProcessorFactory.GetEventProcessorStrategy(_serviceProvider,myEvent);
eventStrategy.ProcessEvent(message);
}
else
{
Console.WriteLine("Type field not found in JSON.");
}
await Task.CompletedTask;
}
public async Task StartAsync(CancellationToken cancellationToken)
{
await _serviceBusConsumer.StartAsync<LikeCreated>(this, cancellationToken);
}
}
这是 ServiceBusConsumerBase,在尝试检索事件处理器策略时会出现问题:
public abstract class ServiceBusConsumerBase: IServiceBusConsumer
{
private readonly ServiceBusConsumerSettingsBase _serviceBusConsummerSettings;
public ServiceBusConsumerBase(IOptions<ServiceBusConsumerSettingsBase> serviceBusConsummerSettings)
{
_serviceBusConsummerSettings = serviceBusConsummerSettings.Value;
}
public async Task StartAsync<T>(IHandleServiceBusMessage handleServiceBusMessage,CancellationToken cancellationToken)
{
var serviceBusClient = new ServiceBusClient(_serviceBusConsummerSettings.ConnectionString);
var processor = serviceBusClient.CreateProcessor(_serviceBusConsummerSettings.Topic, _serviceBusConsummerSettings.Subscription, new ServiceBusProcessorOptions
{
AutoCompleteMessages = false,
MaxConcurrentCalls = 1
});
// handle received messages
//processor.ProcessMessageAsync += args => MessageHandler<T>(args,handleServiceBusMessage);
processor.ProcessMessageAsync += args => MessageStringHandler(args, handleServiceBusMessage);
processor.ProcessErrorAsync += ErrorHandler;
// Start processing
await processor.StartProcessingAsync(cancellationToken);
Console.WriteLine("Task running");
}
// handle received messages
async Task MessageStringHandler(ProcessMessageEventArgs args, IHandleServiceBusMessage handleServiceBusMessage)
{
var body = args.Message.Body.ToString();
Console.WriteLine($"Received: imageFileName {body}");
await handleServiceBusMessage.HandleStringMessageAsync(body);
// complete the message. messages are deleted from the subscription.
await args.CompleteMessageAsync(args.Message);
}
// handle any errors when receiving messages
Task ErrorHandler(ProcessErrorEventArgs args)
{
Console.WriteLine(args.Exception.ToString());
return Task.CompletedTask;
}
}
错误:
System.ObjectDisposedException: Cannot access a disposed object.
Object name: 'IServiceProvider'.
at Microsoft.Extensions.DependencyInjection.ServiceLookup.ThrowHelper.ThrowObjectDisposedException()
at Microsoft.Extensions.DependencyInjection.ServiceLookup.ServiceProviderEngineScope.GetService(Type serviceType)
at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService(IServiceProvider provider, Type serviceType)
at Microsoft.Extensions.DependencyInjection.ServiceProviderServiceExtensions.GetRequiredService[T](IServiceProvider provider)
at PostService.Factories.EventProcessorFactory.GetEventProcessorStrategy(IServiceProvider serviceProvider, EventType eventType) in /Users/dmata/Projects/InstagramSD/PostService/Factories/EventProcessorFactory.cs:line 31
at PostService.TaskServices.LikeTopicConsumerTaskService.HandleStringMessageAsync(String message) in /Users/dmata/Projects/InstagramSD/PostService/TaskServices/LikeTopicConsumerTaskService.cs:line 54
at Common.ServiceBus.ServiceBusConsumerBase.MessageStringHandler(ProcessMessageEventArgs args, IHandleServiceBusMessage handleServiceBusMessage) in /Users/dmata/Projects/InstagramSD/Common/ServiceBus/ServiceBusConsumerBase.cs:line 59
at Azure.Messaging.ServiceBus.ServiceBusProcessor.OnProcessMessageAsync(ProcessMessageEventArgs args)
at Azure.Messaging.ServiceBus.ReceiverManager.OnMessageHandler(EventArgs args)
当您最初在此处“开始”操作时:
protected override async Task ExecuteTaskAsync(CancellationToken cancellationToken)
{
using var scope = _service.CreateAsyncScope();
var postTaskService = scope.ServiceProvider.GetRequiredService<IPostConsumerTaskService>();
await postTaskService.StartAsync(cancellationToken);
}
注意如何调用“Start”,然后处理
scope
变量。问题在于,实际的处理程序是在所有这些之后执行的,并且它们尝试再次依赖 IServiceProvider
。该提供程序已与 scope
一起处理,因此您不能这样做。
解决此问题的一种方法是not在那里处理掉
scope
,也许只是将其存储为字段并在您的类上实现IDisposable
以便稍后处理范围。
例如,这样的东西可能会起作用。
public class PostConsumerHostedService : CronJobServiceBase, IAsyncDisposable
{
private readonly IServiceProvider _service;
private AsyncServiceScope _scope;
public PostConsumerHostedService(
IOptions<PostConsumerHostedServiceSettings> postConsumerHostedServiceSettings,
ILogger<CronJobServiceBase> log,
IServiceProvider service)
: base(postConsumerHostedServiceSettings, log)
{
_service = service;
}
public async ValueTask DisposeAsync()
{
await this.scope?.DisposeAsync();
}
protected override async Task ExecuteTaskAsync(CancellationToken cancellationToken)
{
this.scope = _service.CreateAsyncScope();
var postTaskService = scope.ServiceProvider.GetRequiredService<IPostConsumerTaskService>();
await postTaskService.StartAsync(cancellationToken);
}
}
请记住,此设计有一个缺陷,如果您决定在同一实例上多次调用
ExecuteTaskAsync
,则会出现该缺陷,因为范围将被覆盖。
话虽如此,完全解决此问题的最干净方法可能是确保您在首次解决IServiceProvider
后
不要尝试再次接触容器(使用
postTaskService
):问题将也这样走开。
现在我要说的是:我认为你应该重新考虑你的设计。与在托管服务中创建作用域不同,为每条需要处理的消息创建一个作用域是一种更好的设计:当消息到达时,从主
IServiceProvider
(或者只是注入IServiceScopeFactory
)为其创建一个作用域。从此作用域中创建消息处理器,然后让它处理消息。完成此操作后,您可以丢弃示波器。
此设计更好地模拟了现有模式,您可以在其中创建“每个请求的范围”。例如,它与 AspNetCore 中发生的情况类似,但您处理的不是 HTTP 请求,而是服务总线请求。
每条消息一个范围还可以避免诸如保留实例时间过长或无意间在处理程序之间共享实例等问题。例如,如果您要使用 EFCore 中的单个
DbContext
,您将开始看到许多问题,例如高内存使用率和并发问题。在处理这样的类时,最好每个“处理单元”都有一个实例。