在 C# 中使用 BlockingCollection 进行并发消息反序列化的线程安全问题

问题描述 投票:0回答:1

我有以下设置

 internal class Program
 {
     public BlockingCollection<ArraySegment<byte>> MessageQueue { get; set; } = [];

     static void Main(string[] args)
     {
         // Task.Run(Consumer)
         // Callback Func for socket ReceiveAsync calls Producer
     }

     public void Producer(ArraySegment<byte> person)
     {
         MessageQueue.Add(person);
     }

     public void Consumer(CancellationToken token)
     {
         do
         {
             var person = MessageQueue.Take();
             var update = Deserialize(person);

             if (update != null)
             {
                 DoWork(update);
             }

         } while (!token.IsCancellationRequested);
     }

     public static Person? Deserialize(ArraySegment<byte> socketMessage)
     {
         try
         {
             Person? person = JsonSerializer.Deserialize<Person>(socketMessage);

             return person;
         }
         catch (Exception e)
         {
             Console.WriteLine(e);

             return null;
         }
     }
 }

现在解串器抛出:

System.Text.Json.JsonException: '5' is invalid after a single JSON value. Expected end of data. Path: $ | LineNumber: 0 | BytePositionInLine: 656.  ---> System.Text.Json.JsonReaderException: '5' is invalid after a single JSON value. Expected end of data.

如果我查看 Encoding.UTF8.GetString(socketMessage);,错误消息有点无关紧要。结果字符串已损坏,字节被剪切或添加,导致无效的 json。

有趣的是,如果我在 Producer 中移动反序列化,将 BlockingCollection 更改为

BlockingCollection<Person>
,然后 . 获取已经反序列化的对象,其余的
DoWork
工作正常。感觉每次 .Deserialize 字节都会被其他消息的一部分覆盖,但我还没有证据。我所知道的是,目前将解串器转移到生产者是可行的,但并不理想。

System.Text.Json 应该是线程安全的,我无法通过生成随机字节在隔离环境中重现问题。有什么想法吗?

插座:

  var rentedBuffer = ArrayPool<byte>.Shared.Rent(receiveBufferSize);
  var buffer = new ArraySegment<byte>(rentedBuffer, 0, receiveBufferSize);

  result = await handler.ReceiveAsync(buffer, cancellationToken);
  var message = new ArraySegment<byte>(buffer.Array, buffer.Offset, result.Count);
  onMessage.ForEach(onmessage => onmessage(message));
  ArrayPool<byte>.Shared.Return(rentedBuffer);
c# producer-consumer system.text.json blockingcollection
1个回答
0
投票

仅在关联的

Return
完全处理后,才应将数组
ArrayPool<byte>
转移到
ArraySegment<byte>
。否则,就会有多个
ArraySegment<byte>
互相踩到对方的脚趾。

Consumer
方法内部,在
do
循环的末尾:

ArrayPool<byte>.Shared.Return(person.Array);
© www.soinside.com 2019 - 2024. All rights reserved.