I am trying to read live events from an IoT Hub device, the same way Azure IoT Hub Explorer does when a device is selected


As I understand it, the correct way to read events is to use EventProcessorClient, but I am having some trouble understanding the concepts behind it.

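The code below receives some events, then stops, and never seems to receive any new ones.

What I really want is to receive only new events.

IoTHub is just another class that stores the received events and makes them available to the application UI.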

using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;
using Microsoft.Azure.Amqp.Framing;
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace StationManager
{
    internal class EventHub
    {

        static string storageConnectionString = "DefaultEndpointsProtocol=https;AccountName=xxxxxxxxx;AccountKey=kkkkkkkk;EndpointSuffix=core.windows.net";
        static string blobContainerName = "blobcontainer";

        static string eventHubsConnectionString = "Endpoint=sb://xxxxyyyyy.servicebus.windows.net/;SharedAccessKeyName=iothubowner;SharedAccessKey=kkkkkkkkk;EntityPath=iothub-hhhhhhhhhh";
        static string ddd = "HostName=Hub.azure-devices.net;SharedAccessKeyName=iothubowner;SharedAccessKey=ffffff";
        static string eventHubName = "iothub-hhhhhhhhhh";
        static string consumerGroup = "$Default";

        BlobContainerClient storageClient;

        EventProcessorClient processor;

        ConcurrentDictionary<string, int> partitionEventCount = new ConcurrentDictionary<string, int>();

        CancellationTokenSource cancellationSource;
        IoTHub iotHub;

        // Nothing is awaited here, so the method does not need to be async.
        public void Stop()
        {
            cancellationSource?.Cancel();
        }
        public async void Start(IoTHub iothub)
        {
            iotHub = iothub;

             storageClient = new BlobContainerClient(
                storageConnectionString,
                 blobContainerName);

             processor = new EventProcessorClient(
                storageClient,
                consumerGroup,
                eventHubsConnectionString,
                eventHubName);

            try
            {
                cancellationSource = new CancellationTokenSource();
                //cancellationSource.CancelAfter(TimeSpan.FromSeconds(30));

                processor.ProcessEventAsync += processEventHandler;
                processor.ProcessErrorAsync += processErrorHandler;

                try
                {
                    iotHub?.DidReceiveTelemetry("Started receiving.");
                    await processor.StartProcessingAsync(cancellationSource.Token);
                    await Task.Delay(Timeout.Infinite, cancellationSource.Token);
                }
                catch (TaskCanceledException te)
                {
                    // This is expected if the cancellation token is
                    // signaled.
                    iotHub?.DidReceiveTelemetry($"Cancelled.");
                }
                finally
                {
                    // This may take up to the length of time defined
                    // as part of the configured TryTimeout of the processor;
                    // by default, this is 60 seconds.

                    await processor.StopProcessingAsync();
                }
            }
            catch
            {
                // The processor will automatically attempt to recover from any
                // failures, either transient or fatal, and continue processing.
                // Errors in the processor's operation will be surfaced through
                // its error handler.
                //
                // If this block is invoked, then something external to the
                // processor was the source of the exception.
            }
            finally
            {
                // It is encouraged that you unregister your handlers when you have
                // finished using the Event Processor to ensure proper cleanup.  This
                // is especially important when using lambda expressions or handlers
                // in any form that may contain closure scopes or hold other references.

                processor.ProcessEventAsync -= processEventHandler;
                processor.ProcessErrorAsync -= processErrorHandler;

                iotHub?.DidReceiveTelemetry("Stopped receiving.");
            }

        }
        async Task processEventHandler(ProcessEventArgs args)
        {
            try
            {
                // If the cancellation token is signaled, then the
                // processor has been asked to stop.  It will invoke
                // this handler with any events that were in flight;
                // these will not be lost if not processed.
                //
                // It is up to the handler to decide whether to take
                // action to process the event or to cancel immediately.

                if (args.CancellationToken.IsCancellationRequested)
                {
                    return;
                }

                string partition = args.Partition.PartitionId;
                string deviceId = (string)args.Data.SystemProperties["iothub-connection-device-id"];
                DateTime timestamp = (DateTime)args.Data.SystemProperties["iothub-enqueuedtime"];
                byte[] eventBody = args.Data.EventBody.ToArray();
                var eventProperties = args.Data.Properties;
                //Debug.WriteLine($"Event from partition {partition} with length {eventBody.Length}.");

                string data = Encoding.UTF8.GetString(eventBody);
                string evt = $"{partition}: {deviceId}, {timestamp}\n{data}";
                iotHub?.DidReceiveTelemetry(evt);

                int eventsSinceLastCheckpoint = partitionEventCount.AddOrUpdate(
                    key: partition,
                    addValue: 1,
                    updateValueFactory: (_, currentCount) => currentCount + 1);

                if (eventsSinceLastCheckpoint >= 50)
                {
                    await args.UpdateCheckpointAsync();
                    partitionEventCount[partition] = 0;
                }
            }
            catch
            {
                // It is very important that you always guard against
                // exceptions in your handler code; the processor does
                // not have enough understanding of your code to
                // determine the correct action to take.  Any
                // exceptions from your handlers go uncaught by
                // the processor and will NOT be redirected to
                // the error handler.
            }
        }

        Task processErrorHandler(ProcessErrorEventArgs args)
        {
            try
            {
                Debug.WriteLine("Error in the EventProcessorClient");
                Debug.WriteLine($"\tOperation: {args.Operation}");
                Debug.WriteLine($"\tException: {args.Exception}");
                Debug.WriteLine("");
            }
            catch
            {
                // It is very important that you always guard against
                // exceptions in your handler code; the processor does
                // not have enough understanding of your code to
                // determine the correct action to take.  Any
                // exceptions from your handlers go uncaught by
                // the processor and will NOT be handled in any
                // way.
            }

            return Task.CompletedTask;
        }

      
    }
}
c# azure azure-iot-hub eventprocessorclient
1 Answer

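EventProcessorClient is useful when you need checkpoints. A checkpoint records how far you have read in a partition, that is, the position of the last event you processed. In your scenario, it seems you do not care about checkpoints and only want the latest events. If so, you can use EventHubConsumerClient instead. ReadEventsAsync has an overload that takes a boolean parameter (startReadingAtEarliestEvent). When it is set to false, reading starts at the end of each partition, so you only receive new messages.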

await using var consumerClient = new EventHubConsumerClient(EventHubConsumerClient.DefaultConsumerGroupName, "REDACTED");

// consumerClient.ReadEventsAsync(false) means it starts reading at the end
await foreach (PartitionEvent partitionEvent in consumerClient.ReadEventsAsync(false))
{
    string readData = System.Text.Encoding.UTF8.GetString(partitionEvent.Data.Body.ToArray());
    Console.WriteLine($"Read event of length {readData.Length} from partition {partitionEvent.Partition.PartitionId}");
    Console.WriteLine($"Message: {readData}");
}
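
Because the await foreach loop above runs until it is interrupted, it may help to pass a cancellation token so the UI can stop it, much like Stop() in the question. The following is a minimal sketch under assumptions, not a definitive implementation: LiveReader and ReadNewEventsAsync are hypothetical names, and connectionString is assumed to be the Event Hub-compatible connection string including EntityPath, as in the question.

```csharp
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.EventHubs.Consumer;

// Hypothetical helper class; the name is illustrative only.
internal static class LiveReader
{
    // Reads only events that arrive after the call starts, until the token is signaled.
    // Assumption: connectionString contains EntityPath (Event Hub-compatible endpoint).
    public static async Task ReadNewEventsAsync(string connectionString, CancellationToken cancellationToken)
    {
        await using var consumer = new EventHubConsumerClient(
            EventHubConsumerClient.DefaultConsumerGroupName,
            connectionString);

        try
        {
            // false = start at the end of each partition rather than at the earliest event.
            await foreach (PartitionEvent partitionEvent in consumer.ReadEventsAsync(
                startReadingAtEarliestEvent: false,
                cancellationToken: cancellationToken))
            {
                string body = Encoding.UTF8.GetString(partitionEvent.Data.EventBody.ToArray());
                Console.WriteLine($"{partitionEvent.Partition.PartitionId}: {body}");
            }
        }
        catch (OperationCanceledException)
        {
            // Expected when the token is signaled; the loop simply ends.
        }
    }
}
```

A caller would typically create a CancellationTokenSource, pass its Token to ReadNewEventsAsync, and call Cancel() on it when the user stops monitoring.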

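Note that there is also a way to set the starting position for EventProcessorClient, described in this answer. However, that only takes effect when there is no existing checkpoint for the partition; otherwise the checkpoint takes precedence.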
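
One way to do this that I am aware of is the processor's PartitionInitializingAsync event, which lets you set a default starting position per partition; it may or may not be exactly what the referenced answer describes. The sketch below assumes a processor built as in the question; ProcessorStartPosition, StartAtLatest and InitializePartitionAsync are hypothetical names.

```csharp
using System.Threading.Tasks;
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;

// Hypothetical helper class; the name is illustrative only.
internal static class ProcessorStartPosition
{
    // Register this before calling StartProcessingAsync on the processor.
    public static void StartAtLatest(EventProcessorClient processor)
    {
        processor.PartitionInitializingAsync += InitializePartitionAsync;
    }

    private static Task InitializePartitionAsync(PartitionInitializingEventArgs args)
    {
        if (args.CancellationToken.IsCancellationRequested)
        {
            return Task.CompletedTask;
        }

        // Used only when the partition has no checkpoint in the blob container;
        // an existing checkpoint takes precedence over this default.
        args.DefaultStartingPosition = EventPosition.Latest;
        return Task.CompletedTask;
    }
}
```

If checkpoints already exist in the blob container for the consumer group, this default is ignored for those partitions, which matches the caveat above.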
