我有这样的BufferBlock设置。
_inputQueue = new BufferBlock<WorkItem>(new DataflowBlockOptions
{
BoundedCapacity = 1,
CancellationToken = cancellationToken,
EnsureOrdered = true
});
让多个消费者从不同的线程调用“FetchWork”函数
public async Task<WorkItem> GetWork()
{
WorkItem wi;
try
{
wi = await _inputQueue.ReceiveAsync(new TimeSpan(0, 0, 1));
}
catch (Exception)
{
//since we supplied a timeout, this will be thrown if no items come back
return null;
}
return wi;
}
偶尔,同一工作项最终会出现在多个消费者身上!在InputQueue中工作项的数量越多,在GetWork中接收重复项的机会就越多。我的理解是通过ReceiveAsync获取的项目是原子的,一旦读取了一个项目,就不会再次读取。这不会发生在这里。我有40个并行消费者呼叫GetWork。
这似乎是服务结构问题。 BufferBlock仅对项目进行一次dequeing。生产者[分区计数为5的服务结构有状态服务实例]在不同分区中接收相同的项目两次。必须调查这个。