我刚刚在版本5中实现了eventhub,该版本与以前的版本有所不同。
当前正在运行的代码如下:
consumer_client = EventHubConsumerClient.from_connection_string(conn_str=CONNECTION_STR,
consumer_group='fconsumer',
eventhub_name=EVENTHUB_NAME)
consumer_client.receive(on_event=on_event,
partition_id = "0",
track_last_enqueued_event_properties=False,
starting_position="@latest")
通过添加持续时间的参数(或上一版本中的keep_alive ag),我将使其停止接收消息,并在一定时间后关闭它。这可能吗?
consumer_client.receive(...)将成为阻塞调用,它不会自行返回。您需要创建一个用于使用事件的线程,并且可以在主线程中决定何时关闭使用方客户端。示例代码段如下...
thread = threading.Thread(
target=consumer_client.receive,
kwargs={
"on_event": on_event,
"on_partition_initialize": on_partition_initialize,
"on_partition_close": on_partition_close,
"on_error": on_error,
"starting_position": "-1", # "-1" is from the beginning of the partition.
}
)
thread.daemon = True
thread.start()
time.sleep(RECEIVE_DURATION)
consumer_client.close()
thread.join()