多个容器上的 Azure 事件中心检查点

问题描述 投票:0回答:1

目前有没有办法订阅整个存储帐户(多容器)?

我想将我的事件中心使用者检查点到多个容器或整个存储帐户,我的用例是我正在与多个容器交互,并且事件是通过它们分派的,目前,我已经阅读了文档和示例只能订阅一个容器。 这是我当前的实现,其中侦听一个容器,当元数据更改时,消费者会选择事件。

sharedCredential, errAzCred := container.NewSharedKeyCredential(ae.AccountName, ae.AccountKey)
    if errAzCred != nil {
        log.WithError(errAzCred).Error("Creating sharedCredential failed")
        return errAzCred
    }
    checkClient, errAzClient := container.NewClientWithSharedKeyCredential(ae.ContanerUrl, sharedCredential, nil)
    if errAzClient != nil {
        log.WithError(errAzClient).Error("Creating container client failed")
        return errAzClient
    }

    checkpointStore, errCheckPt := checkpoints.NewBlobStore(checkClient, nil)
    if errCheckPt != nil {
        log.WithError(errCheckPt).Error("Creating checkpoint store failed")
        return errCheckPt
    }
    consumerClient, errClient := azeventhubs.NewConsumerClientFromConnectionString(ae.HubNamespaceConnectionString, ae.EventHubName, azeventhubs.DefaultConsumerGroup, nil)
    if errClient != nil {
        log.WithError(errClient).Error("Creating event hub consumer failed")
        return errClient
    }
    defer consumerClient.Close(context.TODO())
    processor, errProcessor := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)
    if errProcessor != nil {
        log.WithError(errProcessor).Error("Creating event hub processor failed")
        return errProcessor
    }

我尝试了以下方法,它有效,但是当文件的元数据更改时,消费者不会选择该事件。

properties, err := consumerClient.GetEventHubProperties(context.TODO(), nil)
    if err != nil {
        return err
    }
    var wg sync.WaitGroup
    for _, partitionID := range properties.PartitionIDs {
        log.Debugf("start consuming partition %s", partitionID)
        partitionClient, errPC := consumerClient.NewPartitionClient(partitionID, nil)
        if errPC != nil {
            log.WithError(errPC).Error("couldn't create a partition client")
            return errPC
        }
        wg.Add(1)
        go func() {
            defer wg.Done()
            if errProcess := ae.ProcessEvents(partitionClient); errProcess != nil {
                log.WithError(errProcess).Error("failed to process the event")
            }
        }()
    }

谢谢你

azure go azure-eventhub
1个回答
0
投票

对于我们在 SDK 中提供的内容,简单的答案是“不”。

但是,如果您想构建自己的检查点商店,我们确实提供了自定义点。界面非常简单:

type CheckpointStore interface {
    // ClaimOwnership attempts to claim ownership of the partitions in partitionOwnership and returns
    // the actual partitions that were claimed.
    ClaimOwnership(ctx context.Context, partitionOwnership []Ownership, options *ClaimOwnershipOptions) ([]Ownership, error)

    // ListCheckpoints lists all the available checkpoints.
    ListCheckpoints(ctx context.Context, fullyQualifiedNamespace string, eventHubName string, consumerGroup string, options *ListCheckpointsOptions) ([]Checkpoint, error)

    // ListOwnership lists all ownerships.
    ListOwnership(ctx context.Context, fullyQualifiedNamespace string, eventHubName string, consumerGroup string, options *ListOwnershipOptions) ([]Ownership, error)

    // SetCheckpoint updates a specific checkpoint with a sequence and offset.
    SetCheckpoint(ctx context.Context, checkpoint Checkpoint, options *SetCheckpointOptions) error
}

如果您需要灵感,我们附带的实现位于:https://github.com/Azure/azure-sdk-for-go/blob/main/sdk/messaging/azeventhubs/checkpoints/blob_store.go

© www.soinside.com 2019 - 2024. All rights reserved.