我有一个安装在3台不同服务器中的应用程序。此应用程序订阅一个Event Hub。此Event Hub为8个分区。因此,当我在所有3台计算机上启动应用程序时,所有3台计算机上的所有分区都是随机初始化的。
VM1:分区0,1,2VM2:分区3,4VM3:分区5,6,7
所有这些分区都在连续接收消息。这些消息需要一个接一个地处理。现在,我的要求是,对于一台机器/服务器,我一次只希望接收一条消息(无论初始化了多少个分区)。VM1,VM2,VM3也可以并行运行(请让我知道这句话是否令人困惑)。
一种情况是,在一台机器上,例如VM1,我已经通过分区0收到了一条消息。该消息现在正在处理,通常需要15分钟。在这15分钟内,我不希望分区1或分区2接收任何新消息,直到较早的消息完成。一旦完成了先前的消息处理,则3个分区中的任何一个都准备好接收新消息。一旦任何分区收到另一条消息,其他分区就不应接收任何消息。
我正在使用的代码是这样的:
public class SimpleEventProcessor : IEventProcessor { public Task CloseAsync(PartitionContext context, CloseReason reason) { Console.WriteLine($"Processor Shutting Down. Partition '{context.PartitionId}', Reason: '{reason}'."); return Task.CompletedTask; } public Task OpenAsync(PartitionContext context) { Console.WriteLine($"SimpleEventProcessor initialized. Partition: '{context.PartitionId}'"); return Task.CompletedTask; } public Task ProcessErrorAsync(PartitionContext context, Exception error) { Console.WriteLine($"Error on Partition: {context.PartitionId}, Error: {error.Message}"); return Task.CompletedTask; } public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages) { foreach (var eventData in messages) { var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count); Console.WriteLine($"Message received. Partition: '{context.PartitionId}', Data: '{data}'"); DoSomethingWithMessage(); // typically takes 15-20 mins to finish this method. } return context.CheckpointAsync(); } }
请让我知道是否有可能。如果是,那怎么办?非常感谢在这方面的任何帮助。
PS:我必须使用事件中心,没有其他选择。
我有一个安装在3台不同服务器中的应用程序。此应用程序订阅一个Event Hub。此Event Hub为8个分区。因此,当我在所有3台计算机上启动应用程序时,...
您可以通过互斥静态锁定对象来实现。