目前有没有办法订阅整个存储帐户(多容器)?
我想让事件中心使用者的检查点覆盖多个容器或整个存储帐户。我的用例是:我需要与多个容器交互,事件会通过这些容器分发。根据我读过的文档和示例,目前只能订阅一个容器。下面是我当前的实现:它侦听一个容器,当元数据更改时,使用者会接收到事件。
// Set up an Event Hubs processor whose checkpoint store is built on a single
// blob container client. Each step logs its error and returns it to the caller.

// Build a shared-key credential from the storage account name and key.
sharedCredential, errAzCred := container.NewSharedKeyCredential(ae.AccountName, ae.AccountKey)
if errAzCred != nil {
log.WithError(errAzCred).Error("Creating sharedCredential failed")
return errAzCred
}
// Create a client for the one container addressed by ae.ContanerUrl.
// NOTE(review): "ContanerUrl" looks like a misspelling of "ContainerURL"; the
// field is declared elsewhere, so it is left unchanged here.
checkClient, errAzClient := container.NewClientWithSharedKeyCredential(ae.ContanerUrl, sharedCredential, nil)
if errAzClient != nil {
log.WithError(errAzClient).Error("Creating container client failed")
return errAzClient
}
// Create the blob checkpoint store from that single container client.
checkpointStore, errCheckPt := checkpoints.NewBlobStore(checkClient, nil)
if errCheckPt != nil {
log.WithError(errCheckPt).Error("Creating checkpoint store failed")
return errCheckPt
}
// Create the consumer client for the event hub, using the default consumer group.
consumerClient, errClient := azeventhubs.NewConsumerClientFromConnectionString(ae.HubNamespaceConnectionString, ae.EventHubName, azeventhubs.DefaultConsumerGroup, nil)
if errClient != nil {
log.WithError(errClient).Error("Creating event hub consumer failed")
return errClient
}
// Close the consumer client when the enclosing function returns. The error
// returned by Close is discarded, and context.TODO() is a placeholder context.
defer consumerClient.Close(context.TODO())
// Create the processor from the consumer client and the checkpoint store.
processor, errProcessor := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)
if errProcessor != nil {
log.WithError(errProcessor).Error("Creating event hub processor failed")
return errProcessor
}
我尝试了以下方法,它可以运行,但是当文件的元数据更改时,使用者不会接收到该事件。
// Start one goroutine per partition of the event hub, each processing events
// through its own partition client. No checkpoint store is referenced in this
// snippet.

// Fetch the hub properties to obtain the list of partition IDs.
properties, err := consumerClient.GetEventHubProperties(context.TODO(), nil)
if err != nil {
return err
}
// wg counts the goroutines started below.
// NOTE(review): no wg.Wait() is visible in this snippet — presumably it
// follows in the full function; confirm.
var wg sync.WaitGroup
for _, partitionID := range properties.PartitionIDs {
log.Debugf("start consuming partition %s", partitionID)
// partitionClient is declared inside the loop body, so the goroutine below
// captures a distinct client for each partition.
partitionClient, errPC := consumerClient.NewPartitionClient(partitionID, nil)
if errPC != nil {
log.WithError(errPC).Error("couldn't create a partition client")
// Returning here leaves any goroutines already started still running.
return errPC
}
wg.Add(1)
go func() {
defer wg.Done()
// A processing failure is logged only; it is not propagated to the caller.
// NOTE(review): partitionClient is not closed in this snippet — presumably
// ae.ProcessEvents is responsible for that; confirm.
if errProcess := ae.ProcessEvents(partitionClient); errProcess != nil {
log.WithError(errProcess).Error("failed to process the event")
}
}()
}
谢谢你
就 SDK 目前提供的功能而言,简单的回答是“不能”。
但是,如果您想构建自己的检查点存储(checkpoint store),我们确实提供了扩展点。该接口非常简单:
// CheckpointStore is the set of operations a checkpoint store provides:
// claiming and listing partition ownership, and listing and updating
// checkpoints. Every method takes a context as its first parameter and a
// method-specific options pointer as its last.
type CheckpointStore interface {
// ClaimOwnership attempts to claim ownership of the partitions in partitionOwnership and returns
// the actual partitions that were claimed.
ClaimOwnership(ctx context.Context, partitionOwnership []Ownership, options *ClaimOwnershipOptions) ([]Ownership, error)
// ListCheckpoints lists all the available checkpoints.
ListCheckpoints(ctx context.Context, fullyQualifiedNamespace string, eventHubName string, consumerGroup string, options *ListCheckpointsOptions) ([]Checkpoint, error)
// ListOwnership lists all ownerships.
ListOwnership(ctx context.Context, fullyQualifiedNamespace string, eventHubName string, consumerGroup string, options *ListOwnershipOptions) ([]Ownership, error)
// SetCheckpoint updates a specific checkpoint with a sequence and offset.
SetCheckpoint(ctx context.Context, checkpoint Checkpoint, options *SetCheckpointOptions) error
}
如果您需要参考,SDK 自带的实现位于:https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/messaging/azeventhubs/checkpoints/blob_store.go