我必须制作一个程序,从数据库中获取数据并将查询返回的结果插入到另一个数据库中的另一个表中,该表包含数百万条记录,一小时内大约有4000万条,这是一个历史表,我以下作为仅处理 100 万数据的示例,我做了一个限制为 10 万的选择,以按页获取数据并且它有效,但我这样做只是为了测试它会如何并且它对我有效,但实际上,随着我提到的记录数量的增加,Java 会从堆内存中发送一个错误,并且它会留在那里而没有完成,我已经调整了该参数来更改内存,但它仍然死掉了。
可以使用线程来完成吗?
那会怎么样呢?或者我可以使用什么其他替代方案?
这是我的代码,它可以工作,但是当有更多数据时,发送内存错误
public void leerDatos() throws IOException {
String user = "postgres";
String pss = "admin123";
empleadoList = new ArrayList<>();
Empleados empleado = null;
ResultSet result = null;
PreparedStatement stm = null;
Connection connect = null;
final int pageSize = 100000;
int offset = 0;
try {
connect = DriverManager.getConnection("jdbc:postgresql://localhost:5432/bd_test", user, pss);
PreparedStatement stmCount = connect.prepareStatement("SELECT COUNT(*) from empleados");
ResultSet resultCount = stmCount.executeQuery();
int records = 0;
while (resultCount.next()) {
records = resultCount.getInt(1);
}
while (offset < records) {
String query = "SELECT * FROM empleados ORDER BY id LIMIT ? OFFSET ?";
stm = connect.prepareStatement(query);
stm.setInt(1, pageSize);
stm.setInt(2, offset);
result = stm.executeQuery();
while (result.next()) {
empleado = new Empleados();
empleado.setNombre(result.getString("nombre"));
empleado.setApellido(result.getString("apellido"));
empleado.setSalario(result.getString("salario"));
empleadoList.add(empleado);
}
offset += pageSize;
}
if (!empleadoList.isEmpty()) {
insertarDatos(empleadoList);
}
} catch (SQLException ex) {
System.out.println("SQL Error: " + ex.getMessage());
} finally {
try {
if (result != null) {
result.close();
}
if (stm != null) {
stm.close();
}
if (connect != null) {
connect.close();
}
} catch (SQLException ex) {
System.out.println("SQL Error en cierre: " + ex.getMessage());
}
}
}
您可以使用多线程执行数百万数据的加载。您可以使用 CompletableFuture 从数据库中获取和加载数据。您可以按照以下步骤操作:
创建ThreadPoolExecutor,根据数据量、数据库连接数和CPU核心大小来管理线程。在下面的链接中您可以检查 实施。 https://howtodoinjava.com/java/multi-threading/how-to-use-blockingqueue-and-threadpoolexecutor-in-java/
您可以添加这两个中的任何一个 - 分页概念或调度程序 每次都会选取一定量的数据并进行相同的处理。
您可以将 CompletableFuture 与 runAsync 或 SupplyAsync 一起使用。
下面使用的逻辑将首先使用从数据库中获取数据 CompletableFuture,然后使用另一个调用将其保存到另一个数据库中。
result.stream().map(str -> CompletableFuture.runAsync(()-> writeResultToS3(str),customPool)).collect(Collectors.toList());