从非常大的表中获取数据

问题描述 投票:1回答:4

我在MySQL数据库中有一个非常大的表,表Users中有2亿条记录。

我使用JDBC进行查询:

public List<Pair<Long, String>> getUsersAll() throws SQLException {
        Connection cnn = null;
        CallableStatement cs = null;
        ResultSet rs = null;
        final List<Pair<Long, String>> res = new ArrayList<>();
        try {
            cnn = dataSource.getConnection();
            cs = cnn.prepareCall("select UserPropertyKindId, login from TEST.users;");
            rs = cs.executeQuery();
            while (rs.next()) {
                res.add(new ImmutablePair<>(rs.getLong(1), rs.getString(2)));
            }
            return res;
        } catch (SQLException ex) {
            throw ex;
        } finally {
            DbUtils.closeQuietly(cnn, cs, rs);
        }
    }

接下来,我处理结果:

List<Pair<Long, String>> users= dao.getUsersAll();
            if (CollectionUtils.isNotEmpty(users)) {
                for (List<Pair<Long, String>> partition : Lists.partition(users, 2000)) {
                    InconsistsUsers.InconsistsUsersCallable callable = new InconsistsUsers.InconsistsUsersCallable (new ArrayList<>(partition));
                    processExecutor.submit(callable);
                }
            }

但由于表非常大并且全部卸载到内存中,我的应用程序崩溃并出现错误:

com.mysql.jdbc.exceptions.jdbc4.CommunicationsException:通信链接失败

从服务器成功接收的最后一个数据包是105,619毫秒。

如何按部件接收数据并按优先级顺序处理它们,以便不立即将所有结果上传到内存中?可以创建游标并将数据上载到非阻塞队列,并在数据到达时对其进行处理。如何才能做到这一点?

更新:

我的数据库结构:https://www.db-fiddle.com/f/v377ZHkG1YZcdQsETtPm9L/3

当前算法:

  1. 获取Users表的所有数据用户:select UserPropertyKindId, login from Users;
  2. 此结果分为2000对并提交给ThreadPoolTaskExecutorList<Pair<Long, String>> users= dao.getUsersAll(); if (CollectionUtils.isNotEmpty(users)) { for (List<Pair<Long, String>> partition : Lists.partition(users, 2000)) { InconsistsUsers.InconsistsUsersCallable callable = new InconsistsUsers.InconsistsUsersCallable(new ArrayList<>(partition)); processExecutor.submit(callable)); } }
  3. 在每对可调用中进行两个查询: 第一个查询: select distinct entityId from UserPropertyValue where userPropertyKindId= ? and value = ? -- value its login from Users table 第二个查询: select UserIds from UserPropertyIndex where UserPropertyKindId = ? and Value = ?

有两种情况可能:

  1. 第一个查询的结果为空:记录,发送通知,继续下一对
  2. 第二个查询的结果不等于第一个查询的结果(varbinary数据已解码。存储了编码的entityId)。然后记录,发送通知,转到下一对。

我无法改变基地的结构。我必须在Java代码端进行所有操作。

java mysql multithreading jdbc producer-consumer
4个回答
4
投票

你应该在几个层面上处理这个问题:

JDBC driver fetch size

JDBC有一个Statement.setFetchSize()方法,它表示JDBC驱动程序在从JDBC获取之前要预取多少行。请注意,MySQL JDBC驱动程序并未真正正确实现此功能,但您可以设置setFetchSize(Integer.MIN_VALUE)以防止它一次性获取所有行。 See also this answer here

注意,您也可以使用useCursorFetch激活连接上的功能

Your own logic

您不应该将整个用户列表放在内存中。你现在正在做的是从JDBC收集所有行,然后使用Lists.partition(users, 2000)对列表进行分区。这是正确的方向,但你还没有做到。相反,做:

try (ResultSet rs = cs.executeQuery()) {
    while (rs.next()) {
        res.add(new ImmutablePair<>(rs.getLong(1), rs.getString(2)));
    }

    // Process a batch of rows:
    if (res.size() >= 2000) {
        process(res);
        res.clear();
    }
}

// Process the remaining rows
process(res);

这里的重要信息是不加载内存中的所有行,然后批量处理它们,而是在从JDBC流式传输时直接处理它们。


3
投票

而不是Java方面的Lists.partition(users,2000),您应该将每个请求的mysql结果集限制为2000。

select UserPropertyKindId, login from TEST.users limit <offset>, 2000;

更新:正如Raymond Nijland在下面的评论中所提到的,如果偏移太大,查询可能会显着减慢。

一种解决方法可能是使用offset而不是使用offset,引入where语句,例如id> last_user_id。

由于@All_safe在下面进行了评论,因此不存在自动增量ID,大限制偏移的另一种解决方法是:仅在子查询中获取主键,然后再连接回主表。这将迫使mysql不进行早期行查找,这是大偏移限制的主要问题。

但是您的原始查询只获取主键列,我不认为早期行查找适用。


1
投票

您可以优先考虑查询,例如WHERE my_priority = 1 ORDER BY my_sub_priority DESC

和雅各说,使用限制LIMIT 0, 2000

您可以分解inconsistent_users中的逻辑以查找特定缺陷,然后使用EXPLAIN中获得的洞察优化这些查询。也许find_user_defect(缺陷)方法可以帮助您设置用户。


1
投票

我遇到了类似的情况。我正在从MySQL数据库中读取数据并将其复制到MS SQL Server数据库中。不是2亿,每天只有4百万。但是我遇到了与通信链路故障相同的错误消息。我可以通过设置PreparedStatement.setFetchSize(Integer.MIN_VALUE)的fetchsize来解决它;因此通信链路故障消失了。我知道,这并不能解决你的列表问题。

© www.soinside.com 2019 - 2024. All rights reserved.