Upgrading Azure.Messaging.EventHubs from 5.5.0 to 5.9.0: error when using "eventData.Body.Array, eventData.Body.Offset"

Question    Votes: 0    Answers: 1

I am upgrading Azure.Messaging.EventHubs from 5.5.0 to 5.9.0.

As part of this, I need to change the old Microsoft.Azure.EventHubs.EventData to Azure.Messaging.EventHubs.EventData.

However, when I call:

var decompressed = Decompress(eventData.Body.Array, eventData.Body.Offset, eventData.Body.Count);

where the Decompress function is:

public static string Decompress(byte[] input, int offset, int count)
{
    // GZip
    using var memoryStream = new MemoryStream(input, offset, count);
    using var decompressedStream = new GZipStream(memoryStream, CompressionMode.Decompress);
    using var streamReader = new StreamReader(decompressedStream, Encoding.UTF8);

    return streamReader.ReadToEnd();
}

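the compiler reports: 'ReadOnlyMemory<byte>' does not contain a definition for 'Array' and no accessible extension method 'Array' accepting a first argument of type 'ReadOnlyMemory<byte>' could be found.

I believe EventData.Body changed in the new library, but I don't know what replaces it. What is the new equivalent of eventData.Body.Array, eventData.Body.Offset, and eventData.Body.Count, and how can I find it?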

azure upgrade azure-eventhub
1 Answer
0
votes

You can add the EventData objects to a batch and then send the batch with EventHubProducerClient, as shown below:

using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Producer;
using System;
using System.Text;
using System.Threading.Tasks;

namespace ConsoleApp
{
    class Program
    {
        static async Task Main(string[] args)
        {
            // Replace the connection string and event hub name below with your own values
            var connectionString = "Endpoint=sb://eventhub1.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxxx+AEhLwhfnY=";
            var eventHubName = "siliconeventhub";

            // Create a new EventHubProducerClient object
            await using (var producerClient = new EventHubProducerClient(connectionString, eventHubName))
            {
                // Create a batch of events
                using EventDataBatch eventBatch = await producerClient.CreateBatchAsync();

                // Add events to the batch
                for (int i = 1; i <= 3; i++)
                {
                    var eventData = new EventData(Encoding.UTF8.GetBytes($"Event {i}"));
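                    if (!eventBatch.TryAdd(eventData))
                    {
                        // TryAdd returns false when the event does not fit in the batch
                        throw new InvalidOperationException($"Event {i} is too large for the batch.");
                    }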
                }

                // Send the batch of events to the event hub
                await producerClient.SendAsync(eventBatch);
            }
        }
    }
}
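
Note that the producer above sends plain UTF-8 text, so the receiving side has nothing to decompress. If the events are meant to be GZIP-compressed, as in the question, the payload could be compressed before it is wrapped in an EventData. The following is only a minimal sketch under that assumption; the class name PayloadCompression and the method name Compress are hypothetical and not part of the Event Hubs SDK:

```csharp
using System.IO;
using System.IO.Compression;
using System.Text;

// Hypothetical helper for illustration; not part of Azure.Messaging.EventHubs.
public static class PayloadCompression
{
    // Encodes the text as UTF-8 and returns the GZIP-compressed bytes.
    public static byte[] Compress(string text)
    {
        byte[] raw = Encoding.UTF8.GetBytes(text);

        using var output = new MemoryStream();
        using (var gzip = new GZipStream(output, CompressionMode.Compress, leaveOpen: true))
        {
            gzip.Write(raw, 0, raw.Length);
        } // Disposing the GZipStream flushes the remaining data and the GZIP footer.

        return output.ToArray();
    }
}
```

In the loop above, the event would then be created with something like new EventData(PayloadCompression.Compress($"Event {i}")) instead of passing the raw UTF-8 bytes.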


Now use the code below to receive the events and decompress them:

using Azure.Messaging.EventHubs;
using Azure.Messaging.EventHubs.Consumer;
using System;
using System.IO;
using System.IO.Compression;
using System.Text;
using System.Threading.Tasks;

namespace ConsoleApp
{
    class Program
    {
        static async Task Main(string[] args)
        {
            // Replace the connection string and event hub name below with your own values
            var connectionString = "Endpoint=sb://eventhub1.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxxxxx+AEhLwhfnY=";
            var eventHubName = "siliconeventhub";

            // Create a new EventHubConsumerClient object
            await using (var consumerClient = new EventHubConsumerClient(EventHubConsumerClient.DefaultConsumerGroupName, connectionString, eventHubName))
            {
                // Receive events from the event hub
                await foreach (PartitionEvent partitionEvent in consumerClient.ReadEventsAsync())
                {
                    // Decompress the body of the event
                    var decompressed = Decompress(BinaryData.FromBytes(partitionEvent.Data.Body));

                    // Print the decompressed body of the event
                    Console.WriteLine($"Received event: {decompressed}");
                }
            }
        }

        public static string Decompress(BinaryData input)
        {
            // Copy the payload once and reuse it for both the check and the read
            byte[] bytes = input.ToArray();

            // Check if the data is GZIP-compressed
            if (IsGZipCompressed(bytes))
            {
                using var memoryStream = new MemoryStream(bytes);
                using var decompressedStream = new GZipStream(memoryStream, CompressionMode.Decompress);
                using var streamReader = new StreamReader(decompressedStream, Encoding.UTF8);

                return streamReader.ReadToEnd();
            }
            else
            {
                // Treat the data as plain text if it's not GZIP-compressed
                return Encoding.UTF8.GetString(bytes);
            }
        }

        private static bool IsGZipCompressed(byte[] data)
        {
            if (data.Length < 2)
                return false;

            return data[0] == 31 && data[1] == 139;
        }
    }
}
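
To address the original question more directly: in Azure.Messaging.EventHubs, EventData.Body is a ReadOnlyMemory<byte> rather than an ArraySegment<byte>, which is why Array, Offset, and Count are gone. One possible way to keep the existing Decompress(byte[], int, int) signature is to ask MemoryMarshal.TryGetArray for the underlying array segment and fall back to a copy when the memory is not array-backed. This is a sketch, not the SDK's recommended pattern; the class name EventBodyHelpers is hypothetical:

```csharp
using System;
using System.IO;
using System.IO.Compression;
using System.Runtime.InteropServices;
using System.Text;

// Hypothetical helper for illustration; not part of Azure.Messaging.EventHubs.
public static class EventBodyHelpers
{
    // Accepts the ReadOnlyMemory<byte> exposed by EventData.Body.
    public static string Decompress(ReadOnlyMemory<byte> body)
    {
        // Try to recover the (array, offset, count) triple without copying.
        if (!MemoryMarshal.TryGetArray(body, out ArraySegment<byte> segment))
        {
            // The memory is not backed by an array: fall back to a copy.
            segment = new ArraySegment<byte>(body.ToArray());
        }

        byte[] array = segment.Array ?? Array.Empty<byte>();
        return Decompress(array, segment.Offset, segment.Count);
    }

    // The original GZIP helper from the question, unchanged in behavior.
    public static string Decompress(byte[] input, int offset, int count)
    {
        using var memoryStream = new MemoryStream(input, offset, count);
        using var decompressedStream = new GZipStream(memoryStream, CompressionMode.Decompress);
        using var streamReader = new StreamReader(decompressedStream, Encoding.UTF8);

        return streamReader.ReadToEnd();
    }
}
```

The call from the question would then become EventBodyHelpers.Decompress(eventData.Body). If copying the payload is acceptable, eventData.Body.ToArray() with offset 0 and the array's Length is simpler still. Recent 5.x versions also appear to expose the body as BinaryData through eventData.EventBody, whose ToStream() could be handed to GZipStream directly; check the API reference for your exact version before relying on it.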

