生产者/消费者问题:面临死锁

问题描述 投票:0回答:1

我恰好使用了几个线程来生成和使用缓冲区内的数据。 只有一个线程用于生成数据,也只有一个线程用于消费。

但是,在消费者线程等待生产者完成处理时,我仍然在大多数时间面临死锁。另一方面,制作人已经完成工作就退出了。

我进行了测试并采用了几个字节作为文件的输入,该文件应由数据使用者线程生成为数据。

以下是源代码。

public class FileGenerator
{
    private static readonly string _fileLocation = Path.Combine("Resources", "files");

    private static Faker<Employee> _employeeGenerator = null!;

    private static Random _random = new Random(1);

    private long _maxBufferSize = 28;
    private long _maxFileSizeInBytes = 0;
    private long _totalBytesWritten = 0;

    private Queue<byte[]> _buffer = null;
    private bool _continueProducingData = false;
    static object bufferLock = new object();

    public FileGenerator()
    {
        _buffer = new Queue<byte[]>();
        _continueProducingData = true;

        _employeeGenerator = new Faker<Employee>()
            .RuleFor(e => e.RandomNumberDelimator, 100)
            .RuleFor(e => e.AboutMe, f => f.Lorem.Paragraph(1));
    }

public void Execute () {

 _maxFileSizeInBytes = fileSize * Convert.ToInt64(100);
        var filename = $"unsorted.{fileSize}.txt";
        var path = string.IsNullOrWhiteSpace(location) ? Path.Combine(_fileLocation, filename) : Path.Combine(location, filename);            

        var producerTask = Task.Run(() => GenerateTestDataSetAsyncInternal());
        var consumerTask = Task.Run(() => ConsumeTestDataSetAsync(new StreamWriter(File.Open(path,FileMode.Create), bufferSize : 65535)));

        await Task.WhenAll(producerTask, consumerTask);

}

private void GenerateTestDataSetAsyncInternal()
{
        while (_totalBytesWritten <= _maxFileSizeInBytes)
        {
            _continueProducingData = true;

            lock (bufferLock)
            {
                if (_buffer.Count == 0)
                {
                    long noOfbytesWrittenPerDataBytes = 0;

                    while (noOfbytesWrittenPerDataBytes <= _maxBufferSize)
                    {
                        var dataBytes = Encoding.UTF8.GetBytes(_employeeGenerator.Generate().ToString());
                        noOfbytesWrittenPerDataBytes += dataBytes.Length;
                        _buffer.Enqueue(dataBytes);
                    }

                    _totalBytesWritten += noOfbytesWrittenPerDataBytes;

                    Monitor.Pulse(bufferLock);
                }
                else
                {
                    Monitor.Wait(bufferLock);
                }
            }
        }

        Console.WriteLine("Producer finished...");

        _continueProducingData = false;
    }

    private void ConsumeTestDataSetAsync(StreamWriter writer)
    {
        while (_continueProducingData)
        {
            lock (bufferLock)
            {
                if (_buffer.Count == 0)
                {
                    Monitor.Wait(bufferLock); --> Here it waits and thinks that the producer is producing and will eventually call Monitor.Pulse, while the producer already finished off.
                    continue;
                }

                while (_buffer.Count != 0)
                {
                    var dataBytes = _buffer.Dequeue();
                    writer.BaseStream.Write(dataBytes, 0, dataBytes.Length);
                    writer.Flush();                            
                }

                Monitor.PulseAll(bufferLock);                    
            }
        }

        Console.WriteLine("Consumer finished...");
    }
}

我做错了什么吗?仅考虑 2 个线程(一个用于写入,另一个用于读取)来实现 Procuer/Consumer 是正确的方法吗?

c# multithreading task-parallel-library producer-consumer
1个回答
0
投票

以下是使用 Ada 编写的简单生产者-消费者示例。此示例演示了一种协调生产者和消费者终止而不会出现死锁的解决方案。

-- single producer / single consumer with coordinated termination

with Ada.Text_IO; use Ada.Text_IO;

procedure Main is

   -- The shared buffer interface specification
   type idx_t is mod 10;
   type bounded_buffer is array (idx_t) of Integer;
   protected buffer is
      entry write (value : in Integer);
      entry read (value : out Integer);
      procedure proc_set_done;
      function is_empty return Boolean;
      function proc_complete return Boolean;
   private
      Buff        : bounded_buffer;
      Write_Index : idx_t   := idx_t'First;
      Read_Index  : idx_t   := idx_t'First;
      Count       : Natural := 0;
      Complete    : Boolean := False;
   end buffer;

   -- Define the behavior of bounded_buffer
   protected body buffer is
      entry write (value : in Integer) when Count < Buff'Length is
      begin
         Buff (Write_Index) := value;
         Write_Index        := Write_Index + 1;
         Count              := Count + 1;
      end write;

      entry read (value : out Integer) when Count > 0 is
      begin
         value      := Buff (Read_Index);
         Read_Index := Read_Index + 1;
         Count      := Count - 1;
      end read;

      procedure proc_set_done is
      begin
         Complete := True;
      end proc_set_done;

      function is_empty return Boolean is
      begin
         return Count = 0;
      end is_empty;

      function proc_complete return Boolean is
      begin
         return Complete;
      end proc_complete;
   end buffer;

   -- define producer task API (this example has no callable interface)
   task producer;

   task body producer is
   begin
      for I in 1 .. 30 loop
         buffer.write (I);
      end loop;
      buffer.proc_set_done;
      Put_Line("Producer done.");
   end producer;

   Next_Val : Integer;
begin

   -- The main procedure acts as the consumer in this example
   loop
      exit when buffer.proc_complete and then buffer.is_empty;
      buffer.read (Next_Val);
      Put_Line ("Consumer read" & Next_Val'Image);
      delay 0.001; -- slow down the consumer
   end loop;
   Put_Line ("Consumer done.");
end Main;

Ada 受保护对象不受竞争条件影响。受保护对象是任务调用的并发被动对象(类似于C++线程)。受保护对象的方法隐式操作其共享对象上的锁。

Ada 强制将规范 (API) 与实现明确分离。保护对象规范为:

   protected buffer is
      entry write (value : in Integer);
      entry read (value : out Integer);
      procedure proc_set_done;
      function is_empty return Boolean;
      function proc_complete return Boolean;
   private
      Buff        : bounded_buffer;
      Write_Index : idx_t   := idx_t'First;
      Read_Index  : idx_t   := idx_t'First;
      Count       : Natural := 0;
      Complete    : Boolean := False;
   end buffer;

这个受保护对象有五个方法。每个受保护的条目都有一个关联的条件变量。受保护的过程没有关联的条件变量。受保护的条目和受保护的过程都在受保护的对象上实现了独占读写锁。受保护函数是在受保护对象上实现共享只读锁的只读方法。

受保护对象规范的私有部分声明了受保护对象的所有数据成员。这些数据成员只能通过受保护对象规范中声明的条目、过程和函数来调用任务。

受保护方法的实现包含在受保护主体中。

-- Define the behavior of bounded_buffer
   protected body buffer is
      entry write (value : in Integer) when Count < Buff'Length is
      begin
         Buff (Write_Index) := value;
         Write_Index        := Write_Index + 1;
         Count              := Count + 1;
      end write;

      entry read (value : out Integer) when Count > 0 is
      begin
         value      := Buff (Read_Index);
         Read_Index := Read_Index + 1;
         Count      := Count - 1;
      end read;

      procedure proc_set_done is
      begin
         Complete := True;
      end proc_set_done;

      function is_empty return Boolean is
      begin
         return Count = 0;
      end is_empty;

      function proc_complete return Boolean is
      begin
         return Complete;
      end proc_complete;
   end buffer;

为bounded_buffer数组类型选择的索引类型是模块化类型。 Ada 模块化类型展示了模块化算术,并且可以很好地用作循环缓冲区的索引。在这种情况下,模块化类型是:

type idx_t is mod 10;

此类型实例的有效值为 0 到 9(包含 0 和 9)。因此,由 idx_t 索引的数组类型的起始索引值为 0,结束索引值为 9。将 1 与 9 相加得到 0。因此,读取和写入条目中的逻辑只是简单地递增相应的 write_index 或 read_index操作。

与写入条目相关的条件是“当计数 < Buff'Length". Buff'Length evaluates to the number of elements in the Buff array.

生产者任务规范很简单。此示例没有与生产者任务的直接接口。生产者任务只是写入缓冲区。

生产者任务的实现只是将值 1 到 30 写入缓冲区,调用 buffer.proc_set_done,并输出一条消息,表明生产者已完成。

主过程在该程序的根任务(或线程)中运行,并用作消费者。消费者只是迭代地循环读取缓冲区,直到 buffer.proc_complete 为 TRUE 并且 buffer.is_empty 为 TRUE。消费者中使用延迟语句来减慢速度,以便输出清楚地显示生产者在消费者之前完成,而不会导致消费者死锁。

该程序的输出是:

Consumer read 1
Consumer read 2
Consumer read 3
Consumer read 4
Consumer read 5
Consumer read 6
Consumer read 7
Consumer read 8
Consumer read 9
Consumer read 10
Consumer read 11
Consumer read 12
Consumer read 13
Consumer read 14
Consumer read 15
Consumer read 16
Consumer read 17
Consumer read 18
Consumer read 19
Consumer read 20
Producer done.
Consumer read 21
Consumer read 22
Consumer read 23
Consumer read 24
Consumer read 25
Consumer read 26
Consumer read 27
Consumer read 28
Consumer read 29
Consumer read 30
Consumer done.
© www.soinside.com 2019 - 2024. All rights reserved.