如何在使用异步迭代器结束之前正确中断可读流

问题描述 投票:0回答:1

Node.js v20
我有一个通过调用数据库获得的“可读流”。我正在从此流创建一个异步迭代器,但我不想迭代所有数据。如果某个条件为真,我想在流到达末尾之前中断从流中读取数据。问题是,如果我手动调用 AbortError,我总是会收到
ERR_STREAM_PREMATURE_CLOSE
stream.end()
错误:
import {pipeline} from 'stream/promises';
import {createWriteStream} from 'fs';

async function* myGenerator() {
    const stream = await someCalltoDb();

    stream.on('error', (err) => {
        console.log('[STREAM ERROR]', err);
    });

    let stop = false;

    for await (const rows of stream) {
        for (const row of rows) {
            const jsonRow = await row.json();

            if (condition) {
                stop = true;
                // stream.end(); // --> This causes ERR_STREAM_PREMATURE_CLOSE instead
                break;
            }

            yield jsonRow;      
        }

        if (stop) {         
            break;
        }

    }
}

(async () => {
    const iterator = myGenerator();
    const outputStream = createWriteStream('output.txt');
    await pipeline(iterator, outputStream);
})();

enter image description here 在异步迭代器中部分消耗流的正确方法是什么?

javascript node.js stream
1个回答
0
投票
stream.end()

内调用

for await...of loop
时,会导致
ERR_STREAM_PREMATURE_CLOSE
错误。
试试这个

import { pipeline } from 'stream/promises'; import { createWriteStream } from 'fs'; async function* myGenerator() { const stream = await someCalltoDb(); stream.on('error', (err) => { console.log('[STREAM ERROR]', err); }); let stop = false; try { for await (const rows of stream) { for (const row of rows) { const jsonRow = await row.json(); if (condition) { stop = true; break; // Break out of the inner loop } yield jsonRow; } if (stop) { break; // Break out of the outer loop } } } catch (error) { console.error('[GENERATOR ERROR]', error); } finally { stream.destroy(); // Ensure the stream is properly closed } } (async () => { const iterator = myGenerator(); const outputStream = createWriteStream('output.txt'); await pipeline(iterator, outputStream); })();

© www.soinside.com 2019 - 2024. All rights reserved.