带ResultSet的java.util.stream

问题描述 投票:36回答:4

我有几张包含大量数据的表(大约1亿条记录)。所以我不能将这些数据存储在内存中,但是我想使用java.util.stream类来传输这个结果集,并将此流传递给另一个类。我读到了Stream.ofStream.Builder运算符,但它们是内存中的缓冲流。那么有什么方法可以解决这个问题吗?提前致谢。

更新#1

好吧,我用Google搜索并找到了jooq库。我不确定,但看起来它可能适用于我的测试用例。总而言之,我有几张包含大量数据的表格。我想流式传输我的结果集并将此流传输到另一个方法。像这样的东西:

// why return Stream<String>? Because my result set has String type
private Stream<Record> writeTableToStream(DataSource dataSource, String table) {

    Stream<Record> record = null;
    try (Connection connection = dataSource.getConnection()) {
        String sql = "select * from " + table;

        try (PreparedStatement pSt = connection.prepareStatement(sql)) {
            connection.setAutoCommit(false);
            pSt.setFetchSize(5000);
            ResultSet resultSet = pSt.executeQuery();
            //
            record = DSL.using(connection)
                    .fetch(resultSet).stream();
        }
    } catch (SQLException sqlEx) {
        logger.error(sqlEx);
    }

    return record;
}

可以请有人建议,我是否正确?谢谢。

更新#2

我在jooq上做了一些实验,现在可以说上面的决定不适合我。这段代码record = DSL.using(connection).fetch(resultSet).stream();花了太多时间

java jdbc lambda java-stream jooq
4个回答
65
投票

你必须要了解的第一件事就是代码

try (Connection connection = dataSource.getConnection()) {
    …
    try (PreparedStatement pSt = connection.prepareStatement(sql)) {
        …
        return stream;
    }
}

当你离开try区块的时候不起作用,资源关闭,而Stream的处理甚至还没有开始。

资源管理构造“try with resources”适用于方法内块范围内使用的资源,但您正在创建返回资源的工厂方法。因此,您必须确保关闭返回的流将关闭资源,并且调用者负责关闭Stream


此外,您需要一个从ResultSet生成一行中的项目的函数。假设你有一个类似的方法

Record createRecord(ResultSet rs) {
    …
}

你可能会创建一个基本上像Stream<Record>

Stream<Record> stream = StreamSupport.stream(new Spliterators.AbstractSpliterator<Record>(
    Long.MAX_VALUE,Spliterator.ORDERED) {
        @Override
        public boolean tryAdvance(Consumer<? super Record> action) {
            if(!resultSet.next()) return false;
            action.accept(createRecord(resultSet));
            return true;
        }
    }, false);

但要正确地执行它,您必须合并异常处理和资源关闭。您可以使用Stream.onClose来注册将在Stream关闭时执行的操作,但它必须是Runnable,它不能抛出已检查的异常。类似地,不允许tryAdvance方法抛出已检查的异常。由于我们不能简单地在这里嵌套try(…)块,因此当close已经有一个挂起的异常时,抛出的抑制异常的程序逻辑并不是免费的。

为了帮助我们,我们引入了一个新类型,它可以包含关闭操作,这些操作可能会抛出已检查的异常,并将它们包含在未经检查的异常中。通过实现AutoCloseable本身,它可以利用try(…)构造来安全地链接关闭操作:

interface UncheckedCloseable extends Runnable, AutoCloseable {
    default void run() {
        try { close(); } catch(Exception ex) { throw new RuntimeException(ex); }
    }
    static UncheckedCloseable wrap(AutoCloseable c) {
        return c::close;
    }
    default UncheckedCloseable nest(AutoCloseable c) {
        return ()->{ try(UncheckedCloseable c1=this) { c.close(); } };
    }
}

这样,整个操作变为:

private Stream<Record> tableAsStream(DataSource dataSource, String table)
    throws SQLException {

    UncheckedCloseable close=null;
    try {
        Connection connection = dataSource.getConnection();
        close=UncheckedCloseable.wrap(connection);
        String sql = "select * from " + table;
        PreparedStatement pSt = connection.prepareStatement(sql);
        close=close.nest(pSt);
        connection.setAutoCommit(false);
        pSt.setFetchSize(5000);
        ResultSet resultSet = pSt.executeQuery();
        close=close.nest(resultSet);
        return StreamSupport.stream(new Spliterators.AbstractSpliterator<Record>(
            Long.MAX_VALUE,Spliterator.ORDERED) {
            @Override
            public boolean tryAdvance(Consumer<? super Record> action) {
                try {
                    if(!resultSet.next()) return false;
                    action.accept(createRecord(resultSet));
                    return true;
                } catch(SQLException ex) {
                    throw new RuntimeException(ex);
                }
            }
        }, false).onClose(close);
    } catch(SQLException sqlEx) {
        if(close!=null)
            try { close.close(); } catch(Exception ex) { sqlEx.addSuppressed(ex); }
        throw sqlEx;
    }
}

此方法在上述实用程序类的一个实例中包含所有资源ConnectionStatementResultSet的必要关闭操作。如果在初始化期间发生异常,则立即执行关闭操作,并将异常传递给调用者。如果流构建成功,则通过onClose注册关闭操作。

因此呼叫者必须确保正确关闭

try(Stream<Record> s=tableAsStream(dataSource, table)) {
    // stream operation
}

请注意,SQLException方法中还添加了通过RuntimeException传送tryAdvance。因此,您现在可以毫无问题地将throws SQLException添加到createRecord方法中。


10
投票

jOOQ

我要回答你问题的jOOQ部分。从jOOQ 3.8开始,现在已经有很多与jOOQ和Stream相结合的附加功能。 Other usages are also documented on this jOOQ page

Your suggested usage:

你试过这个:

Stream<Record> stream = DSL.using(connection).fetch(resultSet).stream();

实际上,这对大型结果集不起作用,因为fetch(ResultSet)将整个结果集提取到内存中,然后在其上调用Collection.stream()

Better (lazy) usage:

相反,你可以这样写:

try (Stream<Record> stream = DSL.using(connection).fetchStream(resultSet)) {
    ...
}

......基本上方便了:

try (Cursor<Record> cursor = DSL.using(connection).fetchLazy(resultSet)) {
    Stream<Record> stream = cursor.stream();
    ...
}

另见DSLContext.fetchStream(ResultSet)

当然,你也可以让jOOQ执行你的SQL字符串,而不是与JDBC搏斗:

try (Stream<Record> stream = 
     DSL.using(dataSource)
        .resultQuery("select * from {0}", DSL.name(table)) // Prevent SQL injection
        .fetchSize(5000)
        .fetchStream()) {
    ...
}

On try-with-resources usage

请注意,由jOOQ生成的Stream是“足智多谋的”,即它包含对开放的ResultSet(和PreparedStatement)的引用。因此,如果您真的想要在方法之外返回该流,请确保它已正确关闭!


4
投票

我不知道有任何着名的图书馆会为你做这件事。

也就是说,this article展示了如何使用Iterator(ResultSetIterator)包装结果集并将其作为第一个参数传递给Spliterators.spliteratorUnknownSize()以创建Spliterator

然后,qzxswpoi可以使用Spliterator在其上创建一个Stream。

他们建议实施StreamSupport类:

ResultSetIterator

然后:

public class ResultSetIterator implements Iterator {

    private ResultSet rs;
    private PreparedStatement ps;
    private Connection connection;
    private String sql;

    public ResultSetIterator(Connection connection, String sql) {
        assert connection != null;
        assert sql != null;
        this.connection = connection;
        this.sql = sql;
    }

    public void init() {
        try {
            ps = connection.prepareStatement(sql);
            rs = ps.executeQuery();

        } catch (SQLException e) {
            close();
            throw new DataAccessException(e);
        }
    }

    @Override
    public boolean hasNext() {
        if (ps == null) {
            init();
        }
        try {
            boolean hasMore = rs.next();
            if (!hasMore) {
                close();
            }
            return hasMore;
        } catch (SQLException e) {
            close();
            throw new DataAccessException(e);
        }

    }

    private void close() {
        try {
            rs.close();
            try {
                ps.close();
            } catch (SQLException e) {
                //nothing we can do here
            }
        } catch (SQLException e) {
            //nothing we can do here
        }
    }

    @Override
    public Tuple next() {
        try {
            return SQL.rowAsTuple(sql, rs);
        } catch (DataAccessException e) {
            close();
            throw e;
        }
    }
}

3
投票

这是public static Stream stream(final Connection connection, final String sql, final Object... parms) { return StreamSupport .stream(Spliterators.spliteratorUnknownSize( new ResultSetIterator(connection, sql), 0), false); } 最简单的样本。

AbacusUtil

披露:我是AbacusUtil的开发人员。

© www.soinside.com 2019 - 2024. All rights reserved.