将数百万条记录从 Postgres DB 转移到另一个 Postgres,选择结果是随机的

问题描述 投票:0回答:0

我需要构建一个 API,将包含数百万条记录的 Postgres 查询数据传输到另一个 Postgres 数据库 源托管在 Azure 上,而 Postgres 在 Prem 上。

我尝试使用节点 JS 构建事务,但我遇到了问题

下面的代码

    // columns: Array<string> contains the columns I want to transfer

    const poolSource = new pg.Pool({
        user: source.user,
        password: source.password,
        database: source.database,
        host: source.host,
        port: source.port,
    });
    const clientDestination = new pg.Client({
        user: destination.user,
        password: destination.password,
        database: destination.database,
        host: destination.host,
        port: destination.port,
    });

    const queries: any = [];

    await clientDestination.connect();

    try {

        let offset = 0;
        const totalRows = 1161509;
        const batchSize = 10000;
        const numBatches = Math.ceil(totalRows / batchSize);

        await clientDestination.query('BEGIN');

        while (true) {
            console.log(offset)

            const result = await poolSource.query(SOURCE_QUERY, [offset, batchSize]);

            if (result.rows.length === 0) {
                console.log("END")
                // No more records to transfer
                break;
            }

            const values = result.rows.map((obj) => Object.values(obj));
            const query = format(
                `INSERT INTO my_table (${columns.join(',')}) VALUES %L`,
                values
            );
            queries.push({
                text: query
            })
            offset += batchSize;
        }
        clientDestination.query(queries);
        await clientDestination.query('COMMIT'); // Rollback transaction if there's an error
    }
    catch (e) {
        await clientDestination.query('ROLLBACK'); // Rollback transaction if there's an error

        console.log("e", e)
        return reply
            .code(constants.SUCCESS_CODE)
            .header(constants.CONTENT_TYPE, constants.APPLICATION_JSON)
            .send({ message: e });
    }
    finally {
        poolSource.end();
        clientDestination.end(); 
        //poolDestination.end();
    }

    return reply
        .code(constants.SUCCESS_CODE)
        .header(constants.CONTENT_TYPE, constants.APPLICATION_JSON)
        .send({ message: "finished" });

代码完全可以正常工作。 我在源数据库中遇到的问题,当我从具有相同偏移量和限制的表中进行选择时,每次选择结果都与前一个不同。 我如何循环遍历源数据并在目标中使用与源中相同的行/值提交它?

node.js postgresql etl
© www.soinside.com 2019 - 2024. All rights reserved.