我正在使用Microsoft.Azure.EventHubs.Processor
消耗由32个分区并行运行的EventHub,最多有4个在不同服务实例中运行的主机。
虽然所有4个服务都已启动并运行,但它们开始一遍又一遍地窃取分区租约。最终导致事件被一次又一次地处理。
我的IEventProcessor
当前看起来是这样的:
public class BaseEventProcessor : IEventProcessor
{
readonly TimeSpan DefaultCheckpointInterval = TimeSpan.FromMinutes(1);
readonly TimeSpan _checkpointInterval;
Stopwatch _checkpointStopWatch;
public BaseEventProcessor(TimeSpan? checkpointInterval = null)
{
_checkpointInterval = checkpointInterval ?? DefaultCheckpointInterval;
}
public virtual Task ProcessErrorAsync(PartitionContext context, Exception error)
{
// some log code
return Task.CompletedTask;
}
public virtual async Task CloseAsync(PartitionContext context, CloseReason reason)
{
// some log code
if (reason == CloseReason.Shutdown)
{
await context.CheckpointAsync();
}
}
public virtual Task OpenAsync(PartitionContext context)
{
// some log code
_checkpointStopWatch = new Stopwatch();
_checkpointStopWatch.Start();
return Task.CompletedTask;
}
public virtual async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
// some processing code
if (messages.Count() > 0 && _checkpointStopWatch.Elapsed >= _checkpointInterval)
{
await context.CheckpointAsync();
_checkpointStopWatch.Restart();
}
}
}
处理器使用此选项初始化:
new EventProcessorOptions
{
PrefetchCount = 200,
MaxBatchSize = 100,
InitialOffsetProvider = (partitionId) => EventPosition.FromEnd(),
InvokeProcessorAfterReceiveTimeout = true,
ReceiveTimeout = TimeSpan.FromSeconds(30),
EnableReceiverRuntimeMetric = true
}
以及具有此选项的分区管理器:
new PartitionManagerOptions
{
RenewInterval = TimeSpan.FromSeconds(10),
LeaseDuration = TimeSpan.FromSeconds(60)
}
此错误在ProcessError
方法内引发:
指定的租约ID与Blob的租约ID不匹配。
-
具有较高的'14'的接收者'dadf82a9-d27a-4af6-b482-5158c23bebe0'已经存在。无法创建带有纪元11的接收器'65bc9d06-c09b-4ab5-af62-75e05ecaa88a'。请确保您创建的接收器的纪元值不断增加,以确保连通性,或者确保所有旧的纪元接收器都已关闭或断开连接
-
具有更高的'12'的新接收器'ed4fbbcd-5896-40d2-adc9-55feb77f6564'已创建,因此当前与'11'的接收器'65bc9d06-c09b-4ab5-af62-75e05ecaa88a'已断开连接。如果要重新创建接收器,请确保使用更高的纪元。
我在这里做错了什么?我需要微调选项还是需要对错误做出某种反应?
谢谢,这是对您问题的答复:
要检查的东西:
确保同时记录来自IProcessorHost错误处理程序和EventProcessorOptions错误处理程序的错误。
查看客户端是否正在记录任何存储例外。这对于确定是否因存储I / O故障而导致丢失租约很重要。
检查客户端资源利用率,例如CPU,线程剥夺,可用内存等。资源利用率高会延迟任务调度,从而导致I / O超时。]
请确保没有其他接收者正在使用同一使用者组。请注意,每个分区一次只能有一个纪元接收器。每个主机组应从专用的消费者组接收。如果2个主机组开始争夺同一使用者组,则接收方可能会观察到ReceiverDisconnectedExceptions。
每个消费者组专用一个存储帐户。基本上不与任何其他服务或使用者共享存储帐户。由于考虑到存储限制,因此建议这样做。如果其他一些服务情况导致高存储I / O从而节流,这也可能会影响租赁操作。
确保已禁用存储帐户的软删除。