我正在对 Azure 中托管的 mongoDB 中的约 60 万个项目运行提取-转换-加载过程。目前我已经建立了一个使用
$match $skip $limit $lookup
的聚合管道。我对整个产品进行计数,设置批量大小并在 for 循环中运行管道。我遇到的问题是,在 70 批左右之后,连接超时。
我读到的是 $skip 会导致这种情况,因为每次 mongo 都必须迭代所有文档,而不是跳转到跳过前一批的文档。
我尝试在其中一个索引字段上添加 $sort 阶段,这有一点帮助,但效果不是很大。另外 $match 是第一阶段,它没有应用于索引字段,我也知道这可能会减慢速度。
我的问题是,如果我对进行 $match 的字段建立索引,这会解决我的问题吗?
还有其他解决方案可以在大型集合上运行聚合管道吗?
这是我的一些代码
export const importPipeline = (batch: number, index: number) => [
{
$match: {
$and: [{ matchingChannelId: { $eq: 10 } }, { isActive: { $eq: true } }],
},
},
{
$sort: { _id: 1 },
},
{
$skip: batch * index,
},
{
$limit: batch,
},
{
$lookup: {
from: 'collection1',
localField: 'id',
foreignField: 'forgeignId',
as: 'col1',
},
},
{
$lookup: {
from: 'collection2',
localField: 'id',
foreignField: 'forgeignId',
as: 'col2',
},
},
{
$project: {
_id: 0,
myField: 1,
otherFields: 1,
},
},
];
然后
const count = await MongoAdapter.countProducts();
const chunkSize = 1000;
const steps = Math.floor(count / chunkSize);
for (let i = 0; i < steps; i++) {
console.time(`batch${i}`);
const items = myCollection.aggregate(productImportPipeline(batch, index));
console.timeEnd(`batch${i}`);
}
我设法通过删除跳过阶段并在第一个匹配阶段添加另一个条件来解决这个问题
{ _id: { $gt: <id_of_last_item_in_the_previous_batch> } }
每次获取批次时,我都会保存最后一个项目的 ID,然后将其添加到下一个管道。