据我了解,读取事件的正确方法是使用 EventProcessorClient,但我在理解其背后的概念时遇到了一些困难。
下面的代码获取一些事件,然后停止,并且似乎永远不会获取任何新事件。
我真正想要的是只获取新事件。
IoTHub 只是另一个类,用于存储获取的事件并使它们可供应用程序 UI 使用。
using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Processor;
using Azure.Storage.Blobs;
using Microsoft.Azure.Amqp.Framing;
using System;
using System.Collections;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Collections.ObjectModel;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace StationManager
{
/// <summary>
/// Reads events from an Event Hubs-compatible endpoint with an
/// <see cref="EventProcessorClient"/> and forwards each one, formatted as text,
/// to an <see cref="IoTHub"/> instance via <c>DidReceiveTelemetry</c>.
/// </summary>
internal class EventHub
{
    // NOTE(review): these connection strings embed account keys in source.
    // Consider loading them from configuration or a secret store instead.
    static string storageConnectionString = "DefaultEndpointsProtocol=https;AccountName=xxxxxxxxx;AccountKey=kkkkkkkk;EndpointSuffix=core.windows.net";
    static string blobContainerName = "blobcontainer";
    static string eventHubsConnectionString = "Endpoint=sb://xxxxyyyyy.servicebus.windows.net/;SharedAccessKeyName=iothubowner;SharedAccessKey=kkkkkkkkk;EntityPath=iothub-hhhhhhhhhh";
    // Not referenced anywhere inside this class.
    static string ddd = "HostName=Hub.azure-devices.net;SharedAccessKeyName=iothubowner;SharedAccessKey=ffffff";
    static string eventHubName = "iothub-hhhhhhhhhh";
    static string consumerGroup = "$Default";

    // Number of events handled on a partition before a checkpoint is written.
    const int CheckpointInterval = 50;

    BlobContainerClient storageClient;
    EventProcessorClient processor;

    // Per-partition count of events handled since the last checkpoint.
    ConcurrentDictionary<string, int> partitionEventCount = new ConcurrentDictionary<string, int>();
    CancellationTokenSource cancellationSource;
    IoTHub iotHub;

    /// <summary>
    /// Requests that a running <see cref="Start"/> call stop processing.
    /// Does nothing if <see cref="Start"/> has not been called or has already finished.
    /// </summary>
    public void Stop()
    {
        try
        {
            cancellationSource?.Cancel();
        }
        catch (ObjectDisposedException)
        {
            // Start() has already completed and disposed the token source;
            // there is nothing left to cancel.
        }
    }

    /// <summary>
    /// Starts processing events and keeps running until <see cref="Stop"/> is called.
    /// </summary>
    /// <param name="iothub">Receiver of status messages and formatted events; may be null.</param>
    /// <remarks>
    /// This method is fire-and-forget (<c>async void</c>), so failures cannot reach the
    /// caller. They are written to the debug output instead of being silently dropped.
    /// </remarks>
    public async void Start(IoTHub iothub)
    {
        iotHub = iothub;

        // Locals ensure that this invocation cleans up exactly the objects it created.
        EventProcessorClient client = null;
        CancellationTokenSource cts = null;

        try
        {
            // Construction happens inside the try block so that a failure here
            // (for example a malformed connection string) is logged rather than
            // escaping from an async void method.
            storageClient = new BlobContainerClient(
                storageConnectionString,
                blobContainerName);
            client = new EventProcessorClient(
                storageClient,
                consumerGroup,
                eventHubsConnectionString,
                eventHubName);
            processor = client;

            cts = new CancellationTokenSource();
            cancellationSource = cts;

            client.PartitionInitializingAsync += partitionInitializingHandler;
            client.ProcessEventAsync += processEventHandler;
            client.ProcessErrorAsync += processErrorHandler;

            try
            {
                iotHub?.DidReceiveTelemetry("Started receiving.");
                await client.StartProcessingAsync(cts.Token);

                // Keep this method alive until Stop() signals the token.
                await Task.Delay(Timeout.Infinite, cts.Token);
            }
            catch (OperationCanceledException)
            {
                // Expected when the cancellation token is signaled.
                // TaskCanceledException derives from OperationCanceledException,
                // so both are handled here.
                iotHub?.DidReceiveTelemetry($"Cancelled.");
            }
            finally
            {
                // This may take up to the length of time defined
                // as part of the configured TryTimeout of the processor;
                // by default, this is 60 seconds.
                await client.StopProcessingAsync();
            }
        }
        catch (Exception ex)
        {
            // The processor reports its own operational errors through the error
            // handler. Reaching this block means something outside the processor
            // failed (setup, start-up or shutdown).
            Debug.WriteLine($"EventHub.Start failed: {ex}");
        }
        finally
        {
            if (client != null)
            {
                try
                {
                    // Unregister the handlers once processing is finished so the
                    // processor does not keep this instance alive.
                    client.PartitionInitializingAsync -= partitionInitializingHandler;
                    client.ProcessEventAsync -= processEventHandler;
                    client.ProcessErrorAsync -= processErrorHandler;
                }
                catch (Exception ex)
                {
                    // Presumably thrown when a handler was never attached or the
                    // processor did not stop cleanly; log it rather than letting it
                    // escape from an async void method.
                    Debug.WriteLine($"EventHub.Start could not unregister handlers: {ex}");
                }
            }

            cts?.Dispose();
            iotHub?.DidReceiveTelemetry("Stopped receiving.");
        }
    }

    /// <summary>
    /// Makes partitions that have no stored checkpoint start at the end of the
    /// stream, so only events enqueued after start-up are delivered.
    /// </summary>
    /// <remarks>
    /// When a checkpoint already exists for a partition, the processor resumes from
    /// that checkpoint and this default position is not used.
    /// </remarks>
    Task partitionInitializingHandler(PartitionInitializingEventArgs args)
    {
        try
        {
            if (!args.CancellationToken.IsCancellationRequested)
            {
                args.DefaultStartingPosition = EventPosition.Latest;
            }
        }
        catch (Exception ex)
        {
            // Handlers must never let exceptions escape into the processor.
            Debug.WriteLine($"Error while initializing partition: {ex}");
        }
        return Task.CompletedTask;
    }

    /// <summary>
    /// Formats one received event, forwards it to <c>iotHub</c>, and writes a
    /// checkpoint after every <see cref="CheckpointInterval"/> events on a partition.
    /// </summary>
    async Task processEventHandler(ProcessEventArgs args)
    {
        try
        {
            // If the cancellation token is signaled, the processor has been asked
            // to stop. In-flight events are not lost when skipped here.
            if (args.CancellationToken.IsCancellationRequested)
            {
                return;
            }

            // The handler can be invoked without an event; Data is not usable then.
            if (!args.HasEvent)
            {
                return;
            }

            string partition = args.Partition.PartitionId;
            var systemProperties = args.Data.SystemProperties;

            // Read the system properties defensively: a missing key or an unexpected
            // value type must not cause the whole event to be dropped.
            object rawDeviceId;
            string deviceId = systemProperties.TryGetValue("iothub-connection-device-id", out rawDeviceId)
                ? rawDeviceId as string
                : null;

            DateTime timestamp;
            object rawEnqueuedTime;
            systemProperties.TryGetValue("iothub-enqueuedtime", out rawEnqueuedTime);
            if (rawEnqueuedTime is DateTime enqueuedDateTime)
            {
                timestamp = enqueuedDateTime;
            }
            else if (rawEnqueuedTime is DateTimeOffset enqueuedOffset)
            {
                timestamp = enqueuedOffset.UtcDateTime;
            }
            else
            {
                // Fallback: use the event's own enqueued time.
                // NOTE(review): this may differ slightly from the IoT Hub value - confirm acceptable.
                timestamp = args.Data.EnqueuedTime.UtcDateTime;
            }

            byte[] eventBody = args.Data.EventBody.ToArray();
            string data = Encoding.UTF8.GetString(eventBody);
            string evt = $"{partition}: {deviceId}, {timestamp}\n{data}";
            iotHub?.DidReceiveTelemetry(evt);

            int eventsSinceLastCheckpoint = partitionEventCount.AddOrUpdate(
                key: partition,
                addValue: 1,
                updateValueFactory: (_, currentCount) => currentCount + 1);

            if (eventsSinceLastCheckpoint >= CheckpointInterval)
            {
                await args.UpdateCheckpointAsync();
                partitionEventCount[partition] = 0;
            }
        }
        catch (Exception ex)
        {
            // Exceptions from this handler are not redirected to the error handler,
            // so they must be caught here. Log them so that dropped events are
            // visible during debugging instead of vanishing silently.
            Debug.WriteLine($"Error while handling an event: {ex}");
        }
    }

    /// <summary>
    /// Writes errors reported by the processor to the debug output.
    /// </summary>
    Task processErrorHandler(ProcessErrorEventArgs args)
    {
        try
        {
            Debug.WriteLine("Error in the EventProcessorClient");
            Debug.WriteLine($"\tOperation: {args.Operation}");
            Debug.WriteLine($"\tException: {args.Exception}");
            Debug.WriteLine("");
        }
        catch
        {
            // Handlers must never let exceptions escape into the processor;
            // nothing more can be done if logging itself fails.
        }
        return Task.CompletedTask;
    }
}
}
当您需要检查点时,EventProcessorClient 非常有用。检查点记录的是您上次读取到的事件位置。在您的场景中,您似乎并不关心检查点,而是只关心最新的事件。如果是这样,您可以改用 EventHubConsumerClient。它的 ReadEventsAsync 方法有一个带布尔参数的重载;当该参数设置为 false 时,您只会收到新消息。
// Create a consumer on the default consumer group; it is disposed asynchronously when it goes out of scope.
await using var consumerClient = new EventHubConsumerClient(
    EventHubConsumerClient.DefaultConsumerGroupName,
    "REDACTED");

// Passing false for startReadingAtEarliestEvent starts reading at the end of each
// partition, so only events that arrive after this point are returned.
await foreach (PartitionEvent received in consumerClient.ReadEventsAsync(startReadingAtEarliestEvent: false))
{
    // Decode the payload as UTF-8 text.
    byte[] payload = received.Data.Body.ToArray();
    string text = System.Text.Encoding.UTF8.GetString(payload);
    string partitionId = received.Partition.PartitionId;

    Console.WriteLine($"Read event of length {text.Length} from partition {partitionId}");
    Console.WriteLine($"Message: {text}");
}
请注意,还有一种方法可以设置 EventProcessorClient 的起始位置,在这个答案中进行了描述。但是,这仅在没有现有检查点的情况下才有效;否则检查点优先。