如何将 Iterator 转换为 Spliterator?

问题描述 投票:0回答:2

我有 4 个大文件(每个约 1.5 GB),需要逐行读取这些文件,并把每一行转换为一个客户对象。我目前的实现如下。

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.URL;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Consumer;
import java.util.zip.GZIPInputStream;

import static java.nio.charset.StandardCharsets.UTF_8;

public class CustomerDataAccess {

    /** Demo: builds a {@link CustIterator} over three customer files (the iterator is not consumed here). */
    public static void main(String[] args) throws IOException {
        CustomerFileItem john = new CustomerFileItem("CustFile1", "http://w.customer1.com");
        CustomerFileItem sarah = new CustomerFileItem("CustFile2", "http://w.customer2.com");
        CustomerFileItem charles = new CustomerFileItem("CustFile3", "http://w.customer3.com");
        List<CustomerFileItem> customers = Arrays.asList(john, sarah, charles);

        Iterator<CustomerFileLineItem> custList = new CustIterator(customers);
    }

    /**
     * Iterates lazily over the data lines of several gzip-compressed customer files, in list
     * order, converting each line into a {@link CustomerFileLineItem}.
     *
     * <p>Files are opened one at a time. The first {@code HEADER_LINES} lines of each file are
     * skipped, and each file is closed as soon as it has been fully read. Lines without the
     * expected number of fields are skipped instead of being returned as {@code null}.
     *
     * <p>If an I/O error occurs, the current file is closed, the error is rethrown as an
     * {@link UncheckedIOException}, and iteration ends.
     *
     * <p>If traversal stops early, the currently open file is not closed by this class.
     *
     * <p>The inherited default {@link Iterator#forEachRemaining} is kept, so the iterator can be
     * wrapped with {@code Spliterators.spliteratorUnknownSize} and used as a stream source.
     */
    public static class CustIterator implements Iterator<CustomerFileLineItem> {

        private static final int HEADER_LINES = 9; // 8 + 1 blank line

        /** Reader for the file currently being read, or {@code null} when no file is open. */
        BufferedReader bufferedReader;

        /** Position in {@code custFileItems} of the next file to open. */
        private int index = 0;
        private final List<CustomerFileItem> custFileItems = new ArrayList<>();

        /** Item read ahead by {@link #hasNext()} and not yet returned by {@link #next()}. */
        private CustomerFileLineItem nextItem;

        /**
         * Creates the iterator and opens the first file, if there is one.
         *
         * @param custFileItems files to read, in order; the list is copied
         * @throws IOException if the first file cannot be opened or its header cannot be read
         */
        public CustIterator(final List<CustomerFileItem> custFileItems) throws IOException {
            this.custFileItems.addAll(custFileItems);
            processNext();
        }

        /**
         * Closes the current file (if any) and opens the next one. Leaves
         * {@code bufferedReader} as {@code null} when no files remain.
         */
        private void processNext() throws IOException {
            BufferedReader previous = bufferedReader;
            // Clear the field first so a failing close() cannot leave a closed reader in place.
            bufferedReader = null;
            if (previous != null) {
                previous.close();
            }
            if (index < custFileItems.size()) { // only open if there's another file
                bufferedReader = openSkippingHeader(custFileItems.get(index));
                index++;
            }
        }

        /**
         * Opens one gzip-compressed file from its URL and skips its header lines.
         * If decompression setup or header reading fails, the underlying stream is closed here,
         * because no reader is handed back that could close it later.
         */
        private static BufferedReader openSkippingHeader(final CustomerFileItem custFileItem) throws IOException {
            InputStream raw = new URL(custFileItem.url).openStream();
            try {
                BufferedReader reader = new BufferedReader(new InputStreamReader(new GZIPInputStream(raw), UTF_8));
                for (int i = 0; i < HEADER_LINES; i++) {
                    reader.readLine();
                }
                return reader;
            } catch (IOException | RuntimeException e) {
                try {
                    raw.close();
                } catch (IOException suppressed) {
                    e.addSuppressed(suppressed);
                }
                throw e;
            }
        }

        /**
         * Reads forward, across file boundaries, to the next well-formed line.
         *
         * @return the parsed item, or {@code null} once every file has been exhausted
         */
        private CustomerFileLineItem readNextItem() throws IOException {
            while (bufferedReader != null) {
                String line = bufferedReader.readLine();
                if (line == null) {
                    processNext(); // end of current file: close it and open the next, if any
                } else {
                    try {
                        return new CustomerFileLineItem(line);
                    } catch (IllegalArgumentException ignored) {
                        // skip lines that do not have the expected number of fields
                    }
                }
            }
            return null;
        }

        /** Closes the current file after an I/O error, attaching any close failure to {@code cause}. */
        private void closeAfterFailure(final IOException cause) {
            BufferedReader reader = bufferedReader;
            bufferedReader = null; // ends iteration: readNextItem() stops once no reader is open
            if (reader != null) {
                try {
                    reader.close();
                } catch (IOException suppressed) {
                    cause.addSuppressed(suppressed);
                }
            }
        }

        /**
         * Reports whether another well-formed line exists. This reads ahead, because
         * {@code ready()} is not an end-of-stream test.
         *
         * @throws UncheckedIOException if reading or opening a file fails
         */
        @Override
        public boolean hasNext() {
            if (nextItem != null) {
                return true;
            }
            try {
                nextItem = readNextItem();
            } catch (IOException e) {
                closeAfterFailure(e);
                throw new UncheckedIOException(e);
            }
            return nextItem != null;
        }

        /**
         * Returns the next parsed line.
         *
         * @throws NoSuchElementException if all files have been exhausted
         * @throws UncheckedIOException if reading or opening a file fails
         */
        @Override
        public CustomerFileLineItem next() {
            if (!hasNext()) {
                throw new NoSuchElementException();
            }
            CustomerFileLineItem item = nextItem;
            nextItem = null;
            return item;
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException();
        }
    }


    /** One customer record parsed from a comma-separated line with exactly four fields. */
    public static class CustomerFileLineItem {
        private static final int NUMBER_OF_FIELDS = 4;

        final String id;
        final String productNumber;
        final String usageType;
        final String operation;


        /**
         * Parses a line of the form {@code id,productNumber,usageType,operation}.
         *
         * @param line the raw line, without its line terminator
         * @throws IllegalArgumentException if the line does not split into exactly four fields
         *     (note that {@code String.split} drops trailing empty fields)
         */
        public CustomerFileLineItem(final String line) {
            String[] strings = line.split(",");
            if (strings.length != NUMBER_OF_FIELDS) {
                throw new IllegalArgumentException(String.format("Malformed customer file line: %s", line));
            }
            // Indices are 0-based: the four fields are at positions 0..3.
            this.id = strings[0];
            this.productNumber = strings[1];
            this.usageType = strings[2];
            this.operation = strings[3];
        }
    }

    /** Name and URL of one gzip-compressed customer file. */
    static class CustomerFileItem {
        private final String fileName;
        private final String url;

        public CustomerFileItem(String fileName, String url) {
            this.fileName = fileName;
            this.url = url;
        }
    }


}

在其中一个用例中,我想对输出(custList)使用流。但我知道无法直接对 Iterator 使用流。我该如何把它转换为 Spliterator?或者说,我怎样才能用 Spliterator 实现与这个 Iterator 相同的功能?

java iterator java-stream iterable spliterator
2个回答
4
投票

TL;DR 你根本不需要实现 Iterator 或 Spliterator,一开始直接使用 Stream 即可。

private static final int HEADER_LINES = 9; // 8 + 1 blank line

// Lazily concatenates the data lines of all customer files. Per the Stream.flatMap contract,
// each per-file stream is closed after its contents have been consumed. That runs the onClose
// handler below and releases the file's connection.
Stream<CustomerFileLineItem> stream = customers.stream()
    .flatMap(custFileItem -> {
        try {
            InputStream raw = new URL(custFileItem.url).openStream();
            try {
                BufferedReader br
                    = new BufferedReader(new InputStreamReader(new GZIPInputStream(raw), UTF_8));
                // skip the header lines
                for (int i = 0; i < HEADER_LINES; i++) br.readLine();
                return br.lines().onClose(() -> {
                  try { br.close(); }
                  catch(IOException ex) { throw new UncheckedIOException(ex); }
                });
            } catch(IOException | RuntimeException ex) {
                // No stream is returned, so nothing would ever close this connection: close it here.
                try { raw.close(); }
                catch(IOException suppressed) { ex.addSuppressed(suppressed); }
                throw ex;
            }
        } catch(IOException ex) {
            throw new UncheckedIOException(ex);
        }
    })
    .map(CustomerFileLineItem::new);

但为了完整起见,我们还是按字面意思回答这个问题。

首先,你不应该添加如下的方法定义:

// Counter-example: do NOT add this override. It replaces Iterator's default
// forEachRemaining with one that always throws, so any bulk traversal that ends up
// calling it (e.g. most non-short-circuiting stream operations) fails.
@Override
public void forEachRemaining(final Consumer<? super CustomerFileLineItem> action) {
    throw new UnsupportedOperationException();
}

使用流 API 时,这个方法必然会适得其反,因为大多数非短路操作最终都会调用到它。

甚至根本没有理由添加它。如果不声明这个方法,你会从 Iterator 接口继承一个合理的默认实现。

解决了这个问题之后,就可以很容易地使用 Spliterators.spliteratorUnknownSize(Iterator, int) 将 Iterator 转换为 Spliterator。

但这样做并没有必要。如果一开始就直接实现 Spliterator,代码反而会更简单。

/**
 * Spliterator over the data lines of several gzip-compressed customer files, read in order.
 *
 * <p>The first {@code HEADER_LINES} lines of each file are skipped, and each file is closed once
 * it has been fully read. If an I/O error occurs, the current file is closed, the error is
 * rethrown as an {@link UncheckedIOException}, and no further elements are produced.
 *
 * <p>If traversal stops early (e.g. a short-circuiting stream operation), the currently open
 * file is not closed by this class.
 */
public static class CustIterator
                    extends Spliterators.AbstractSpliterator<CustomerFileLineItem> {
    private static final int HEADER_LINES = 9; // 8 + 1 blank line

    /** Reader for the file currently being read, or {@code null} when no file is open. */
    BufferedReader bufferedReader;

    /** Files that have not been opened yet. */
    private final ArrayDeque<CustomerFileItem> custFileItems;

    /**
     * Creates the spliterator and opens the first file, if there is one.
     *
     * @param custFileItems files to read, in order; the list is copied
     * @throws IOException if the first file cannot be opened or its header cannot be read
     */
    public CustIterator(final List<CustomerFileItem> custFileItems) throws IOException {
        super(Long.MAX_VALUE, ORDERED|NONNULL);
        this.custFileItems = new ArrayDeque<>(custFileItems);
        processNext();
    }

    @Override
    public boolean tryAdvance(Consumer<? super CustomerFileLineItem> action) {
        if(bufferedReader == null) return false;
        try {
            String line = bufferedReader.readLine();
            while(line == null) {
                // current file exhausted: close it and move on to the next one, if any
                processNext();
                if(bufferedReader == null) return false;
                line = bufferedReader.readLine();
            }
            action.accept(new CustomerFileLineItem(line));
            return true;
        }
        catch(IOException ex) {
            closeAfterFailure(ex);
            throw new UncheckedIOException(ex);
        }
    }

    /** Closes the current reader after an I/O error; a failing close is recorded as suppressed. */
    private void closeAfterFailure(IOException cause) {
        BufferedReader reader = bufferedReader;
        bufferedReader = null; // always cleared, so later tryAdvance calls return false
        if(reader != null) try {
            reader.close();
        }
        catch(IOException suppressed) {
            cause.addSuppressed(suppressed);
        }
    }

    /**
     * Closes the current file, if any, and opens the next queued one. Leaves
     * {@code bufferedReader} as {@code null} when no files remain.
     */
    private void processNext() throws IOException {
        BufferedReader previous = bufferedReader;
        bufferedReader = null; // clear first so a failing close() cannot leave a closed reader behind
        if (previous != null) {
            previous.close();
        }
        if (!custFileItems.isEmpty()) { // only update if there's another file
            CustomerFileItem custFileItem = custFileItems.remove();
            InputStream raw = new URL(custFileItem.url).openStream();
            try {
                BufferedReader reader
                    = new BufferedReader(new InputStreamReader(new GZIPInputStream(raw), UTF_8));
                // skip the header lines
                for (int i = 0; i < HEADER_LINES; i++) {
                    reader.readLine();
                }
                bufferedReader = reader;
            } catch (IOException | RuntimeException ex) {
                // No reader was handed out, so nothing else would ever close this connection.
                try {
                    raw.close();
                } catch (IOException suppressed) {
                    ex.addSuppressed(suppressed);
                }
                throw ex;
            }
        }
    }
}

但正如开头所说,这里你甚至根本不需要实现 Spliterator。


1
投票

每个 Iterable<T> 对象都有以下默认方法:default Spliterator<T> spliterator()。

因此,你只需创建一个返回该 Iterator<T> 的 Iterable<T>;它只需要覆盖唯一的抽象(非默认)方法 iterator()。

// Adapts the existing iterator to an Iterable so that its default spliterator() can be used.
// NOTE(review): iterator() returns the same custList instance on every call, so this
// Iterable can only be traversed once.
Iterable<CustomerFileLineItem> iterable = new Iterable<CustomerFileLineItem>() {
    @Override
    public Iterator<CustomerFileLineItem> iterator() {
        return custList;
    }
};

这可以简化为一个 lambda 表达式:

// The lambda implements Iterable's only abstract method, iterator(); spliterator() is
// inherited as a default method and wraps the returned iterator.
Iterable<CustomerFileLineItem> iterable = () -> custList;
Spliterator<CustomerFileLineItem> spliterator = iterable.spliterator();

...这样就可以很容易地创建流。

// The second argument is `parallel`; false yields a sequential stream.
Stream<CustomerFileLineItem> stream = StreamSupport.stream(spliterator, false);
© www.soinside.com 2019 - 2024. All rights reserved.