我发现有两种接收EventHub消息数据的方式:
public class SimpleEventProcessor : IEventProcessor
{
public Task CloseAsync(PartitionContext context, CloseReason reason)
{
Console.WriteLine($"Processor Shutting Down. Partition '{context.PartitionId}', Reason: '{reason}'.");
return Task.CompletedTask;
}
public Task OpenAsync(PartitionContext context)
{
Console.WriteLine($"SimpleEventProcessor initialized. Partition: '{context.PartitionId}'");
return Task.CompletedTask;
}
public Task ProcessErrorAsync(PartitionContext context, Exception error)
{
Console.WriteLine($"Error on Partition: {context.PartitionId}, Error: {error.Message}");
return Task.CompletedTask;
}
public Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> messages)
{
foreach (var eventData in messages)
{
var data = Encoding.UTF8.GetString(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);
Console.WriteLine($"Message received. Partition: '{context.PartitionId}', Data: '{data}'");
}
return context.CheckpointAsync();
}
}
EventHubClient eventHub
var reciever = eventHub.CreateReceiver("consumer1", "0", EventPosition.FromStart());
var recieved = await reciever.ReceiveAsync(10);
对他们有什么区别?我们可以将检查点保存为第二种方式吗?如何以第二种方式处理撞车事故?为什么需要两种不同的方式?
EventHubClient又名。低级API用于构建连接器。在这种情况下,开发人员负责管理分区接收器,检查点,负载分配和崩溃恢复等。大多数人将不会使用此API进行接收,并且该API也是用于构建源到接收器的连接器。
Processor Host带有内置的检查点,负载分配和分区接收器管理器。当实现IEventProcessor并提供存储检查点存储时,此API可能看起来像是一种矫kill过正,但是从长远来看,它更加无忧。