多生产者、多消费者的监视器(Monitor)练习(Java)

问题描述 投票:1回答:1

为了在 Java 中实现一个多线程编程的监视器(monitor)示例,我写出了下面的代码:

import java.util.*;
import java.util.concurrent.*;

/**
 * Bounded FIFO buffer shared between producer and consumer threads.
 *
 * <p>Two counting semaphores enforce the bounds: {@code s_prod} counts the
 * free slots (producers block when it reaches zero) and {@code s_rec} counts
 * the stored elements (consumers block when it reaches zero). The semaphores
 * only limit <em>how many</em> threads may touch the list at once, so every
 * access to {@code cola} made by this class is additionally serialized with
 * {@code synchronized (cola)}; {@link ArrayList} is not thread-safe.
 */
class Monitor{
    Semaphore s_prod; // Production semaphore: permits == free slots
    Semaphore s_rec; // Recolection semaphore: permits == stored elements
    int capacidad; // capacity of the queue
    ArrayList<Integer> cola; // my queue; guarded by synchronized (cola)

    /**
     * Creates an empty buffer.
     *
     * @param n maximum number of elements the buffer may hold at any time
     */
    Monitor(int n){
        this.capacidad = n;
        // The constructor argument is only an initial capacity hint; the real
        // bound is enforced by s_prod below.
        this.cola = new ArrayList<Integer>(this.capacidad);
        this.s_prod = new Semaphore(this.capacidad);
        this.s_rec = new Semaphore(0);
    }

    /**
     * Appends {@code n} to the buffer, blocking while the buffer is full.
     *
     * <p>The wait is uninterruptible: this method cannot report an
     * interruption to its caller, and continuing without a permit would
     * break the capacity bound. A pending interrupt is left set on the
     * calling thread.
     *
     * @param n value to enqueue
     */
    public void Producir(int n){ // Producing
        this.s_prod.acquireUninterruptibly();
        synchronized (this.cola) {
            this.cola.add(n);
            // Printed under the lock so the snapshot matches this operation.
            System.out.println("|Prod:" + n + " | " + cola); // Printing production
        }
        this.s_rec.release();
    }

    /**
     * Removes and returns the oldest element, blocking while the buffer is
     * empty.
     *
     * <p>The wait is uninterruptible for the same reason as in
     * {@link #Producir(int)}; a pending interrupt is left set on the calling
     * thread.
     *
     * @return the element that was at the head of the buffer
     */
    public int Recolectar(){ // Recolecting
        this.s_rec.acquireUninterruptibly();
        int temp;
        synchronized (this.cola) {
            // remove(int index) returns the removed element, so read and
            // removal happen in one step.
            temp = this.cola.remove(0);
            System.out.println("|Rec:" + temp + " | " + cola); // Printing recolection
        }
        this.s_prod.release();
        return temp;
    }
}

/**
 * Producer task: pushes the integers {@code a..b} (inclusive) into a
 * {@link Monitor}, pausing 50 ms after each one, on its own thread.
 */
class Productor implements Runnable{ // Productor
    Thread t;
    Monitor m;
    int a, b; // Produces number from 'a' to 'b'
    // Written by the producer thread and read by other threads, hence volatile.
    volatile boolean finish; // flag that indicates the end of production

    /**
     * @param nombre suffix used in the thread name ("Productor " + nombre)
     * @param m      buffer that receives the produced values
     * @param i      first value to produce
     * @param j      last value to produce (inclusive)
     */
    Productor(String nombre, Monitor m, int i, int j){
        this.t = new Thread(this, "Productor " + nombre);
        this.a = i;
        this.b = j;
        this.m = m;
        finish = false;
    }

    /**
     * Produces every value in {@code [a, b]}. If the thread is interrupted
     * while pausing, production stops early and the interrupt flag is
     * restored. {@code finish} is set on every exit path so that observers
     * waiting for it are never left waiting.
     */
    public void run(){
        try {
            for(int i = a; i <= b; i++){
                m.Producir(i);
                Thread.sleep(50);
            }
        } catch (InterruptedException e) {
            System.out.println(e.getMessage());
            // Keep the interruption visible to whoever owns this thread.
            Thread.currentThread().interrupt();
        } finally {
            this.finish = true;
        }
    }

    /** Starts the producer thread. */
    public void empezar(){
        this.t.start();
    }
}

/**
 * Consumer task: repeatedly takes one element from a {@link Monitor} and then
 * pauses for two seconds, on its own thread.
 */
class Recolector implements Runnable{ // Consumer
    Thread t;
    Monitor m;

    /**
     * @param nombre suffix used in the thread name ("Recolector " + nombre)
     * @param m      buffer to take elements from
     */
    Recolector(String nombre, Monitor m){
        this.t = new Thread(this, "Recolector " + nombre);
        this.m = m;
    }

    /**
     * Keeps collecting until production has been reported as finished and the
     * queue is observed empty. An interrupt received while pausing ends the
     * loop and is restored on the thread.
     *
     * <p>NOTE(review): the loop condition and the call to
     * {@code Recolectar()} are not one atomic step. A consumer that passes
     * the check just before another consumer takes the last element will
     * block inside {@code Recolectar()} with nothing left to collect. The
     * size read here is also not coordinated with writers of {@code m.cola}.
     */
    public void run(){
        /*
        while all the producers are still working and, even if they finished, the queue
        needs to be emptied
        */
        while(!(Checking.product_finished && m.cola.size() == 0)){
            m.Recolectar();
            try{
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                System.out.println(e.getMessage());
                // Treat interruption as a request to stop consuming.
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    /** Starts the consumer thread. */
    public void empezar(){
        this.t.start();
    }
}

/**
 * Watches a set of producers and raises {@link #product_finished} once all of
 * them are done. The watcher thread is started by the constructor.
 */
class Checking implements Runnable{ // Threads that checks if the producers finished
    public volatile static boolean product_finished;
    // Pause between two checks; avoids burning a CPU core in a hot loop.
    private static final long POLL_INTERVAL_MS = 10;
    Productor[] p_array;

    /**
     * Stores the producers to watch and immediately starts the watcher thread.
     *
     * @param prods producers whose completion is awaited
     */
    Checking(Productor[] prods){
        p_array = prods;
        new Thread(this).start();
    }

    /**
     * Polls the producers at a fixed interval until none is running, then
     * prints a message and sets {@link #product_finished}. If the watcher
     * thread is interrupted it restores the flag and exits without
     * reporting completion.
     */
    public void run(){
        while(anyProducerRunning()){
            try {
                Thread.sleep(POLL_INTERVAL_MS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
        System.out.println("Produccion finalizada!");
        product_finished = true;
    }

    /**
     * A producer counts as done when its {@code finish} flag is set or its
     * thread has terminated; a thread that has not been started yet is still
     * considered running.
     */
    private boolean anyProducerRunning(){
        for(Productor p:p_array){
            if(!p.finish && p.t.getState() != Thread.State.TERMINATED){
                return true;
            }
        }
        return false;
    }
}

/**
 * Demo driver: two producers and three consumers sharing one buffer of
 * capacity 3.
 */
public class Test{
    /**
     * Wires up the shared buffer, the producers, the consumers and the
     * completion watcher, then starts every thread.
     *
     * @param args unused
     */
    public static void main(String args[]){
        Monitor m = new Monitor(3); // monitor for queue of capacity=3
        Productor p1 = new Productor("P1", m, 10, 20);
        Productor p2 = new Productor("P2", m, -50, -40);
        // Each consumer gets its own name so the threads can be told apart.
        Recolector r1 = new Recolector("R1", m);
        Recolector r2 = new Recolector("R2", m);
        Recolector r3 = new Recolector("R3", m);

        Productor[] p_array = new Productor[2];
        p_array[0] = p1;
        p_array[1] = p2;

        // The constructor starts the watcher thread; the instance itself is
        // not needed afterwards.
        new Checking(p_array);
        p1.empezar();
        p2.empezar();
        r1.empezar();
        r2.empezar();
        r3.empezar();
    }
}

运行时,输出会显示所执行的操作和队列的状态,但存在一些问题:例如队列的最大长度被忽略、两个元素被同时生产、第一个元素被意外删除,或者访问队列时发生越界。我猜需要用到“synchronized”语句,但我又不能把整个程序都阻塞住。

此外,在“Checking”类中,我用一个线程不断地检查生产者是否已经完成工作,但我想知道有没有更好的检查方式,不至于让我的电脑过热。

按照 main 函数中的这些参数,程序应该最多只生产三个数字,然后等待队列中出现空位,再生产新的数字。

java multithreading producer-consumer
1个回答
1
投票

我希望我的回答能帮到你。

首先,您的代码忽略了队列的最大长度,因为您使用的是 ArrayList,其构造函数参数是 initialCapacity(初始容量),而不是最大容量:当元素个数达到 initialCapacity 时,ArrayList 会自动扩容。我建议您改用具有固定最大容量的 LinkedBlockingQueue。这种队列也非常适合您的任务:队列已满时,生产者会一直等待,直到出现空位;队列为空时,消费者会一直等待,直到有元素可取。因此您不再需要信号量。

要检查所有生产者是否已完成工作,您可以使用CompletableFuture,它提供了许多有用的方法。

完整的代码如下所示:

/**
 * Producer task: puts the integers {@code a..b} (inclusive) into a blocking
 * queue, pausing 50 ms after each one.
 */
public class Producer implements Runnable {

    private final BlockingQueue<Integer> queue;
    private final int a, b;

    /**
     * @param queue destination queue
     * @param a     first value to produce
     * @param b     last value to produce (inclusive)
     */
    public Producer(BlockingQueue<Integer> queue, int a, int b) {
        this.queue = queue;
        this.a = a;
        this.b = b;
    }

    /**
     * Produces every value in {@code [a, b]}. An interruption ends production
     * immediately instead of skipping one element and carrying on; the
     * interrupt flag is restored for the owner of the thread.
     */
    @Override
    public void run() {
        try {
            for (int i = a; i <= b; i++) {
                //producer will wait here if there is no space in the queue
                queue.put(i);
                System.out.println("Put: " + i);
                Thread.sleep(50);
            }
        } catch (InterruptedException e) {
            System.out.println(e.getMessage());
            Thread.currentThread().interrupt();
        }
    }
}

/**
 * Consumer task: takes elements from a blocking queue, pausing two seconds
 * after each one, until it has been told to finish and the queue is empty.
 */
public class Consumer implements Runnable {

    private final BlockingQueue<Integer> queue;
    // Set by another thread to request shutdown, hence volatile.
    public volatile boolean finish = false;

    /**
     * @param queue queue to take elements from
     */
    public Consumer(BlockingQueue<Integer> queue) {
        this.queue = queue;
    }

    /**
     * Consumes until {@code finish} is set and the queue is observed empty.
     * An interruption ends the loop; the interrupt flag is restored for the
     * owner of the thread.
     */
    @Override
    public void run() {
        try {
            while (!finish || queue.size() > 0) {
                // consumer will wait here if the queue is empty;
                // we have to poll with timeout because several consumers may pass
                // here while queue size is less than number of consumers;
                // timeout should be at least equal to producing interval
                Integer temp = queue.poll(3, TimeUnit.SECONDS);
                if (temp != null) {
                    System.out.println("Took: " + temp);
                    Thread.sleep(2000);
                }
            }
        } catch (InterruptedException e) {
            System.out.println(e.getMessage());
            Thread.currentThread().interrupt();
        }
    }
}

并测试它:

BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3); //queue of capacity = 3

Producer p1 = new Producer(queue, 10, 20);
Producer p2 = new Producer(queue, -50, -40);

// All five tasks block (on the queue or in sleep). On the default common pool
// the consumers, submitted first, can occupy every worker so the producers
// never get to run; a dedicated pool with one thread per task avoids that.
ExecutorService executor = Executors.newFixedThreadPool(5);

List<Consumer> consumers = new ArrayList<>();
CompletableFuture<?>[] consumersFutures = new CompletableFuture<?>[3];
for (int i = 0; i < 3; i++) {
    Consumer consumer = new Consumer(queue);
    consumers.add(consumer);
    //this static method runs Runnable in separate thread
    consumersFutures[i] = CompletableFuture.runAsync(consumer, executor);
}

CompletableFuture<?>[] producersFutures = new CompletableFuture<?>[2];

producersFutures[0] = CompletableFuture.runAsync(p1, executor);
producersFutures[1] = CompletableFuture.runAsync(p2, executor);

// allOf returns new CompletableFuture that is completed only
// when the last given future completes
CompletableFuture.allOf(producersFutures).thenAccept(v -> {
    System.out.println("Completed producing!");
    for (Consumer consumer: consumers) {
        consumer.finish = true;
    }
});

try {
    // waiting for all consumers to complete
    CompletableFuture.allOf(consumersFutures).get();
} finally {
    // Pool threads are non-daemon; release them so the JVM can exit.
    executor.shutdown();
}

System.out.println("Completed consuming!");
© www.soinside.com 2019 - 2024. All rights reserved.