生产者消费者计划在Java中使用wait()和notify()

问题描述 投票:1回答:4

我正在使用低级同步和wait()和notify()在Java中执行经典的Producer-Consumer问题。我知道有更好的实现使用java.util.concurrent包中的结构,但我的问题围绕低级实现:

private static ArrayList<Integer> list = new ArrayList<Integer>();

    static Object obj = new Object();

    public static void producer() throws InterruptedException {
        synchronized (obj) {
            while (true) {
                if (list.size() == 10) {
                    System.out.println("Queue full.. Waiting to Add");
                    obj.wait();
                } else {
                    int value = new Random().nextInt(100);
                    if (value <= 10) {
                        Thread.sleep(200);
                        System.out.println("The element added was : " + value);
                        list.add(value);
                        obj.notify();
                    }
                }
            }
        }

    }

    public static void consumer() throws InterruptedException {
        synchronized (obj) {
            while (true) {
                Thread.sleep(500);
                if (list.size() == 0) {
                    System.out.println("Queue is empty...Waiting to remove");
                    obj.wait();
                } else {
                    System.out.println("The element removed was : "
                            + list.remove(0));
                    obj.notify();
                }
            }
        }

    }

程序中有2个线程,具体为生产者和消费者各1个。代码工作得很好。

唯一的问题是生产者继续产生消息,直到达到最大值(直到列表的大小为10),消费者一次消耗所有10个消息。

如何让生产者和消费者同时工作?

这是示例输出:

The element added was : 4
The element added was : 0
The element added was : 0
The element added was : 4
The element added was : 3
The element added was : 1
The element added was : 10
The element added was : 10
The element added was : 3
The element added was : 9
Queue full.. Waiting to Add
The element removed was : 4
The element removed was : 0
The element removed was : 0
The element removed was : 4
The element removed was : 3
The element removed was : 1
The element removed was : 10
The element removed was : 10
The element removed was : 3
The element removed was : 9
Queue is empty...Waiting to remove

编辑:这是更正后的代码:

private static ArrayList<Integer> list = new ArrayList<Integer>();
    private static Object obj = new Object();

    public static void producer() throws InterruptedException {
        while (true) {
            Thread.sleep(500);
            if (list.size() == 10) {
                System.out.println("Waiting to add");
                synchronized (obj) {
                    obj.wait();
                }
            }
            synchronized (obj) {
                int value = new Random().nextInt(10);
                list.add(value);
                System.out.println("Added to list: " + value);
                obj.notify();
            }
        }
    }

    public static void consumer() throws InterruptedException {
        while (true) {
            Thread.sleep(500);
            if (list.size() == 0) {
                System.out.println("Waiting to remove");
                synchronized (obj) {
                    obj.wait();
                }
            }
            synchronized (obj) {
                int removed = list.remove(0);
                System.out.println("Removed from list: " + removed);
                obj.notify();
            }
        }
    }
java multithreading producer-consumer
4个回答
3
投票

您不能在具有相同对象的同步块中运行两个线程。当一个方法运行时,另一个方法无法运行,直到另一个线程调用wait方法。

要解决这个问题,你应该把addremove放在synchronized块中。有关更多信息,请参阅this


1
投票

生产者和消费者问题是多进程同步问题的典型例子。这描述了共享公共资源缓冲区的两个进程,生产者和消费者。生产者工作是生成数据并将其放入缓冲区,而消费者作业使用生成的数据并从缓冲区中删除。

生产者必须确保在缓冲区已满时不应添加任何元素,它应该调用wait()直到使用者消耗一些数据并且notify到生产者线程并且消费者必须确保它不应该尝试在缓冲区已经为空时从缓冲区中删除它,它应该调用wait(),它只是等到生产者生成数据并将其添加到缓冲区并使用notifynotifyAll通知消费者。

使用BlockingQueue接口可以解决这个问题,该接口管理这个生产者和消费者实现自己。

import java.util.ArrayList;
import java.util.List;
import java.util.logging.Level;
import java.util.logging.Logger;

/*
 * To change this license header, choose License Headers in Project `Properties`.
 * To change this template file, choose Tools | Templates
 * and open the template in the editor.
 */
/**
 *
 * @author sakshi
 */
public class ThreadProducer {

    static List<Integer> list = new ArrayList<Integer>();

    static class Producer implements Runnable {

        List<Integer> list;

        public Producer(List<Integer> list) {
            this.list = list;
        }

        @Override
        public void run() {
            synchronized (list) {
                for (int i = 0; i < 10; i++) {
                    if (list.size() >= 1) {
                        try {
                            System.out.println("producer is waiting ");
                            list.wait();
                        } catch (InterruptedException ex) {
                            ex.printStackTrace();
                        }
                    }

                    System.out.println("produce=" + i);
                    list.add(i);
                    list.notifyAll();
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }
                }

            }

            //To change body of generated methods, choose Tools | Templates.
        }

    }

    static class Consumer implements Runnable {

        List<Integer> list;

        public Consumer(List<Integer> list) {
            this.list = list;
        }

        @Override
        public void run() {

            synchronized (list) {
                for (int i = 0; i < 10; i++) {

                    while (list.isEmpty()) {
                        System.out.println("Consumer is waiting");
                        try {
                            list.wait();
                        } catch (InterruptedException ex) {
                            ex.printStackTrace();;
                        }

                    }

                    int k = list.remove(0);
                    System.out.println("consume=" + k);
                    list.notifyAll();
                    try {
                        Thread.sleep(500);
                    } catch (InterruptedException ex) {
                        ex.printStackTrace();
                    }

                }

            }
        }

    }

    public static void main(String[] args) {
        Thread producer = new Thread(new Producer(list));
        Thread consumer = new Thread(new Consumer(list));
        producer.start();
        consumer.start();

    }
}

输出:

produce=0
producer is waiting 
consume=0
Consumer is waiting
produce=1
producer is waiting 
consume=1
Consumer is waiting
produce=2
producer is waiting 
consume=2
Consumer is waiting
produce=3
producer is waiting 
consume=3
Consumer is waiting
produce=4
producer is waiting 
consume=4
Consumer is waiting
produce=5
producer is waiting 
consume=5
Consumer is waiting
produce=6
producer is waiting 
consume=6
Consumer is waiting
produce=7
producer is waiting 
consume=7
Consumer is waiting
produce=8
producer is waiting 
consume=8
Consumer is waiting
produce=9
consume=9

0
投票

class Resources {

private final int capacity = 2;
public static int value = 0;

LinkedList < Integer > list;

Resources() {
    list = new LinkedList < > ();
}

void consume() throws InterruptedException {
    while (true) {
        synchronized(this) {
            while (list.size() == 0) {
                wait();
            }
            int val = list.removeFirst();
            System.out.println("Value consumed:" + val);
            notify();
            //Thread.sleep(1000);
        }
    }
}

void produce() throws InterruptedException {
    while (true) {
        synchronized(this) {
            while (list.size() == capacity) {
                wait();
            }
            System.out.println("Value produced:" + value);
            list.add(value++);
            notify();
            Thread.sleep(1000);

        }
    }
}

}

class MyThread5扩展Thread {

Resources rs;
String name;

public String getNames() {
    return name;
}

public MyThread5(Resources rs, String name) {
    this.rs = rs;
    this.name = name;
}

@Override
public void run() {
    if (this.getNames().equals("Producer")) {
        try {
            this.rs.produce();
        } catch (InterruptedException ex) {
            Logger.getLogger(MyThread5.class.getName()).log(Level.SEVERE, null, ex);
        }
    } else {
        try {
            this.rs.consume();
        } catch (InterruptedException ex) {
            Logger.getLogger(MyThread5.class.getName()).log(Level.SEVERE, null, ex);
        }
    }
}

}

public class ProducerConsumerExample {

public static void main(String[] args) {
    try {
        Resources rs = new Resources();
        MyThread5 m1 = new MyThread5(rs, "Producer");
        MyThread5 m2 = new MyThread5(rs, "Consumer");
        m1.start();
        m2.start();

        m1.join();
        m2.join();
    } catch (InterruptedException ex) {
        Logger.getLogger(ProducerConsumerExample.class.getName()).log(Level.SEVERE, null, ex);
    }

}

}


-1
投票

不要使用list.size() == 10,而是可以检查list.size == 1

对于生产者生产,一个人等待消费者消费。请参阅此Producer Consumer Problem - Solution using wait and notify In Java

© www.soinside.com 2019 - 2024. All rights reserved.