下面的Java代码读取一个大文件
input.txt
并将其分成块,然后并发读取。最后它将内容转储到output.txt
。这是一项学术练习,而不是一个项目。所以会有更好的方法。但在下面的代码中,不知怎的,我无法弄清楚为什么它会陷入僵局。
有什么想法吗?
package org.sid.misc;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.util.concurrent.*;
class FileChunk {
private final int order;
private final byte[] data;
public FileChunk(int order, byte[] data) {
this.order = order;
this.data = data;
}
public int getOrder() {
return order;
}
public byte[] getData() {
return data;
}
}
class FileReaderTask implements Runnable {
private final String inputFilePath;
private final BlockingQueue<FileChunk> queue;
private final int chunkSize;
private final int order;
public FileReaderTask(String inputFilePath, BlockingQueue<FileChunk> queue, int chunkSize, int order) {
this.inputFilePath = inputFilePath;
this.queue = queue;
this.chunkSize = chunkSize;
this.order = order;
}
@Override
public void run() {
try (FileInputStream fis = new FileInputStream(inputFilePath);
FileChannel channel = fis.getChannel()) {
long position = order * chunkSize;
ByteBuffer buffer = ByteBuffer.allocate(chunkSize);
if (channel.read(buffer, position) > 0) {
buffer.flip();
byte[] data = new byte[buffer.remaining()];
buffer.get(data);
queue.put(new FileChunk(order, data));
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
System.err.println("Error reading file chunk: " + e.getMessage());
}
}
}
class FileWriterTask implements Runnable {
private final String outputFilePath;
private final BlockingQueue<FileChunk> queue;
public FileWriterTask(String outputFilePath, BlockingQueue<FileChunk> queue) {
this.outputFilePath = outputFilePath;
this.queue = queue;
}
@Override
public void run() {
try (FileOutputStream fos = new FileOutputStream(outputFilePath, true)) {
while (true) {
FileChunk chunk = queue.take();
if (chunk.getData().length == 0) {
break; // Signal to stop writing
}
fos.write(chunk.getData());
}
} catch (IOException | InterruptedException e) {
e.printStackTrace();
System.err.println("Error writing file chunk: " + e.getMessage());
}
}
}
class Test {
private static final int CHUNK_SIZE = 1024 * 1024; // 1MB chunks
private static final int NUM_THREADS = Runtime.getRuntime().availableProcessors();
public static void main(String[] args) throws InterruptedException {
String inputFilePath = "input.txt";
String outputFilePath = "output.txt";
ExecutorService executor = Executors.newFixedThreadPool(NUM_THREADS);
BlockingQueue<FileChunk> queue = new LinkedBlockingQueue<>();
long startTime = System.currentTimeMillis();
// Start the writer task
executor.submit(new FileWriterTask(outputFilePath, queue));
// Start the reader tasks
for (int i = 0; i < NUM_THREADS; i++) {
executor.submit(new FileReaderTask(inputFilePath, queue, CHUNK_SIZE, i));
}
// Wait for all tasks to complete
executor.shutdown();
executor.awaitTermination(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
long endTime = System.currentTimeMillis();
System.out.println("Execution time: " + (endTime - startTime) + " ms");
}
}
您正在使用阻塞队列实现,并且如文档中所述,“take()”将被阻塞,直到有元素可用。
采取() 检索并删除此队列的头部,如有必要,请等待直到有元素可用。
您可以切换到其他非阻塞队列实现