有一段时间没有任何消息可以关闭Eventhubconsumerclient.receive吗?

问题描述 投票:0回答:1

我刚刚在版本5中实现了eventhub,该版本与以前的版本有所不同。

当前正在运行的代码如下:

consumer_client = EventHubConsumerClient.from_connection_string(conn_str=CONNECTION_STR,
                                                  consumer_group='fconsumer',
                                                  eventhub_name=EVENTHUB_NAME)
consumer_client.receive(on_event=on_event, 
                        partition_id = "0", 
                        track_last_enqueued_event_properties=False, 
                        starting_position="@latest")

通过添加持续时间的参数(或上一版本中的keep_alive ag),我将使其停止接收消息,并在一定时间后关闭它。这可能吗?

azure-eventhub
1个回答
0
投票

consumer_client.receive(...)将成为阻塞调用,它不会自行返回。您需要创建一个用于使用事件的线程,并且可以在主线程中决定何时关闭使用方客户端。示例代码段如下...

thread = threading.Thread(
    target=consumer_client.receive,
    kwargs={
        "on_event": on_event,
        "on_partition_initialize": on_partition_initialize,
        "on_partition_close": on_partition_close,
        "on_error": on_error,
        "starting_position": "-1",  # "-1" is from the beginning of the partition.
    }
)

thread.daemon = True
thread.start()
time.sleep(RECEIVE_DURATION)
consumer_client.close()
thread.join()
© www.soinside.com 2019 - 2024. All rights reserved.