根据文档,我们可以通过收集和分析各种类型的日志来监控Azure IoT Hub,包括平台指标、资源日志、连接、设备遥测、C2D命令、活动日志等
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;
using Microsoft.Azure.EventHubs;
namespace FunctionApp
{
public static class TimerFunction
{
private static EventHubClient? eventHubClient; // Nullable eventHubClient
private static readonly string connectionString = "EventHubconnectionString";
private static readonly string eventHubName = "eventHubName";
[Function("TimerFunction")]
public static async Task RunAsync([TimerTrigger("0 */1 * * * *")] TimerInfo myTimer, FunctionContext context)
{
var logger = context.GetLogger("TimerFunction");
logger.LogInformation($"C# Timer trigger function executed at: {DateTime.Now}");
// Build the connection string with Event Hub name
var connectionStringBuilder = new EventHubsConnectionStringBuilder(connectionString)
{
EntityPath = eventHubName
};
string eventHubConnectionString = connectionStringBuilder.ToString();
// Create Event Hub client
eventHubClient = EventHubClient.CreateFromConnectionString(eventHubConnectionString);
var runtimeInformation = await eventHubClient.GetRuntimeInformationAsync();
var d2cPartitions = runtimeInformation.PartitionIds;
CancellationTokenSource cts = new CancellationTokenSource();
var tasks = new List<Task>();
foreach (string partition in d2cPartitions)
{
tasks.Add(ReceiveMessagesFromDeviceAsync(partition, cts.Token, logger));
}
await Task.Delay(60000); // Ensure the function runs for 1 minute
logger.LogInformation("Exiting...");
cts.Cancel();
await Task.WhenAll(tasks);
await eventHubClient.CloseAsync();
}
private static async Task ReceiveMessagesFromDeviceAsync(string partition, CancellationToken ct, ILogger logger)
{
var eventHubReceiver = eventHubClient!.CreateReceiver("$Default", partition, EventPosition.FromEnqueuedTime(DateTime.Now));
while (!ct.IsCancellationRequested)
{
try
{
var eventDataBatch = await eventHubReceiver.ReceiveAsync(100); // Maximum number of events to receive
if (eventDataBatch != null)
{
foreach (var eventData in eventDataBatch)
{
string data = Encoding.UTF8.GetString(eventData.Body.Array);
logger.LogInformation($"Message received. Partition: {partition} Data: '{data}'");
var message = Newtonsoft.Json.JsonConvert.DeserializeObject<AzureMonitorDiagnosticLog>(data);
// Process the message...
}
}
}
catch (Exception ex)
{
logger.LogError($"Error receiving message from partition {partition}: {ex.Message}");
}
}
await eventHubReceiver.CloseAsync();
}
}
class AzureMonitorDiagnosticLog
{
public string? time { get; set; }
public string? resourceId { get; set; }
public string? operationName { get; set; }
public string? category { get; set; }
public string? level { get; set; }
public string? resultType { get; set; }
public string? resultDescription { get; set; }
public string? durationMs { get; set; }
public string? callerIpAddress { get; set; }
public string? correlationId { get; set; }
public string? identity { get; set; }
public string? location { get; set; }
public Dictionary<string, string>? properties { get; set; }
}
}