我正在测试信号量的使用与典型的生产者 - 消费者问题,其中我只有一个生产者和一个消费者。生产者一次添加一个产品,消费者可以同时提取几个产品。
要执行测试,生产者和消费者存储并从10个元素的数组中删除数字,其中0表示没有产品,任何其他数字代表产品。存储和检索项目的访问权限集中在名为Data的类中。我使用互斥锁来有序地使用向量,以防我们有多个线程同时工作。
执行时,我发现根据线程执行的操作,权限数量不正确。应用程序显示错误,因为生产者的信号量表示它具有权限,但数据向量已满。
package producer.consumer;
import java.io.IOException;
public class ProducerConsumer {
public static void main(String[] args) throws IOException {
final int MAX = 10;
Data data = new Data(MAX);
Consumer consumer = new Consumer(data);
Producer producer = new Producer(data);
consumer.start();
producer.start();
}
}
package producer.consumer;
public class Producer extends Thread{
private final Data data;
public Producer(Data data) {
this.data = data;
}
@Override
public void run() {
while (true) {
try {
data.add((int) (Math.random() * data.getLength()) + 1);
} catch (InterruptedException ex) {
System.out.println(ex.getMessage());
}
}
}
}
package producer.consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
public class Consumer extends Thread{
private final Data data;
public Consumer(Data data) {
this.data = data;
}
@Override
public void run() {
while (true) {
try {
data.remove((int) (Math.random() * data.getLength()) + 1);
} catch (InterruptedException ex) {
Logger.getLogger(Consumer.class.getName()).log(Level.SEVERE, null, ex);
}
}
}
}
package producer.consumer;
import java.io.IOException;
import java.util.Arrays;
import java.util.concurrent.Semaphore;
public class Data {
private final int[] data;
private final Semaphore mutex = new Semaphore(1);
private final Semaphore semProducer, semConsumer;
public Data(int MAX) throws IOException {
data = new int[MAX];
semProducer = new Semaphore(MAX);
semConsumer = new Semaphore(0);
}
public int getLength() {
return data.length;
}
public void add(int number) throws InterruptedException {
semProducer.acquire();
mutex.acquire();
System.out.println("trying to add a product");
int i = 0;
while (data[i] != 0) {
i++;
}
data[i] = number;
int permits = semConsumer.availablePermits() + 1;
System.out.println("data added in " + i + " " + Arrays.toString(data)
+ " Resources consumer " + permits
+ " Resources producer " + semProducer.availablePermits());
mutex.release();
semConsumer.release();
}
public void remove(int numberElements) throws InterruptedException {
semConsumer.acquire(numberElements);
mutex.acquire();
System.out.println("trying to withdraw " + numberElements);
for (int i = 0; i < numberElements; i++) {
if (data[i] != 0) {
data[i] = 0;
}
}
int permisos = semProducer.availablePermits() + 1;
System.out.println(" Retired " + numberElements + " " + Arrays.toString(data)
+ " Resources consumer " + semConsumer.availablePermits()
+ " Resources producer " + permisos);
mutex.release();
semProducer.release(numberElements);
}
}
非常感谢你的帮助。
您的消费者并不总是消费它声称消费的东西。
for (int i = 0; i < numberElements; i++) {
if (data[i] != 0) {
data[i] = 0;
}
}
假设numberElements为3,我们在data [7],data [8],data [9]中有3个可用元素。
循环终止于i == 3,没有删除任何内容,但生产者信号量仍将被“上调”3。
在使用者中,如果使用i作为数组索引,则需要覆盖整个数组,并且需要一个单独的计数器来“删除元素”。
情况并非如此,即使生产者首先填充那些,可用元素也总是位于编号最小的数据槽中。考虑生产者设法生成至少5个元素的时间顺序,然后消费者运行消耗2,然后立即再次运行以消耗3,然后再生成任何元素。数据[0]和数据[1]在消费者的第二次运行时将为空,我们将遇到我描述的情景。
编辑获取和释放许可似乎是正确的;但您需要确保消费者实际上清除正确数量的元素。
在示例中,使用编辑Data
类
public void remove(int numberElements) throws InterruptedException {
semConsumer.acquire(numberElements);
mutex.acquire();
System.out.println("remove: num-elem=" + numberElements);
int consumed=0;
for (int i = 0; consumed<numberElements; i++) {
if (data[i] != 0) {
data[i] = 0;
consumed++;
}
}
System.out.println(
" Retired " + numberElements + " " + Arrays.toString(data) );
mutex.release();
semProducer.release(numberElements);
}
另请注意,此实现效率不高(在插入和删除项目时,您需要遍历整个数组,当MAX很大时,这可能很昂贵。)