无法弄清楚为什么以下代码会陷入僵局

问题描述 投票:0回答:1

下面的Java代码读取一个大文件

input.txt
并将其分成块,然后并发读取。最后它将内容转储到
output.txt
。这是一项学术练习,而不是一个项目。所以会有更好的方法。但在下面的代码中,不知怎的,我无法弄清楚为什么它会陷入僵局。

有什么想法吗?

package org.sid.misc;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.*;


class FileChunk {
    private final int order;
    private final byte[] data;

    public FileChunk(int order, byte[] data) {
        this.order = order;
        this.data = data;
    }

    public int getOrder() {
        return order;
    }

    public byte[] getData() {
        return data;
    }
}


class FileReaderTask implements Runnable {
    private final String inputFilePath;
    private final BlockingQueue<FileChunk> queue;
    private final int chunkSize;
    private final int order;

    public FileReaderTask(String inputFilePath, BlockingQueue<FileChunk> queue, int chunkSize, int order) {
        this.inputFilePath = inputFilePath;
        this.queue = queue;
        this.chunkSize = chunkSize;
        this.order = order;
    }

    @Override
    public void run() {
        try (FileInputStream fis = new FileInputStream(inputFilePath);
             FileChannel channel = fis.getChannel()) {
            long position = order * chunkSize;
            ByteBuffer buffer = ByteBuffer.allocate(chunkSize);
            if (channel.read(buffer, position) >  0) {
                buffer.flip();
                byte[] data = new byte[buffer.remaining()];
                buffer.get(data);
                queue.put(new FileChunk(order, data));
            }
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
            System.err.println("Error reading file chunk: " + e.getMessage());
        }
    }
}


class FileWriterTask implements Runnable {
    private final String outputFilePath;
    private final BlockingQueue<FileChunk> queue;

    public FileWriterTask(String outputFilePath, BlockingQueue<FileChunk> queue) {
        this.outputFilePath = outputFilePath;
        this.queue = queue;
    }

    @Override
    public void run() {
        try (FileOutputStream fos = new FileOutputStream(outputFilePath, true)) {
            while (true) {
                FileChunk chunk = queue.take();
                if (chunk.getData().length ==  0) {
                    break; // Signal to stop writing
                }
                fos.write(chunk.getData());
            }
        } catch (IOException | InterruptedException e) {
            e.printStackTrace();
            System.err.println("Error writing file chunk: " + e.getMessage());
        }
    }
}

class Test {
    private static final int CHUNK_SIZE =  1024 *  1024; //  1MB chunks
    private static final int NUM_THREADS =  Runtime.getRuntime().availableProcessors();

    public static void main(String[] args) throws InterruptedException {
        String inputFilePath = "input.txt";
        String outputFilePath = "output.txt";

        ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
        BlockingQueue<FileChunk> queue = new LinkedBlockingQueue<>();

        long startTime = System.currentTimeMillis();

        // Start the writer task
        executor.submit(new FileWriterTask(outputFilePath, queue));

        // Start the reader tasks
        for (int i =  0; i < NUM_THREADS; i++) {
            executor.submit(new FileReaderTask(inputFilePath, queue, CHUNK_SIZE, i));
        }

        // Wait for all tasks to complete
        executor.shutdown();
        executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);

        long endTime = System.currentTimeMillis();
        System.out.println("Execution time: " + (endTime - startTime) + " ms");
    }
}
java concurrency deadlock java-io
1个回答
0
投票

您正在使用阻塞队列实现,并且如文档中所述,“take()”将被阻塞,直到有元素可用。

采取() 检索并删除此队列的头部,如有必要,请等待直到有元素可用。

您可以切换到其他非阻塞队列实现

© www.soinside.com 2019 - 2024. All rights reserved.