我正在从 azure-event-hubs-go/v3 迁移到较新的 azeventhubs Go SDK。在旧版 SDK 中,有一个 ReceiveOption 参数允许我指定从哪里开始消费事件。
在新的 SDK 中,我使用以下代码来初始化处理器:
processor, err := azeventhubs.NewProcessor(
e.ConsumerClient,
checkpointStore,
&azeventhubs.ProcessorOptions{
UpdateInterval: time.Second,
Prefetch: 0,
StartPositions: azeventhubs.StartPositions{
Default: azeventhubs.StartPosition{
Latest: to.Ptr(true),
EnqueuedTime: to.Ptr(time.Now()),
Inclusive: true
}
}
}
)
但是,我注意到事件是从最后一个检查点而不是最近发送的事件中消耗的。
我尝试过的: 我已经尝试过 ConsumingEventsUsingConsumerClient 和 ConsumingEventsWithCheckpoints 示例,但它们的行为方式相同,消耗来自最后一个检查点的事件而不是最近的事件。
我的期望: 我希望处理器开始使用从设备发送的最新事件,该设备每秒发送一条消息。如何使用 azeventhubs Go SDK 实现此行为?
我最初很难掌握 AMQP 的底层机制。不过,我很高兴地报告该问题已成功解决。
var wg sync.WaitGroup
wg.Add(1)
for _, partition := range p.PartitionIDs {
go func(partition string) {
defer wg.Done()
partitionClient, err := consumerClient.NewPartitionClient(partition, nil)
if err != nil {
panic(err)
}
receiveCtx, cancel := context.WithTimeout(context.TODO(), time.Second*30)
defer cancel()
for {
events, err := partitionClient.ReceiveEvents(receiveCtx, 1, nil)
if err != nil && !errors.Is(err, context.DeadlineExceeded) {
panic(err)
}
for _, evt := range events {
fmt.Printf("partition: %s\n", partition)
fmt.Printf("Body: %s\n", string(evt.Body))
}
}
}(partition)
}
wg.Wait()
我对 Azure 客户支持服务团队的宝贵帮助表示感谢。