我正在尝试创建一个与 Azure IoT 浏览器设备遥测页面基本相同的应用程序。
我想实时读取设备发送的设备到云消息。
这是我当前使用的代码,它不会返回任何消息,我所看到的是为分区 0 和 1 启动了两个线程。
连接字符串是直接从 Azure IoT 中心网页复制的。
XXX 班{
EventHubConsumerClient eventHubConsumerClient;
private const string ConsumerGroup = "$Default";
private async Task Setup()
{
eventHubConsumerClient = new EventHubConsumerClient(ConsumerGroup, eventHubConnectionString);
cancellationSource = new CancellationTokenSource();
var tasks = new List<Task>();
var partitions = await eventHubConsumerClient.GetPartitionIdsAsync();
foreach (string partition in partitions)
{
tasks.Add(ReceiveMessagesFromDeviceAsync(partition, cancellationSource));
}
}
async Task ReceiveMessagesFromDeviceAsync(string partitionId, CancellationTokenSource cs)
{
Telemetry.Add($"Starting listener thread for partition: {partitionId}");
try
{
while (IsTelemetryStarted)
{
await foreach (PartitionEvent receivedEvent in eventHubConsumerClient.ReadEventsFromPartitionAsync(partitionId, EventPosition.Latest, cs.Token))
{
string msgSource;
string body = Encoding.UTF8.GetString(receivedEvent.Data.Body.ToArray());
if (receivedEvent.Data.SystemProperties.ContainsKey("iothub-message-source"))
{
msgSource = receivedEvent.Data.SystemProperties["iothub-message-source"].ToString();
Debug.WriteLine($"{partitionId} {msgSource} {body}");
Telemetry.Add($"{partitionId} {msgSource} {body}");
}
}
}
} catch( Exception ex )
{
}
Telemetry.Add($"Stopped listener thread for partition: {partitionId}.");
}
}
您的代码示例不完整,因此无法判断发生了什么。该类不完整,没有可调用的方法。什么是
Telemetry
? Setup()
方法创建一些永远不会等待的任务,它也不会返回任务列表。
这可行,也许从这里开始并从那里开始。
await using var consumerClient = new EventHubConsumerClient(EventHubConsumerClient.DefaultConsumerGroupName, "redacted");
await foreach (PartitionEvent partitionEvent in consumerClient.ReadEventsAsync())
{
string readData = System.Text.Encoding.UTF8.GetString(partitionEvent.Data.Body.ToArray());
Console.WriteLine($"Read event of length {readData.Length} from partition {partitionEvent.Partition.PartitionId}");
Console.WriteLine($"Message: {readData}");
}