我创建了一个事件中心命名空间。
在事件中心命名空间内,我创建了一个具有 8 个分区的事件中心。
它有一个消费者组——$Default。
我已经用 python 编写了接收器代码,看起来像这样。
import asyncio
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import (
BlobCheckpointStore,
)
BLOB_STORAGE_CONNECTION_STRING = "BLOB_STORAGE_CONNECTION_STRING"
BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME"
EVENT_HUB_CONNECTION_STR = "EVENT_HUB_CONNECTION_STR"
EVENT_HUB_NAME = "EVENT_HUB_NAME"
async def on_event(partition_context, event):
# Print the event data.
print(
'Received the event: "{}" from the partition with ID: "{}"'.format(
event.body_as_str(encoding="UTF-8"), partition_context.partition_id
)
)
# Update the checkpoint so that the program doesn't read the events
# that it has already read when you run it next time.
await partition_context.update_checkpoint(event)
async def main():
# Create an Azure blob checkpoint store to store the checkpoints.
checkpoint_store = BlobCheckpointStore.from_connection_string(
BLOB_STORAGE_CONNECTION_STRING, BLOB_CONTAINER_NAME
)
# Create a consumer client for the event hub.
client = EventHubConsumerClient.from_connection_string(
EVENT_HUB_CONNECTION_STR,
consumer_group="$Default",
eventhub_name=EVENT_HUB_NAME,
checkpoint_store=checkpoint_store,
)
async with client:
# Call the receive method. Read from the beginning of the
# partition (starting_position: "-1")
await client.receive(on_event=on_event, starting_position="-1")
if __name__ == "__main__":
loop = asyncio.get_event_loop()
# Run the main method.
loop.run_until_complete(main())
我从this文档中获取的代码。
现在我已经在 5 个不同的虚拟机中运行了上面的代码。所以期望所有 5 个接收者都应该同时处理 5 个不同的消息。一旦一条消息被处理,空闲的接收者应该消费另一条消息。它应该一直持续到有人停止接收代码。
我在这里面临的问题是,同一条消息被多个接收者接收,并被一次又一次地处理。我的假设是检查点没有正确发生。但我不完全知道为什么它没有发生。
还是上面的代码不符合我的预期
这些是我正在使用的版本:
名称:azure-eventhub 版本:5.10.1
名称:azure-eventhub-checkpointstoreblob-aio 版本:1.1.4
关于此的任何信息都会非常有帮助。非常感谢