我有以下设置
internal class Program
{
public BlockingCollection<ArraySegment<byte>> MessageQueue { get; set; } = [];
static void Main(string[] args)
{
// Task.Run(Consumer)
// Callback Func for socket ReceiveAsync calls Producer
}
public void Producer(ArraySegment<byte> person)
{
MessageQueue.Add(person);
}
public void Consumer(CancellationToken token)
{
do
{
var person = MessageQueue.Take();
var update = Deserialize(person);
if (update != null)
{
DoWork(update);
}
} while (!token.IsCancellationRequested);
}
public static Person? Deserialize(ArraySegment<byte> socketMessage)
{
try
{
Person? person = JsonSerializer.Deserialize<Person>(socketMessage);
return person;
}
catch (Exception e)
{
Console.WriteLine(e);
return null;
}
}
}
现在解串器抛出:
System.Text.Json.JsonException: '5' is invalid after a single JSON value. Expected end of data. Path: $ | LineNumber: 0 | BytePositionInLine: 656. ---> System.Text.Json.JsonReaderException: '5' is invalid after a single JSON value. Expected end of data.
如果我查看 Encoding.UTF8.GetString(socketMessage);,错误消息有点无关紧要。结果字符串已损坏,字节被剪切或添加,导致无效的 json。
有趣的是,如果我在 Producer 中移动反序列化,将 BlockingCollection 更改为
BlockingCollection<Person>
,然后 . 获取已经反序列化的对象,其余的 DoWork
工作正常。感觉每次 .Deserialize 字节都会被其他消息的一部分覆盖,但我还没有证据。我所知道的是,目前将解串器转移到生产者是可行的,但并不理想。
System.Text.Json 应该是线程安全的,我无法通过生成随机字节在隔离环境中重现问题。有什么想法吗?
插座:
var rentedBuffer = ArrayPool<byte>.Shared.Rent(receiveBufferSize);
var buffer = new ArraySegment<byte>(rentedBuffer, 0, receiveBufferSize);
result = await handler.ReceiveAsync(buffer, cancellationToken);
var message = new ArraySegment<byte>(buffer.Array, buffer.Offset, result.Count);
onMessage.ForEach(onmessage => onmessage(message));
ArrayPool<byte>.Shared.Return(rentedBuffer);
仅在关联的
Return
完全处理后,才应将数组 ArrayPool<byte>
转移到 ArraySegment<byte>
。否则,就会有多个 ArraySegment<byte>
互相踩到对方的脚趾。
在
Consumer
方法内部,在 do
循环的末尾:
ArrayPool<byte>.Shared.Return(person.Array);