我有几张包含大量数据的表(大约1亿条记录)。所以我不能将这些数据存储在内存中,但是我想使用java.util.stream
类来传输这个结果集,并将此流传递给另一个类。我读到了Stream.of
和Stream.Builder
运算符,但它们是内存中的缓冲流。那么有什么方法可以解决这个问题吗?提前致谢。
更新#1
好吧,我用Google搜索并找到了jooq库。我不确定,但看起来它可能适用于我的测试用例。总而言之,我有几张包含大量数据的表格。我想流式传输我的结果集并将此流传输到另一个方法。像这样的东西:
// why return Stream<String>? Because my result set has String type
private Stream<Record> writeTableToStream(DataSource dataSource, String table) {
Stream<Record> record = null;
try (Connection connection = dataSource.getConnection()) {
String sql = "select * from " + table;
try (PreparedStatement pSt = connection.prepareStatement(sql)) {
connection.setAutoCommit(false);
pSt.setFetchSize(5000);
ResultSet resultSet = pSt.executeQuery();
//
record = DSL.using(connection)
.fetch(resultSet).stream();
}
} catch (SQLException sqlEx) {
logger.error(sqlEx);
}
return record;
}
可以请有人建议,我是否正确?谢谢。
更新#2
我在jooq上做了一些实验,现在可以说上面的决定不适合我。这段代码record = DSL.using(connection).fetch(resultSet).stream();
花了太多时间
你必须要了解的第一件事就是代码
try (Connection connection = dataSource.getConnection()) {
…
try (PreparedStatement pSt = connection.prepareStatement(sql)) {
…
return stream;
}
}
当你离开try
区块的时候不起作用,资源关闭,而Stream
的处理甚至还没有开始。
资源管理构造“try with resources”适用于方法内块范围内使用的资源,但您正在创建返回资源的工厂方法。因此,您必须确保关闭返回的流将关闭资源,并且调用者负责关闭Stream
。
此外,您需要一个从ResultSet
生成一行中的项目的函数。假设你有一个类似的方法
Record createRecord(ResultSet rs) {
…
}
你可能会创建一个基本上像Stream<Record>
Stream<Record> stream = StreamSupport.stream(new Spliterators.AbstractSpliterator<Record>(
Long.MAX_VALUE,Spliterator.ORDERED) {
@Override
public boolean tryAdvance(Consumer<? super Record> action) {
if(!resultSet.next()) return false;
action.accept(createRecord(resultSet));
return true;
}
}, false);
但要正确地执行它,您必须合并异常处理和资源关闭。您可以使用Stream.onClose
来注册将在Stream
关闭时执行的操作,但它必须是Runnable
,它不能抛出已检查的异常。类似地,不允许tryAdvance
方法抛出已检查的异常。由于我们不能简单地在这里嵌套try(…)
块,因此当close
已经有一个挂起的异常时,抛出的抑制异常的程序逻辑并不是免费的。
为了帮助我们,我们引入了一个新类型,它可以包含关闭操作,这些操作可能会抛出已检查的异常,并将它们包含在未经检查的异常中。通过实现AutoCloseable
本身,它可以利用try(…)
构造来安全地链接关闭操作:
interface UncheckedCloseable extends Runnable, AutoCloseable {
default void run() {
try { close(); } catch(Exception ex) { throw new RuntimeException(ex); }
}
static UncheckedCloseable wrap(AutoCloseable c) {
return c::close;
}
default UncheckedCloseable nest(AutoCloseable c) {
return ()->{ try(UncheckedCloseable c1=this) { c.close(); } };
}
}
这样,整个操作变为:
private Stream<Record> tableAsStream(DataSource dataSource, String table)
throws SQLException {
UncheckedCloseable close=null;
try {
Connection connection = dataSource.getConnection();
close=UncheckedCloseable.wrap(connection);
String sql = "select * from " + table;
PreparedStatement pSt = connection.prepareStatement(sql);
close=close.nest(pSt);
connection.setAutoCommit(false);
pSt.setFetchSize(5000);
ResultSet resultSet = pSt.executeQuery();
close=close.nest(resultSet);
return StreamSupport.stream(new Spliterators.AbstractSpliterator<Record>(
Long.MAX_VALUE,Spliterator.ORDERED) {
@Override
public boolean tryAdvance(Consumer<? super Record> action) {
try {
if(!resultSet.next()) return false;
action.accept(createRecord(resultSet));
return true;
} catch(SQLException ex) {
throw new RuntimeException(ex);
}
}
}, false).onClose(close);
} catch(SQLException sqlEx) {
if(close!=null)
try { close.close(); } catch(Exception ex) { sqlEx.addSuppressed(ex); }
throw sqlEx;
}
}
此方法在上述实用程序类的一个实例中包含所有资源Connection
,Statement
和ResultSet
的必要关闭操作。如果在初始化期间发生异常,则立即执行关闭操作,并将异常传递给调用者。如果流构建成功,则通过onClose
注册关闭操作。
因此呼叫者必须确保正确关闭
try(Stream<Record> s=tableAsStream(dataSource, table)) {
// stream operation
}
请注意,SQLException
方法中还添加了通过RuntimeException
传送tryAdvance
。因此,您现在可以毫无问题地将throws SQLException
添加到createRecord
方法中。
我要回答你问题的jOOQ部分。从jOOQ 3.8开始,现在已经有很多与jOOQ和Stream相结合的附加功能。 Other usages are also documented on this jOOQ page。
你试过这个:
Stream<Record> stream = DSL.using(connection).fetch(resultSet).stream();
实际上,这对大型结果集不起作用,因为fetch(ResultSet)
将整个结果集提取到内存中,然后在其上调用Collection.stream()
。
相反,你可以这样写:
try (Stream<Record> stream = DSL.using(connection).fetchStream(resultSet)) {
...
}
......基本上方便了:
try (Cursor<Record> cursor = DSL.using(connection).fetchLazy(resultSet)) {
Stream<Record> stream = cursor.stream();
...
}
另见DSLContext.fetchStream(ResultSet)
当然,你也可以让jOOQ执行你的SQL字符串,而不是与JDBC搏斗:
try (Stream<Record> stream =
DSL.using(dataSource)
.resultQuery("select * from {0}", DSL.name(table)) // Prevent SQL injection
.fetchSize(5000)
.fetchStream()) {
...
}
请注意,由jOOQ生成的Stream
是“足智多谋的”,即它包含对开放的ResultSet
(和PreparedStatement
)的引用。因此,如果您真的想要在方法之外返回该流,请确保它已正确关闭!
我不知道有任何着名的图书馆会为你做这件事。
也就是说,this article展示了如何使用Iterator(ResultSetIterator)包装结果集并将其作为第一个参数传递给Spliterators.spliteratorUnknownSize()
以创建Spliterator
。
然后,qzxswpoi可以使用Spliterator在其上创建一个Stream。
他们建议实施StreamSupport
类:
ResultSetIterator
然后:
public class ResultSetIterator implements Iterator {
private ResultSet rs;
private PreparedStatement ps;
private Connection connection;
private String sql;
public ResultSetIterator(Connection connection, String sql) {
assert connection != null;
assert sql != null;
this.connection = connection;
this.sql = sql;
}
public void init() {
try {
ps = connection.prepareStatement(sql);
rs = ps.executeQuery();
} catch (SQLException e) {
close();
throw new DataAccessException(e);
}
}
@Override
public boolean hasNext() {
if (ps == null) {
init();
}
try {
boolean hasMore = rs.next();
if (!hasMore) {
close();
}
return hasMore;
} catch (SQLException e) {
close();
throw new DataAccessException(e);
}
}
private void close() {
try {
rs.close();
try {
ps.close();
} catch (SQLException e) {
//nothing we can do here
}
} catch (SQLException e) {
//nothing we can do here
}
}
@Override
public Tuple next() {
try {
return SQL.rowAsTuple(sql, rs);
} catch (DataAccessException e) {
close();
throw e;
}
}
}
这是public static Stream stream(final Connection connection,
final String sql,
final Object... parms) {
return StreamSupport
.stream(Spliterators.spliteratorUnknownSize(
new ResultSetIterator(connection, sql), 0), false);
}
最简单的样本。
AbacusUtil
披露:我是AbacusUtil的开发人员。