我有一种情况,其中单个节点上的所有处理应在单一的“工作单位”是将提交一旦所有的处理程序被调用运行。我认为最好的办法是做以下:
当接收到消息时,执行这些操作作为管道的一部分:
你能给我暗示如何定制卤面管道,以满足上述要求?
我已经结束了与此:
private static IBus _bus;
public void ConfigureServices(IServiceCollection services)
{
services.AddDbContext<MyDbContext>();
services.AddMvc();
services.AddTransient<IBus>(sp => _bus);
}
public void Configure(IApplicationBuilder app, IHostingEnvironment env)
{
StartRebus(app);
...
}
public static void StartRebus(this IApplicationBuilder app)
{
var rebusServices = new ServiceCollection();
rebusServices.AutoRegisterHandlersFromAssemblyOf<ActivityDenormalizer>();
rebusServices.AddTransient<MyDbContext>(sp =>
{
var messageContext = MessageContext.Current
?? throw new InvalidOperationException("MessageContext.Current is null.");
return messageContext.TransactionContext.Items
.GetOrThrow<MyDbContext>(nameof(MyDbContext));
});
rebusServices.AddRebus((configure, sp) => configure
.Transport(t => t.UseInMemoryTransport(new InMemNetwork(), "Messages"))
.Options(o =>
{
o.EnableUnitOfWork<MyDbContext>(
unitOfWorkFactoryMethod: messageContext =>
{
//create new dbcontext instance regardless of ServiceLifeTime.
//Notice that I'm using ApplicationServices here, not RebusServices.
var dbContext = ActivatorUtilities.CreateInstance<MyDbContext>(app.ApplicationServices);
messageContext.TransactionContext.Items[nameof(MyDbContext)] = dbContext;
return dbContext;
},
commitAction: (messageContext, dbContext) => dbContext.SaveChanges(),
cleanupAction: (messageContext, dbContext) => dbContext.Dispose());
}));
var rebusServiceProvider = rebusServices.BuildServiceProvider();
rebusServiceProvider.UseRebus();
_bus = rebusServiceProvider.GetRequiredService<IBus>();
}
应用服务和画谜服务在两个地方都是相互关联的:
rebusServiceProvider
解决,但实例也被登记在应用服务,这样我可以从我的应用程序发送消息给它。unitOfWorkFactoryMethod
和rebusServices注册,以便它可以被注入到卤面处理程序。是 - 检查Rebus.UnitOfWork - 它拥有你需要的钩子。
具体来说,实体框架,你会做这样的事情:
Configure.With(new CastleWindsorContainerAdapter(container))
.Transport(t => t.UseMsmq("test"))
.Options(o => o.EnableUnitOfWork(
async context => new CustomUnitOfWork(context, connectionString),
commitAction: async (context, uow) => await uow.Commit()
))
.Start();
其中CustomUnitOfWork
会是这个样子:
class CustomUnitOfWork
{
public const string ItemsKey = "current-db-context";
readonly MyDbContext _dbContext;
public CustomUnitOfWork(IMessageContext messageContext, string connectionString)
{
_dbContext = new MyDbContext(connectionString);
messageContext.TransactionContext.Items[ItemsKey] = this;
}
public async Task Commit()
{
await _dbContext.SaveChangesAsync();
}
public MyDbContext GetDbContext() => _dbContext;
}
然后你会设置你的IoC容器从当前消息上下文获取它来解决MyDbContext
- 与温莎城堡,将这样进行:
container.Register(
Component.For<CustomUnitOfWork>()
.UsingFactoryMethod(k =>
{
var messageContext = MessageContext.Current
?? throw new InvalidOperationException("Can't inject uow outside of Rebus handler");
return messageContext.TransactionContext.Items
.GetOrThrow<CustomUnitOfWork>(CustomUnitOfWork.ItemsKey);
})
.LifestyleTransient(),
Component.For<MyDbContext>()
.UsingFactoryMethod(k => k.Resolve<CustomUnitOfWork>().GetDbContext())
.LifestyleTransient()
);