我有以下不同情况:
场景1
RabbitMQ已启动。
消费者启动并通过MassTransit正确订阅RabbitMQ。
RabbitMQ下降。
RabbitMQ再次上升。
如果Consumer尝试使用busControl.StartAsync();进行订阅,则它将不再重新连接
如果消费者尝试使用busControl.Start();进行订阅,则它将正确地重新连接。
已经发现这个有点奇怪了。但是还不错。
场景2
RabbitMQ已关闭
消费者上升并尝试通过busControl.Start();进行订阅,由于RabbitMQ关闭,该订阅将失败
RabbitMQ上升
消费者无法重新连接到RabbitMQ
但是我需要以某种方式解决这个问题。
注册:
private static void RegisterRabbitMQ(SimpleInjectorContainer container)
{
container.AddMassTransit(x =>
{
x.AddBus(() => Bus.Factory.CreateUsingRabbitMq(cfg =>
{
var host = cfg.Host(ConfigurationValuesProvider.Current.Get("RabbitMQHostName"), hostConfigurator =>
{
#if !DEBUG
hostConfigurator.Username(ConfigurationValuesProvider.Current.Get("RabbitMQUsername"));
hostConfigurator.Password(ConfigurationValuesProvider.Current.Get("RabbitMQPassword"));
hostConfigurator.UseCluster(c =>
{
string[] hostnames = ConfigurationValuesProvider.Current.Get("RabbitMQNodes").Split(';');
c.ClusterMembers = hostnames;
});
#endif
});
}));
});
}
出发巴士:
try
{
RegisterRabbitMQ(container);
var busControl = container.GetInstance<IBusControl>();
busControl.Start();
}
catch (Exception ex)
{
Log.Seq.Error("RabbitMQ broker is not reachable, BusControl cannot be started. ", ex);
}
提前感谢!
我们已经在生产中使用大众运输已有一段时间了。一旦公交连接到Rabbit,公交的连接管理就非常牢固。我认为这是VM的/容器的启动和停止,而当您的消费者服务出现时兔子掉下来是真正的问题。
我们解决此问题的方法是将使用者的启动移至服务的其他线程上并实施重试操作。这将使您的服务成功启动并进入循环,等待直到Rabbit出现。重要的是,我们记录该服务无法连接到Rabbit,因此我们对此具有可见性。
代码
private async Task StartAsync()
{
await Task.Run(() => {
bool isConnected = false;
while(!isConnected)
{
try
{
ServiceBus = CreateServiceBus();
ServiceBus?.Start();
isConnected = true;
_loggingService.Info("Messaging service bus started.");
}
catch(Exception ex)
{
_loggingService.Error("Cannot connect to message broker host.", ex);
Thread.Sleep(Defined_Time);
}
}
});
}
然后按照您在演示中看到的完全配置您的总线
private IBusControl CreateServiceBus()
{
var url = string.Format(CultureInfo.InvariantCulture, "rabbitmq://{0}", _messagingServerName);
var uri = new Uri(url);
_loggingService.DebugFormat("Starting messaging service bus: \"{0}\". Username=\"{1}\".", url, _messagingUsername);
return Bus.Factory.CreateUsingRabbitMq(sbc =>
{
Host = sbc.Host(uri, h =>
{
h.Username(_messagingUsername);
h.Password(_messagingPassword);
});
//Configure end points
});
}
希望有所帮助。