所以,我一直致力于生产者/消费者问题,我有10个生产者和10个消费者。消费者从生产者生产的FIFO中检索一个数字。所有的消费者和生产者都应该同时工作(他们是线程)。现在,我目前对信号量不太满意,这就是我试图解决这个问题的原因。无论怎样,我都无法让它发挥作用。
semPop是pop操作的信号量。 semPush是推送操作的信号量。 semWorkPush是一个信号量,它可以防止多个线程在推送操作中同时工作。 semWorkPop与semWorkPush的作用相同,但是对于pop操作。
另外,在我编写代码的方式中接受建议,以便更容易阅读!谢谢
public class SharedFifo {
private Integer[] memory;
private Integer[] ids;
private Semaphore semPop = new Semaphore();
private Semaphore semPush = new Semaphore();
private Semaphore semWorkPush = new Semaphore();
private Semaphore semWorkPop = new Semaphore();
private int numberOfElements = 0;
private int totalSize = 0;
private int nextMemberToPop = 0;
private int tail = 0;
private int flag = 0;
public SharedFifo(int a) {
semPop.up();
semPush.up();
semWorkPush.up();
semWorkPop.up();
this.totalSize = a;
this.memory = new Integer[a];
this.ids = new Integer[a];
}
public void pushVal(int val, int id) {
if (numberOfElements == totalSize) {
semPush.down();
semPop.up();
}
semWorkPush.down();
numberOfElements++;
memory[tail] = val;
ids[tail] = id;
this.tail = (tail + 1) % (this.totalSize);
if(flag > 0) {
System.out.println("flag is bigger than 0");
semPop.up();
this.flag--;
}
semWorkPush.up();
}
public Integer[] popVal() {
Integer[] valAndId = new Integer[2];
if (numberOfElements == 0) {
semPop.down();
semPush.up();
this.flag++;
}
semWorkPop.down();
valAndId[0] = memory[nextMemberToPop];
valAndId[1] = ids[nextMemberToPop];
this.nextMemberToPop = (this.nextMemberToPop + 1) % (this.totalSize);
numberOfElements--;
semPush.up();
semWorkPop.up();
return valAndId;
}
}
你这样做的方式是如此令人费解,我发现它是不可理解的。很抱歉这样说,但这是真的。你有这么多的标志和计数缓冲区的方法,我不会惊讶你有bug。你可能只是犯了数学错误。
你需要的只是两个数字。通常是计数和头指针。三个数字将使它更容易,头部,尾部和计数。如果在缓冲区已满时允许头部和尾部成为相同的索引,则需要计数。 (我在这里谈论一个循环缓冲区,这是我认为你正在尝试实现的。)
这是访问循环缓冲区的一些基本数学公式。我故意在这里使用原语,只是wait()
和notify()
,真正的代码应该使用java.util.concurrent
中的一个类。
class CircularBuffer {
private final int[] buffer = new int[20];
private int head;
private int count;
public synchronized void put( int i ) throws InterruptedException {
while( count == buffer.length ) wait();// full
buffer[head++] = i;
head %= buffer.length;
count++;
notifyAll();
}
public synchronized int get() throws InterruptedException {
while( count == 0 ) wait(); // empty
int tail = (head - count) % buffer.length;
tail = (tail < 0) ? tail + buffer.length : tail;
int retval = buffer[tail];
count--;
notifyAll();
return retval;
}
}
看看这个代码有多小(和更简单)?这就是你所需要的。