
问题描述 投票:2回答:1



[我把ALMOST似乎起作用的代码片段放在一起,但是与线程相关的某些事情显然是错误的,因此我想问问您是否有人可以发现我在做什么错。如果我在下面的程序中运行某些测试文件,程序会完成OK(至少对我和我的测试文件而言),但如果我取消Thread.Sleep(20)方法中的dequeueObjectAndWriteItToFile注释(为了测试当生产者快于消费者时,会发生什么?然后(基于控制台中打印的数据),生产者将maxQueueSize + 1数据块插入队列中,并且程序进入某个无限循环或某些循环





using System;
using System.Threading;
using System.Collections.Generic;
using System.IO;

class ProducerConsumerApp : IDisposable
public static string originalFilePath = @"D:\test.txt";
public static string outputFilePath = @"D:\test_export.txt";
public static int blockSize = 15;

int maxQueueSize = 4;  // max allowed number of objects in the queue

EventWaitHandle _consumerThreadWaitEventHandler = new AutoResetEvent(false);
EventWaitHandle _producerThreadWaitEventHandler = new AutoResetEvent(false);

Thread _consumerThread;
readonly object _lock = new object();
Queue<byte[]> _queue = new Queue<byte[]>();

public ProducerConsumerApp(Stream outputStream)
    _consumerThread = new Thread(dequeueObjectAndWriteItToFile);

public void enqueueObject(byte[] data)
    lock (_lock)
        // TODO !!!
        // Make sure producent doesn't enqueue more objects than the maxQueueSize is,
        // i.e. let the producent wait until consumer dequeues some object from the full queue
        if (_queue.Count > maxQueueSize)     // would "while" be better? Doesn't seem to change anything
        // Thread.Sleep(20); // just for testing


        // data being read in case of a text file:
        //string str = (data==null) ? "<null>" : System.Text.Encoding.Default.GetString(data);
        //Console.WriteLine("Enqueuing data: "+str);

    _consumerThreadWaitEventHandler.Set();  // data enqueued  => wake the consumerThread

public void Dispose()  // called automatically (IDisposable implementer) when instance is being destroyed
    enqueueObject(null);                         // Signal the consumer to exit.
    _consumerThread.Join();                     // Wait for the consumer's thread to finish.
    _consumerThreadWaitEventHandler.Close();    // Release any OS resources.

void dequeueObjectAndWriteItToFile(object outputStream)
    while (true)
        // Thread.Sleep(20); // slow down the consumerThread to check what happens when the producer fully fills the queue
        // PROBLEM - the app gets into some infinite loop if I do this!!! What exactly is wrong?

        byte[] data = null;
        lock (_lock)
            if (_queue.Count > 0)   // queue not empty
                data = _queue.Dequeue();

                // !!! This doesn't seem right - I don't want to call this in each while iteration
                // I would like to call it only if _producerThreadWaitEventHandler.WaitOne has been called 
                // but how to check such a condition?

                if (data == null) 
                    // Console.WriteLine("Data file reading finished => let consumerThread finish and then quit the app");
        if (data != null)
            ((FileStream)outputStream).Write(data, 0, data.Length); // write data from the queue to a file

            // just a test in case of a text file:
            // string str = System.Text.Encoding.Default.GetString(data);
            // Console.WriteLine("Data block retrieved from the queue and written to a file: " + str);

        } else {   // empty queue => let the consumerThread wait
            _consumerThreadWaitEventHandler.WaitOne();  // No more tasks - wait for a signal

static void Main()

    FileInfo originalFile = new FileInfo(originalFilePath);
    byte[] data = new byte[blockSize];
    int bytesRead;

    using (FileStream originalFileStream = originalFile.OpenRead())    // file input stream
    using (FileStream fileOutputStream = new FileStream(outputFilePath, FileMode.Create, FileAccess.Write))
    using (ProducerConsumerApp q = new ProducerConsumerApp(fileOutputStream))
        while ((bytesRead = originalFileStream.Read(data, 0, blockSize)) > 0)   // reads blocks of data from a file
            // test - in case of a text file:
            //string str = System.Text.Encoding.Default.GetString(data);
            //Console.WriteLine("data block read from a file:" + str);

            if (bytesRead < data.Length)
                byte[] data2 = new byte[bytesRead];
                Array.Copy(data, data2, bytesRead);
                data = data2;

            q.enqueueObject(data);   // put the data into the queue

            data = new byte[blockSize];
    // because of "using" the Dispose method is going to be called in the end which will call enqueueObject(null) resulting in stopping the consumer thread

c# multithreading synchronization producer-consumer autoresetevent


© www.soinside.com 2019 - 2024. All rights reserved.