使用多线程的生产者消费者

问题描述 投票:-2回答:1

我的生产者 - 消费者问题完全适用于1个生产者和1个消费者。它无法为2个生产者和2个消费者运行。它正在达到一些未知的死锁状态。我无法调试。有人可以帮我吗?

约束:一个制作人必须产生多达64个项目。消费者会一直运行,直到它清除所有生产的物品。


import java.util.Random;
import java.util.Scanner;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class Producer implements Runnable {
    private static int count = 1;
    private Random rg = new Random();
    private BlockingQueue<Object> queue = null;
    private static int pc = 0;
    static int maxPc = 0;

    public Producer(BlockingQueue<Object> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        synchronized (queue) {
                while(pc <= maxPc) {
                    try {
                            produce(pc++);
                    } catch (InterruptedException e) {
                        // TODO Auto-generated catch block
                        e.printStackTrace();
                        }
                    }
            }
        }
    public void produce(int pc) throws InterruptedException {
        synchronized(queue) {
            while(queue.size() == 8) {
                System.out.println(Thread.currentThread().getName() + " : Buffer full: waiting for consumer");
                queue.wait();
            }
        }
        synchronized(queue) {
            System.out.println("Producer: " + Thread.currentThread().getName() + " adding item "+ pc + " to the queue");
            queue.add(pc);
            //Thread.sleep(1);
            queue.notifyAll();
        }
    }
}

class Consumer implements Runnable {
    private static int consumeCount = 0;

    private BlockingQueue<Object> queue = null;
    private Random rg = new Random();

    public Consumer(BlockingQueue<Object> queue) {
        this.queue = queue;
    }

    @Override
    public void run() {
        while(true) {
            try {
                consume();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
        public void consume() throws InterruptedException {
        synchronized(queue) {
            while(queue.isEmpty()) {
                System.out.println(Thread.currentThread().getName() + ": Buffer empty: waiting for producer");
                queue.wait();
            }
        }

        synchronized(queue) {
            //Thread.sleep(1);
            System.out.println("Consumer: "+ Thread.currentThread().getName()+" removing item " + queue.take() + " from the queue");
            consumeCount++;
            queue.notifyAll();
            if(consumeCount == ParallelProcess.maxCC + 1)
                System.exit(0);
        }
    }

}

public class ParallelProcess {
    static int maxCC = 0;
    int numProducer;
    int numConsumer;
    private Thread[] cThreads;
    private Thread[] pThreads;
    private BlockingQueue<Object> queue = null;

    public ParallelProcess(int numProducer, int numConsumer, int queueSize) {
        this.numProducer = numProducer;
        this.numConsumer = numConsumer;
        this.queue = new ArrayBlockingQueue<Object>(queueSize);

        // create consumer thread objects
        cThreads = new Thread[numConsumer];
        for (int i = 0; i < numConsumer; i++) {
            cThreads[i] = new Thread(new Consumer(queue));
        }

        // create producer thread objects
        pThreads = new Thread[numProducer];
        for (int i = 0; i < numProducer; i++) {
            pThreads[i] = new Thread(new Producer(queue));
        }
    }

    public void execute() {

        // start consumer threads
                for (Thread t : cThreads) {
                    t.start();
                }
        // start producer threads
                for (Thread t : pThreads) {
                    //System.out.println("tc");
                    t.start();
                }

        }

    public static void main(String[] args) {
        // provide number of producers, number of consumers and the
        // max-queue-length
        Scanner sc = new Scanner(System.in);  

           System.out.println("Enter no. of producer and conumer");  
           int n = sc.nextInt();  
        ParallelProcess process = new ParallelProcess(n, n, 8);
        maxCC = n*64;
        Producer.maxPc = maxCC;
        process.execute();
        // (new Thread()).start();
        System.out.println("Thread: " + Thread.currentThread().getName() + " `enter code here`FINISHED");
    }
}
java multithreading oop thread-safety producer-consumer
1个回答
0
投票

首先,你应该合并qazxsw poi和qazxsw poi的qazxsw poi块以使其自动化,否则它可能会导致一些不一致。

其次,你应该在synchronized之前调用wait,否则queue.add(pc)queue.notify都可能在wait状态被阻止。

生产方法:

producer

消费方式:

consumer
© www.soinside.com 2019 - 2024. All rights reserved.