如果使用者线程遇到异常,如何停止生产者线程

问题描述 投票:0回答:1

我有这种使用生产者的情况,我正在使用arrayblockingqueue。如果使用者线程遇到异常,如何停止生产者线程。我需要生产者停止等待队列为空。我已经引发了强制运行时异常。但是该程序不会退出。生产者们一直等待队列排空。有人可以帮忙吗

public class ServiceClass implements Runnable{

    private final static BlockingQueue<Integer> processQueue = new ArrayBlockingQueue<>(10);
    private static final int CONSUMER_COUNT = 1;
    private boolean isConsumerInterrupted = false;

    private boolean isConsumer = false;
    private static boolean producerIsDone = false;

    public ServiceClass(boolean consumer,boolean isConsumerInterrupted) {
        this.isConsumer = consumer;
        this.isConsumerInterrupted = isConsumerInterrupted;
    }

    public static void main(String[] args) {

        long startTime = System.nanoTime();

        ExecutorService producerPool = Executors.newFixedThreadPool(1);
        producerPool.submit(new ServiceClass(false,false)); // run method is
                                                         // called   
        // create a pool of consumer threads to parse the lines read
        ExecutorService consumerPool = Executors.newFixedThreadPool(CONSUMER_COUNT);
        for (int i = 0; i < CONSUMER_COUNT; i++) {
            consumerPool.submit(new ServiceClass(true,false)); // run method is
                                                            // called
        }
        producerPool.shutdown();
        consumerPool.shutdown();

        while (!producerPool.isTerminated() && !consumerPool.isTerminated()) {
        }

        long endTime = System.nanoTime();
        long elapsedTimeInMillis = TimeUnit.MILLISECONDS.convert((endTime - startTime), TimeUnit.NANOSECONDS);
        System.out.println("Total elapsed time: " + elapsedTimeInMillis + " ms");

    }



    @Override
    public void run() {
        if (isConsumer) {
            consume();
        } else {
            readFile(); //produce data by reading a file
        }
    }

    private void readFile() {
        //Path file = Paths.get("c:/temp/my-large-file.csv");
        try
        {

            for(int i =0;i<10000;i++) {
                if(isConsumerInterrupted) {
                    break;
                }
                processQueue.put(i);
                System.out.println("produced:" + i+"------"+processQueue.size());

            }
            //Java 8: Stream class


        } catch (Exception e){
            e.printStackTrace();
        }

        producerIsDone = true; // signal consumer
        System.out.println(Thread.currentThread().getName() + " producer is done");
    }

    private void consume() {
        try {
            while (!producerIsDone || (producerIsDone && !processQueue.isEmpty())) {

                System.out.println("consumed:" + processQueue.take()+"------"+processQueue.size());
                throw new RuntimeException();

                //System.out.println(Thread.currentThread().getName() + ":: consumer count:" + linesReadQueue.size());                
            }
            System.out.println(Thread.currentThread().getName() + " consumer is done");
        } catch (Exception e) {
            isConsumerInterrupted=true;
        }


    }




}
java exception concurrency java.util.concurrent blockingqueue
1个回答
0
投票

生产者/消费者模式的思想是使生产者与消费者脱钩。因此,您希望在消费者死后停止生产者的意愿与生产者/消费者模式的主要动机不符。

但是如果您真的需要在队列已满的情况下杀死生产者,那么可以使用queue.put()代替无限期阻塞的queue.offer(E e, long timeout, TimeUnit unit),如果队列已满,它将等待超时,如果无法添加则返回false元素添加到队列中。因此,在生产者代码中,您可以处理该返回结果。

© www.soinside.com 2019 - 2024. All rights reserved.