我有 4 个大文件(每个约 1.5 GB)。我想处理这些文件:逐行读取,并把每一行转换成一个客户对象。我目前的实现如下:
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Consumer;
import java.util.zip.GZIPInputStream;
import static java.nio.charset.StandardCharsets.UTF_8;
/**
 * Reads customer records, one per line, from a sequence of gzip-compressed files
 * that are fetched by URL.
 */
public class CustomerDataAccess {

    /**
     * Builds an iterator over three sample customer files.
     *
     * @param args unused
     * @throws IOException if the first file cannot be opened or its header cannot be read
     */
    public static void main(String[] args) throws IOException {
        CustomerFileItem john = new CustomerFileItem("CustFile1", "http://w.customer1.com");
        CustomerFileItem sarah = new CustomerFileItem("CustFile2", "http://w.customer2.com");
        CustomerFileItem charles = new CustomerFileItem("CustFile3", "http://w.customer3.com");
        List<CustomerFileItem> customers = Arrays.asList(john, sarah, charles);
        Iterator<CustomerFileLineItem> custList = new CustIterator(customers);
    }

    /**
     * Iterates over the data lines of several customer files as if they were one file.
     *
     * <p>Files are opened lazily, one at a time, and each file's header lines are skipped.
     * The iterator reads one line ahead so that {@link #hasNext()} reports the real end of
     * input rather than relying on {@code Reader.ready()}, which only says whether a read
     * would block.
     */
    public static class CustIterator implements Iterator<CustomerFileLineItem> {
        private static final int HEADER_LINES = 9; // 8 + 1 blank line

        /** Reader of the file currently being consumed; {@code null} when no file is open. */
        BufferedReader bufferedReader;
        /** Position in {@link #custFileItems} of the next file to open. */
        private int index = 0;
        private final List<CustomerFileItem> custFileItems = new ArrayList<>();
        /** Line already read from the current file but not yet handed out by {@link #next()}. */
        private String pendingLine;

        /**
         * Creates the iterator and opens the first file, if there is one.
         *
         * @param custFileItems files to read, in order; the list is copied
         * @throws IOException if the first file cannot be opened or its header cannot be read
         */
        public CustIterator(final List<CustomerFileItem> custFileItems) throws IOException {
            this.custFileItems.addAll(custFileItems);
            processNext();
        }

        /**
         * Closes the current reader and, when another file is left, opens it and skips its
         * header lines. Leaves {@link #bufferedReader} {@code null} when no file is left.
         */
        private void processNext() throws IOException {
            closeReader();
            if (index < custFileItems.size()) { // only update if there's another file
                // Advance first so that a file which fails to open is not retried forever.
                CustomerFileItem custFileItem = custFileItems.get(index++);
                GZIPInputStream gis = new GZIPInputStream(new URL(custFileItem.url).openStream());
                // default buffer size is 8 KB
                bufferedReader = new BufferedReader(new InputStreamReader(gis, UTF_8));
                try {
                    // read the first few lines
                    for (int i = 0; i < HEADER_LINES; i++) {
                        bufferedReader.readLine();
                    }
                } catch (IOException e) {
                    closeAfterFailure(e);
                    throw e;
                }
            }
        }

        /** Closes the current reader, if any, and forgets it even when closing fails. */
        private void closeReader() throws IOException {
            BufferedReader reader = bufferedReader;
            bufferedReader = null;
            if (reader != null) {
                reader.close();
            }
        }

        /** Closes the current reader, recording a close failure on {@code failure}. */
        private void closeAfterFailure(IOException failure) {
            try {
                closeReader();
            } catch (IOException closeFailure) {
                // Keep the original error as the primary one instead of masking it.
                failure.addSuppressed(closeFailure);
            }
        }

        /**
         * Reports whether another line is available, moving on to the next file(s) when the
         * current one is exhausted. The last reader is closed once all input is consumed.
         *
         * @throws UncheckedIOException if reading or opening a file fails
         */
        @Override
        public boolean hasNext() {
            try {
                while (pendingLine == null) {
                    if (bufferedReader == null) {
                        if (index >= custFileItems.size()) { // no more files left
                            return false;
                        }
                        processNext();
                        continue;
                    }
                    pendingLine = bufferedReader.readLine();
                    if (pendingLine == null) {
                        // at end of current file, try to get the next one
                        processNext();
                    }
                }
                return true;
            } catch (IOException e) {
                closeAfterFailure(e);
                throw new UncheckedIOException(e);
            }
        }

        /**
         * Returns the next line parsed into a {@link CustomerFileLineItem}.
         *
         * @return the parsed item, or {@code null} when the line is malformed
         * @throws NoSuchElementException if all files are exhausted
         * @throws UncheckedIOException if reading or opening a file fails
         */
        @Override
        public CustomerFileLineItem next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            String line = pendingLine;
            pendingLine = null;
            try {
                return new CustomerFileLineItem(line);
            } catch (IllegalArgumentException malformed) {
                // Best effort: a malformed line yields null rather than aborting the iteration.
                return null;
            }
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }

        // forEachRemaining is intentionally not overridden: the default implementation from
        // Iterator works, and stream pipelines built on this iterator call it.
    }

    /** One parsed line of a customer file: four comma-separated fields. */
    public static class CustomerFileLineItem {
        private static final int NUMBER_OF_FIELDS = 4;
        final String id;
        final String productNumber;
        final String usageType;
        final String operation;

        /**
         * Parses a comma-separated line.
         *
         * @param line the raw line
         * @throws IllegalArgumentException if the line does not split into exactly four fields
         */
        public CustomerFileLineItem(final String line) {
            String[] strings = line.split(",");
            if (strings.length != NUMBER_OF_FIELDS) {
                throw new IllegalArgumentException(String.format("Malformed customer file line: %s", line));
            }
            // Indices are 0..3 for the four fields.
            this.id = strings[0];
            this.productNumber = strings[1];
            this.usageType = strings[2];
            this.operation = strings[3];
        }
    }

    /** A customer file to read: a display name and the URL it is fetched from. */
    static class CustomerFileItem {
        private final String fileName;
        private final String url;

        public CustomerFileItem(String fileName, String url) {
            this.fileName = fileName;
            this.url = url;
        }
    }
}
在其中一个用例中,我想对输出(custList)使用流(Stream)。但我知道不能直接在 Iterator 上使用流。我该如何把它转换为 Spliterator?或者说,怎样用 Spliterator 实现与这个 Iterator 相同的功能?
TL;DR:你不需要实现 Iterator 或 Spliterator,一开始就可以直接使用 Stream:
private static final int HEADER_LINES = 9; // 8 + 1 blank line

// Lazily concatenates the data lines of every customer file into one stream of parsed items.
// flatMap closes each per-file stream once its contents have been consumed, which runs the
// onClose handler below and releases that file's reader.
Stream<CustomerFileLineItem> stream = customers.stream()
    .flatMap(custFileItem -> {
        try {
            GZIPInputStream gis
                = new GZIPInputStream(new URL(custFileItem.url).openStream());
            BufferedReader br = new BufferedReader(new InputStreamReader(gis, UTF_8));
            try {
                // read the first few lines
                for (int i = 0; i < HEADER_LINES; i++) {
                    br.readLine();
                }
            } catch (IOException ex) {
                // The close handler is not registered yet on this path, so release the
                // reader here instead of leaking the connection.
                try {
                    br.close();
                } catch (IOException closeEx) {
                    ex.addSuppressed(closeEx);
                }
                throw ex;
            }
            return br.lines().onClose(() -> {
                try {
                    br.close();
                } catch (IOException ex) {
                    throw new UncheckedIOException(ex);
                }
            });
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
    })
    .map(CustomerFileLineItem::new);
但为了完整起见,我们还是按字面意思来回答这个问题。
首先,你不应该添加下面这样的方法定义:
// Counter-example: this override replaces the inherited forEachRemaining with
// one that throws unconditionally, whatever action is passed in.
@Override
public void forEachRemaining(final Consumer<? super CustomerFileLineItem> action) {
throw new UnsupportedOperationException();
}
在使用 Stream API 时,这个方法只会适得其反,因为大多数非短路操作最终都会调用它。
其实根本没有必要添加它:只要不声明这个方法,你就会从 Iterator 接口继承一个合理的默认实现。
解决了这个问题之后,就可以使用 Spliterators.spliteratorUnknownSize(Iterator, int) 轻松地把 Iterator 转换为 Spliterator。
但没有必要这样做。如果一开始就直接实现 Spliterator,代码反而会更简单:
/**
 * Spliterator over the data lines of several customer files, read one after another.
 *
 * <p>Files are opened lazily; each file's header lines are skipped. The size is reported as
 * unknown ({@code Long.MAX_VALUE}) and elements are ordered and non-null.
 */
public static class CustIterator
        extends Spliterators.AbstractSpliterator<CustomerFileLineItem> {
    private static final int HEADER_LINES = 9; // 8 + 1 blank line

    /** Reader of the file currently being consumed; {@code null} when no file is open. */
    BufferedReader bufferedReader;
    /** Files that have not been opened yet, in reading order. */
    private final ArrayDeque<CustomerFileItem> custFileItems;

    /**
     * Creates the spliterator and opens the first file, if there is one.
     *
     * @param custFileItems files to read, in order; the list is copied
     * @throws IOException if the first file cannot be opened or its header cannot be read
     */
    public CustIterator(final List<CustomerFileItem> custFileItems) throws IOException {
        super(Long.MAX_VALUE, ORDERED|NONNULL);
        this.custFileItems = new ArrayDeque<>(custFileItems);
        processNext();
    }

    /**
     * Passes the next parsed line to {@code action}, moving on to the next file(s) when the
     * current one is exhausted.
     *
     * @return {@code false} once every file has been consumed
     * @throws UncheckedIOException if reading or opening a file fails
     */
    @Override
    public boolean tryAdvance(Consumer<? super CustomerFileLineItem> action) {
        if (bufferedReader == null) {
            return false;
        }
        try {
            String line = bufferedReader.readLine();
            while (line == null) {
                processNext();
                if (bufferedReader == null) {
                    return false;
                }
                line = bufferedReader.readLine();
            }
            action.accept(new CustomerFileLineItem(line));
            return true;
        } catch (IOException ex) {
            closeAfterFailure(ex);
            throw new UncheckedIOException(ex);
        }
    }

    /**
     * Closes the current reader and, when another file is left, opens it and skips its
     * header lines. Leaves {@link #bufferedReader} {@code null} when no file is left.
     */
    private void processNext() throws IOException {
        closeCurrent();
        if (!custFileItems.isEmpty()) { // only update if there's another file
            CustomerFileItem custFileItem = custFileItems.remove();
            GZIPInputStream gis
                = new GZIPInputStream(new URL(custFileItem.url).openStream());
            // default buffer size is 8 KB
            bufferedReader = new BufferedReader(new InputStreamReader(gis, UTF_8));
            try {
                // read the first few lines
                for (int i = 0; i < HEADER_LINES; i++) {
                    bufferedReader.readLine();
                }
            } catch (IOException ex) {
                // Do not leave a half-initialised reader open when the header cannot be read.
                closeAfterFailure(ex);
                throw ex;
            }
        }
    }

    /** Closes the current reader, if any, and forgets it even when closing fails. */
    private void closeCurrent() throws IOException {
        BufferedReader reader = bufferedReader;
        // Clear the field first so a failed close never leaves a closed reader in place.
        bufferedReader = null;
        if (reader != null) {
            reader.close();
        }
    }

    /** Closes the current reader, recording a close failure on {@code failure}. */
    private void closeAfterFailure(IOException failure) {
        try {
            closeCurrent();
        } catch (IOException closeFailure) {
            failure.addSuppressed(closeFailure);
        }
    }
}
但是,正如开头所说,这里你甚至不需要实现 Spliterator。
每个 Iterable<T> 对象都有以下方法:
Iterator<T> iterator():返回 Iterator<T>
default Spliterator<T> spliterator():(默认方法)返回 Spliterator<T>
因此,你需要创建一个返回该 Iterator<T> 的 Iterable<T>,只需重写它唯一的非默认抽象方法:
// Wraps the existing iterator in an Iterable by implementing iterator(),
// the one method this anonymous class has to supply.
Iterable<CustomerFileLineItem> iterable = new Iterable<CustomerFileLineItem>() {
@Override
public Iterator<CustomerFileLineItem> iterator() {
// Hands back the same custList instance on every call.
return custList;
}
};
这可以简化为一个 lambda 表达式:
// Lambda form of the Iterable: each call to iterator() returns the same custList instance.
Iterable<CustomerFileLineItem> iterable = () -> custList;
// Obtains a Spliterator through Iterable's spliterator() method.
Spliterator<CustomerFileLineItem> spliterator = iterable.spliterator();
……这样就可以很容易地创建流:
// Builds a stream from the spliterator; the second argument (false) requests a sequential stream.
Stream<CustomerFileLineItem> stream = StreamSupport.stream(spliterator, false);