闲置时在 Azure.Messaging.EventHubs.EventProcessorClient 上更新 EventHub Partition Offsett 检查点。

问题描述 投票:0回答:1

在我的方案中,我会有成批的事件同时到来,然后长时间的事件Hub会处于空闲状态。在我的处理器客户端中,我希望每隔 N 个事件或 N 分钟检查一次(以先到者为准)。

以下是我如何设置我的Azure.Messaging.EventHubs.EventProcessorClient。

EventProcessorClient processor = new EventProcessorClient(storageClient, consumerGroup, ehubNamespaceConnectionString, eventHubName);
processor.ProcessEventAsync += ProcessEventHandler;
processor.ProcessErrorAsync += ProcessErrorHandler;

//Start Stopwatch
_checkpointStopWatch = new Stopwatch();
_checkpointStopWatch.Start();

// Start the processing
await processor.StartProcessingAsync();

while (true)
{
    await Task.Delay(TimeSpan.FromSeconds(10));
    Console.WriteLine($"{eventsProcessed} events have been processed");
}

在我的ProcessEventHandler中,我对eventsProcessedSinceLastCheckpoint以及秒表上经过的时间进行检查。当其中一个达到最大值时,我就会重置这两个值,并在控制台窗口中记录下来。

static async Task<Task> ProcessEventHandler(ProcessEventArgs eventArgs)
{
   ++eventsProcessed;
   ++eventsProcessedSinceLastCheckpoint;

   Console.WriteLine("\tReceived event: {0}", Encoding.UTF8.GetString(eventArgs.Data.Body.ToArray()));

    // After every 100 events or 2 minutes we add a checkpoint. Whichever occurs first
    if(eventsProcessedSinceLastCheckpoint >= 100 || _checkpointStopWatch.Elapsed > TimeSpan.FromMinutes(2))
    {
        eventsProcessedSinceLastCheckpoint = 0;
        _checkpointStopWatch.Restart();

        await eventArgs.UpdateCheckpointAsync();
        Console.WriteLine("> Checkpoint Set. Count Reset. Stopwatch Reset.");
    }
    return Task.CompletedTask;

}

对eventsProcessedSinceLastCheckpoint变量的检查工作非常完美,因为每当有新事件发生时,ProcessEventHandler就会被启动。然而,当EventHub处于空闲状态时,ProcessEventHandler不会被调用,因此在EventHub安静了许多分钟或几个小时的情况下,我将永远不会对经过的时间进行检查点。

我明白我可以直接删除定时器,而且如果在检查点之间发生崩溃,我的处理器应该能够处理重复的事件。但在我的方案中(因为我有很长的空闲时间),我想利用我所拥有的时间,并在可以的情况下赶上,以避免额外的重复事件。因此增加了定时器作为空闲期间的后备措施。

我的问题是。 我怎样才能调用 更新CheckpointAsync() 在...之外 流程事件处理程序? 该方法似乎只存在于 ProcessEventArgs. 我不能直接在EventProcessorClient上调用它,这是最理想的,因为我可以将定时器检查移到ProcessEventHandler之外,并进入我的while循环......。

c# azure azure-eventhub checkpointing
1个回答
0
投票

设置 EventHubProcessorClientOptions.MaximumWaitTime 属性将允许你的处理器在没有读取事件时被调用。 当设置为非空时,等待时间基本上意味着 "一旦你得到事件就给我,但如果在这个时间间隔内没有读取事件,就ping我的处理程序"。

关于这种情况下的更新检查点,推荐的方法是缓存最后一个被派发到处理程序的事件的参数,然后用它来调用 UpdateCheckpointAsync. 该样本 演示了该方法,确保它在对分区的处理停止时创建一个检查点。

© www.soinside.com 2019 - 2024. All rights reserved.