使用 python 订阅 Azure Event Hub 中的多个分区

问题描述 投票:0回答:0

我创建了一个事件中心命名空间。
在事件中心命名空间内,我创建了一个具有 8 个分区的事件中心。
它有一个消费者组——$Default。

我已经用 python 编写了接收器代码,看起来像这样。

import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import (
    BlobCheckpointStore,
)

BLOB_STORAGE_CONNECTION_STRING = "BLOB_STORAGE_CONNECTION_STRING"
BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME"
EVENT_HUB_CONNECTION_STR = "EVENT_HUB_CONNECTION_STR"
EVENT_HUB_NAME = "EVENT_HUB_NAME"


async def on_event(partition_context, event):
    # Print the event data.
    print(
        'Received the event: "{}" from the partition with ID: "{}"'.format(
            event.body_as_str(encoding="UTF-8"), partition_context.partition_id
        )
    )

    # Update the checkpoint so that the program doesn't read the events
    # that it has already read when you run it next time.
    await partition_context.update_checkpoint(event)

async def main():
    # Create an Azure blob checkpoint store to store the checkpoints.
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        BLOB_STORAGE_CONNECTION_STRING, BLOB_CONTAINER_NAME
    )

    # Create a consumer client for the event hub.
    client = EventHubConsumerClient.from_connection_string(
        EVENT_HUB_CONNECTION_STR,
        consumer_group="$Default",
        eventhub_name=EVENT_HUB_NAME,
        checkpoint_store=checkpoint_store,
    )
    async with client:
        # Call the receive method. Read from the beginning of the
        # partition (starting_position: "-1")
        await client.receive(on_event=on_event, starting_position="-1")

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    # Run the main method.
    loop.run_until_complete(main())

我从this文档中获取的代码。
现在我已经在 5 个不同的虚拟机中运行了上面的代码。所以期望所有 5 个接收者都应该同时处理 5 个不同的消息。一旦一条消息被处理,空闲的接收者应该消费另一条消息。它应该一直持续到有人停止接收代码。

我在这里面临的问题是,同一条消息被多个接收者接收,并被一次又一次地处理。我的假设是检查点没有正确发生。但我不完全知道为什么它没有发生。
还是上面的代码不符合我的预期

这些是我正在使用的版本:

名称:azure-eventhub 版本:5.10.1

名称:azure-eventhub-checkpointstoreblob-aio 版本:1.1.4

关于此的任何信息都会非常有帮助。非常感谢

python azure events azure-eventhub
© www.soinside.com 2019 - 2024. All rights reserved.