我想逐步处理写入OutputStream
的文本。
例如,假设我们有这个程序:
import java.io.File;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.Charset;
public class Streaming {
// Writes file, incrementally, to OutputStream.
static void dump(File file, OutputStream out) throws IOException {
// Implementation omitted
}
static int sum = 0;
public static void main(String[] args) throws IOException {
Charset charSet = Charset.defaultCharset(); // Interpret the file as having this encoding.
dump(new File("file.txt"), new OutputStream() {
@Override
public void write(int b) throws IOException {
// Add b to bytes already read,
// Determine if we have reached the end of the token (using
// the default encoding),
// And parse the token and add it to `sum`
}
});
System.out.println("Sum: " + sum);
}
}
假设file.txt
是一个包含以空格分隔的整数列表的文本文件。在这个程序中,我希望找到file.txt
中的总和,累积sum
变量中的总和。我想避免构建一个长达数百万字符的字符串。
我对使用dump
函数实现此目的的方式感兴趣,该函数将文件的内容写入输出流。我对以另一种方式阅读文件不感兴趣(例如为Scanner
创建file.txt
并在扫描仪上反复调用nextInt
)。我强加了这个限制,因为我使用的库具有类似于dump
的API,客户端必须提供OutputStream
,然后库会将大量文本写入输出流。
如何实现write
方法以正确执行所概述的步骤?我想避免手工执行标记化,因为像Scanner
这样的实用程序已经能够进行标记化,我希望能够处理任何文本编码(由charSet
指定)。但是,我不能直接使用Scanner
,因为没有办法检查(以非阻塞的方式)令牌是否可用:
public static void main(String[] args) throws IOException {
Charset charSet = Charset.defaultCharset();
PipedInputStream in = new PipedInputStream();
try (Scanner sc = new Scanner(in, charSet)) {
dump(new File("file.txt"), new PipedOutputStream(in) {
@Override
public void write(byte[] b, int off, int len) throws IOException {
super.write(b, off, len);
// This will loop infinitely, because `hasNextInt`
// will block if there is no int token currently available.
if (sc.hasNextInt()) {
sum += sc.nextInt();
}
}
});
}
System.out.println("Sum: " + sum);
System.out.println(charSet);
}
是否存在可以在将数据写入输出流时为我执行标记化的非阻塞实用程序?
如果我理解你的问题,FilterOutputStream就是你想要的子类。 DigestOutputStream扩展了FilterOutputStream并做了一些类似于你想要做的事情:它在它们通过时监视字节并将它们传递给另一个类进行处理。
想到的一个解决方案是FilterOutputStream将字节传递给PipedOutputStream,连接到PipedInputStream,不同的线程读取它以创建总和:
PipedOutputStream sumSink = new PipedOutputStream();
Callable<Long> sumCalculator = new Callable<Long>() {
@Override
public Long call()
throws IOException {
long sum = 0;
PipedInputStream source = new PipedInputStream(sumSink);
try (Scanner scanner = new Scanner(source, charSet)) {
while (scanner.hasNextInt()) {
sum += scanner.nextInt();
}
}
return sum;
}
};
Future<Long> sumTask = ForkJoinPool.commonPool().submit(sumCalculator);
OutputStream dest = getTrueDestinationOutputStream();
dest = new FilterOutputStream(dest) {
@Override
public void write(int b)
throws IOException {
super.write(b);
sumSink.write(b);
}
@Override
public void write(byte[] b)
throws IOException {
super.write(b);
sumSink.write(b);
}
@Override
public void write(byte[] b,
int offset,
int len)
throws IOException {
super.write(b, offset, len);
sumSink.write(b, offset, len);
}
@Override
public void flush()
throws IOException {
super.flush();
sumSink.flush();
}
@Override
public void close()
throws IOException {
super.close();
sumSink.close();
}
};
dump(file, dest);
long sum = sumTask.get();
作为“惯用”的方法,你可能想要一个FilterOutputStream
:
这些流位于已存在的输出流(基础输出流)之上,它将其用作数据的基本接收器,但可能沿途转换数据或提供其他功能。
至少在我看来,它听起来就像你描述的那样。
这是一个具体的课程
(与OutputStream
不同)
,所以你可以逃脱的绝对最小值是提供你的构造函数和单字节write()
的实现(它将被其他write()
方法的默认实现调用):
public class SumOutputStream extends FilterOutputStream {
public int sum = 0;
public SumOutputStream(OutputStream os) {
super(os);
}
private int num = 0;
public void write(int b) throws IOException {
if (b >= '0' && b <= '9') {
sum -= num;
num = num * 10 + b - '0';
sum += num;
} else {
num = 0;
}
out.write(b);
}
public static void main(String[] args) throws IOException {
try (SumOutputStream sos = new SumOutputStream(new FileOutputStream("test.txt"))) {
sos.write("123 456 78".getBytes());
System.out.println(sos.sum);
sos.write('9');
System.out.println(sos.sum);
}
}
}
这将总结正在传递的任何数字,使得sum
始终保持最新,即使是部分结果(这就是分开9
应该显示的内容)。
基于@ tevemadar的回答。读入字符串并尝试将它们解析为整数。如果失败,则表示已完成编号,然后将其添加到总和中。唯一的问题是,如果我的方法占用最后两个字节,我的方法不会添加最后一个数字。要解决此问题,您可以添加单行方法:if(!currNumber.isEmpty()) sum += Integer.parseInt(currNumber);
,您可以在文件完成后调用它。
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Objects;
class SumOutputStream extends FilterOutputStream {
public int sum = 0;
String currNumber = "";
String lastChar = "";
public SumOutputStream(OutputStream os){
super(os);
}
public void write(byte b[], int off, int len) throws IOException {
Objects.checkFromIndexSize(off, len, b.length);
for (int i = 0 ; i < len ; i++) {
try {
if(!lastChar.isEmpty()) {
Integer.parseInt(lastChar);
currNumber += lastChar;
}
} catch(NumberFormatException e) {
if(!currNumber.isEmpty()) sum += Integer.parseInt(currNumber);
currNumber = "";
} catch(NullPointerException e) {
e.printStackTrace();
}
write(b[off + i]);
lastChar = new String(b);
}
}
}