我们有一个大约6 GB的大文件,需要将其解压缩为64 GB大小(OS映像),我们需要使用http下载。我们正在使用节点的请求库或axios。使用以下代码即时下载文件并解压缩(管道):
const downloadUsingHttp = (downloadUrl, destinationPath) => {enter code here
return new Promise(async (resolve, reject) => {
const unpackedPathWriteStream = fs.createWriteStream(destinationPath);
let totalDownloadSize = 64023257088;
let downloadedSize = 0;
let lastProgressSent = 0;
axios({
method: 'get',
url: downloadUrl,
responseType: 'stream',
auth: {
username: 'user',
password: 'pass'
},
withCredentials: true
}).then(function (response) {
response.data
.on('data', chunk => {
if (totalDownloadSize === 0) {
return;
}
downloadedSize += chunk.length;
const progress = Math.floor((downloadedSize / totalDownloadSize) * 100);
if (progress % 5 !== 0) {
return;
}
if (lastProgressSent === progress) {
return;
}
lastProgressSent = progress;
console.log('Copy progress ', progress + ' %')
})
.pipe(zlib.createUnzip())
.pipe(unpackedPathWriteStream)
}).catch((err) => {
console.log(err.message)
});
unpackedPathWriteStream
.on('error', err => {
console.log(err);
reject(err);
}).on('end', () => {
resolve();
})
})
};
downloadUsingHttp(
'https://example.com/storage/file.raw.gz',
'/data/downloaded-and-unziped.raw'
);
我们正在运行此代码的计算机具有2 GB的RAM。运行此代码时,出现的问题是计算机内存不足,进度约15%,节点应用程序崩溃。有时甚至整个计算机都变得无响应,需要重新启动。
因此,看来back pressure handling在流上实现的低谷.pipe()在这种情况下不起作用。例如,当不通过(使用请求或axios库)通过http下载文件而是使用可读和可写流下载文件时,使用管道方法即时执行相同的复制和解压缩操作是可行的,并且不会浪费内存。
此外,重要的是,仅在本地网络(本地开发环境)中执行http下载时,才会出现此问题。
将提供任何帮助。
更新
我们已经尝试将流的速度限制为100 KB / s,这似乎在没有增加RAM内存使用的情况下起作用。当更改为1 MB / s时,使用率增加,最终应用程序停止运行。我们已经使用stream-throttle
库进行了尝试。
我对管道没有太多的经验,但是一次加载文件并一次将它们送入管道又如何呢?然后加载下一个块。因此管道只需一次处理少至几MB的数据。
我想像这样:
const downloadUsingHttp = (downloadUrl, destinationPath, chunkSize = 10<<20) => {
const writeStream = fs.createWriteStream(destinationPath);
const unzip = zlib.createUnzip();
const auth = {
username: 'user',
password: 'pass'
};
const nextChunk = () => axios({
method: 'get',
url: downloadUrl,
responseType: 'stream',
auth: auth,
withCredentials: true,
headers: {
Range: `bytes=${offset}-${(offset += chunkSize)}`
}
}).then(downThePipe);
const downThePipe = response => {
console.log("progress %i%% ( %i / %i bytes )", offset / length * 100, offset, length);
response.data.pipe(unzip).pipe(writeStream);
return offset < length ? nextChunk() : null;
};
let offset = 0, length;
return axios({
method: "HEAD",
url: downloadUrl,
auth: auth,
withCredentials: true,
}).then(response => {
length = response.headers["Content-Length"];
return nextChunk();
});
};
downloadUsingHttp(
'https://example.com/storage/file.raw.gz',
'/data/downloaded-and-unziped.raw'
);
[也许,如果下载速度仍然太快,您希望将nextChunk()
的加载延迟到pipe()
工作完成之后。但是,再次使用这些管道已经有一段时间了。