生产者-消费者同步问题

问题描述 投票:2回答:1

我正在尝试编写一个简单的生产者-消费者应用程序,在该应用程序中,我需要从文件中读取大块数据(可能很大),并且(出于简单的测试目的)只需通过另一个线程将它们写入另一个文件。

我已经尝试了许多在线资源,但是这些线程同步任务对我来说很难理解,我发现的每个示例对我来说都缺少一些重要方面。

[我把ALMOST似乎起作用的代码片段放在一起,但是与线程相关的某些事情显然是错误的,因此我想问问您是否有人可以发现我在做什么错。如果我在下面的程序中运行某些测试文件,程序会完成OK(至少对我和我的测试文件而言),但如果我取消Thread.Sleep(20)方法中的dequeueObjectAndWriteItToFile注释(为了测试当生产者快于消费者时,会发生什么?然后(基于控制台中打印的数据),生产者将maxQueueSize + 1数据块插入队列中,并且程序进入某个无限循环或某些循环

我怀疑_producerThreadWaitEventHandler.Set()调用可能是问题的一部分,因为目前它在dequeueObjectAndWriteItToFile中为每个while循环调用(我只想在必要时调用它,即是否已调用_producerThreadWaitEventHandler.waitOne()并唤醒该线程,但是我不知道如何找出某个线程是否已调用waitOne来唤醒该线程)。当然,可能还存在其他同步问题,但是由于我是多线程技术的新手,所以我不知道首先看哪里,什么是最佳解决方案。

注意,我想使用(并理解)基本技术(例如MonitorAutoResetEvent)进行同步(而不是BlockingQueue,TPL等),所以我希望对下面的代码进行一些细微的调整将使其工作。

我将不胜感激。

谢谢。

using System;
using System.Threading;
using System.Collections.Generic;
using System.IO;

class ProducerConsumerApp : IDisposable
{
public static string originalFilePath = @"D:\test.txt";
public static string outputFilePath = @"D:\test_export.txt";
public static int blockSize = 15;

int maxQueueSize = 4;  // max allowed number of objects in the queue

EventWaitHandle _consumerThreadWaitEventHandler = new AutoResetEvent(false);
EventWaitHandle _producerThreadWaitEventHandler = new AutoResetEvent(false);

Thread _consumerThread;
readonly object _lock = new object();
Queue<byte[]> _queue = new Queue<byte[]>();

public ProducerConsumerApp(Stream outputStream)
{
    _consumerThread = new Thread(dequeueObjectAndWriteItToFile);
    _consumerThread.Start(outputStream);
}

public void enqueueObject(byte[] data)
{
    lock (_lock)
    {
        // TODO !!!
        // Make sure producent doesn't enqueue more objects than the maxQueueSize is,
        // i.e. let the producent wait until consumer dequeues some object from the full queue
        if (_queue.Count > maxQueueSize)     // would "while" be better? Doesn't seem to change anything
        {
            _producerThreadWaitEventHandler.WaitOne();
        }
        // Thread.Sleep(20); // just for testing

        _queue.Enqueue(data);

        // data being read in case of a text file:
        //string str = (data==null) ? "<null>" : System.Text.Encoding.Default.GetString(data);
        //Console.WriteLine("Enqueuing data: "+str);

    }
    _consumerThreadWaitEventHandler.Set();  // data enqueued  => wake the consumerThread
}

public void Dispose()  // called automatically (IDisposable implementer) when instance is being destroyed
{
    enqueueObject(null);                         // Signal the consumer to exit.
    _consumerThread.Join();                     // Wait for the consumer's thread to finish.
    _consumerThreadWaitEventHandler.Close();    // Release any OS resources.
}

void dequeueObjectAndWriteItToFile(object outputStream)
{
    while (true)
    {
        // Thread.Sleep(20); // slow down the consumerThread to check what happens when the producer fully fills the queue
        // PROBLEM - the app gets into some infinite loop if I do this!!! What exactly is wrong?

        byte[] data = null;
        lock (_lock)
            if (_queue.Count > 0)   // queue not empty
            {
                data = _queue.Dequeue();

                _producerThreadWaitEventHandler.Set();
                // !!! This doesn't seem right - I don't want to call this in each while iteration
                // I would like to call it only if _producerThreadWaitEventHandler.WaitOne has been called 
                // but how to check such a condition?

                if (data == null) 
                {                  
                    // Console.WriteLine("Data file reading finished => let consumerThread finish and then quit the app");
                    return;                
                }
            }
        if (data != null)
        {
            ((FileStream)outputStream).Write(data, 0, data.Length); // write data from the queue to a file

            // just a test in case of a text file:
            // string str = System.Text.Encoding.Default.GetString(data);
            // Console.WriteLine("Data block retrieved from the queue and written to a file: " + str);

        } else {   // empty queue => let the consumerThread wait
            _consumerThreadWaitEventHandler.WaitOne();  // No more tasks - wait for a signal
        }
    }
}

static void Main()
{

    FileInfo originalFile = new FileInfo(originalFilePath);
    byte[] data = new byte[blockSize];
    int bytesRead;

    using (FileStream originalFileStream = originalFile.OpenRead())    // file input stream
    using (FileStream fileOutputStream = new FileStream(outputFilePath, FileMode.Create, FileAccess.Write))
    using (ProducerConsumerApp q = new ProducerConsumerApp(fileOutputStream))
    {
        while ((bytesRead = originalFileStream.Read(data, 0, blockSize)) > 0)   // reads blocks of data from a file
        {
            // test - in case of a text file:
            //string str = System.Text.Encoding.Default.GetString(data);
            //Console.WriteLine("data block read from a file:" + str);

            if (bytesRead < data.Length)
            {
                byte[] data2 = new byte[bytesRead];
                Array.Copy(data, data2, bytesRead);
                data = data2;
            }

            q.enqueueObject(data);   // put the data into the queue

            data = new byte[blockSize];
        }
    }
    // because of "using" the Dispose method is going to be called in the end which will call enqueueObject(null) resulting in stopping the consumer thread

    Console.WriteLine("Finish");
}
}
c# multithreading synchronization producer-consumer autoresetevent
1个回答
0
投票

您的问题是您在锁中等待。这意味着另一个线程也将在lock语句上阻塞,并且永远不会调用_producerThreadWaitEventHandler.Set();经典死锁。

© www.soinside.com 2019 - 2024. All rights reserved.