我得到了一个带有异步回调的函数,例如
let readFile: (path: string, callback: (line: string, eof: boolean) => void) => void
虽然我更喜欢使用 AsyncIterable/AsyncGenerator 签名的函数:
let readFileV2: (path: string) => AsyncIterable<string>
没有
readFileV2
,我必须读取像这样的文件
let file = await new Promise((res, err) => {
let file = ''
readFile('./myfile.txt', (line, eof) => {
if (eof) { return res(file) }
file += line + '\n'
})
})
..而
readFileV2
让我可以做得更干净,就像
let file = '';
for await (let line of readFileV2('./myfile.txt')) {
file += line + '\n'
}
有没有办法让我把
readFile
readFileV2
?更新澄清:
是否有一种通用方法可以将带有异步回调参数的函数转换为 AsyncGenerator/AsyncIterable 变体?
这种方法可以在上面的
readFile
函数上演示吗?
我在这里看到两个相关问题:
但是,他们似乎没有给出明确的答案。
首先免责声明:我正在回答以下问题:
给定一个数据提供函数
,其形式类似于fn
,用于某些初始参数类型(...args: A, callback: (data: T, done: boolean) => void) => void
和数据类型A
的列表,我们如何转换这个函数T
以产生以下形式的新函数transform(fn)
?(...args: A) => AsyncIterable<T>
这很可能不是一般的正确做法,因为
AsyncIterable<T>
的消费者可能会缓慢处理数据或提前中止,而 (...args: [...A, (data: T, done: boolean) => void]) => void
类型的函数不可能对此做出反应;它会在需要时对每条数据调用 callback
一次,并且直到需要时才会停止。
不过,这是一种可能的实现:
const transform = <A extends any[], T>(
fn: (...args: [...args: A, callback: (val: T, done: boolean) => void]) => void
) => (...args: A): AsyncIterable<T> => {
let values: Promise<[T, boolean]>[] = [];
let resolve: (x: [T, boolean]) => void;
values.push(new Promise(r => { resolve = r; }));
fn(...args, (val: T, done: boolean) => {
resolve([val, done]);
values.push(new Promise(r => { resolve = r; }));
});
return async function* () {
let val: T;
for (let i = 0, done = false; !done; i++) {
[val, done] = await values[i];
delete values[i];
yield val;
}
}();
}
本质上,我们提供了一个数据值的queue,
values
,它被写入传递给fn
的回调内部,并从生成器函数内部读取。这是通过一系列承诺来完成的;第一个 Promise 是手动创建的,每次数据可用时,它都会解析当前的 Promise 并将带有新 Promise 的新值推送到队列中。生成器函数等待这些承诺,从队列中提取数据,并删除消耗的数据。
要测试它,需要有人提供
fn
。这是一种可能性:
function sleep(ms: number) {
return new Promise<void>(r => setTimeout(r, ms));
}
const provideData = async (name: string, callback: (line: string, eof: boolean) => void) => {
const contents = [
"This is line 1 of " + name, "and this is line 2",
"and line 3", "and 4", "5",
"and that's the end of " + name + "."
];
for (const [line, eof] of contents.map((l, i, a) => [l, i >= a.length - 1] as const)) {
await sleep(1000); // I guess it takes a second to read each line
callback(line, eof);
}
}
provideData
函数接受回调并每秒使用数组的连续行调用一次。现在我们改造它:
const provideDataV2 = transform(provideData);
// let provideDataV2: (name: string) => AsyncIterable<string>
让我们测试一下变压器:
async function foo() {
console.log(new Date().toLocaleTimeString(), "starting")
const iter = provideDataV2("my data");
await sleep(2500); // not ready to read yet, I guess
for await (let line of iter) {
console.log(new Date().toLocaleTimeString(), line)
}
console.log(new Date().toLocaleTimeString(), "done")
}
foo()
/*
[LOG]: "2:48:36 PM", "starting"
[LOG]: "2:48:37 PM", "This is line 1 of my data"
[LOG]: "2:48:38 PM", "and this is line 2"
[LOG]: "2:48:39 PM", "and line 3"
[LOG]: "2:48:40 PM", "and 4"
[LOG]: "2:48:41 PM", "5"
[LOG]: "2:48:42 PM", "and that's the end of my data."
[LOG]: "2:48:42 PM", "done"
*/
看起来不错。
完美吗?它是否会针对奇怪的情况产生奇怪的副作用(例如,您要多次迭代它)?它应该以特定的方式处理错误吗?其他地方有推荐的解决方案吗?没有把握。这只是
transform
的一种可能实现,它遵守问题中所提出的合同。
从 v10 开始,这就是 NodeJS 原生 API,无需重新发明它:
const {createReadStream} = require('fs');
const {createInterface} = require('readline');
function readFileLines(fileName: string): AsyncIterable<string> {
const input = createReadStream(fileName);
return createInterface({input, crlfDelay: Infinity});
}
测试:
const lines = readFileLines('./test1.js');
for await(const l of lines) {
console.log(l);
}
是的。
Deno.serve
执行此操作,它是一个 HTTP 服务器,它接受回调和像 Deno.serve(req => respondWith(req), {port: 3000})
这样的选项对象。
基本上代码是;
async function* emitterGen(opts){
let _resolve,
_req = new Promise((resolve,reject) => _resolve = resolve);
Deno.serve( req => ( _resolve(req)
, _req = new Promise((resolve,reject) => _resolve = resolve)
)
, opts
);
while (true){
yield await _req;
}
}
let reqEmitter = emitterGen({port: 3000});
for await (let req of reqEmitter){
respondWith(req);
}
显然上面的代码被简化了,没有异常处理。但这应该足以回答你的问题了。
这是一个工作模拟服务器,它在每次随机 (0-999)
req
时创建一个随机数 (0-99) 作为请求 (ms
),并使用 cb
调用 req
(处理程序)。 5 次迭代后停止。
function server(cb,ms){
let count = 5,
looper = function(c = count,t = ms){
let stoid = setTimeout( req => ( cb(req)
, --c && looper(c, Math.random()*1000 >>> 0)
, clearTimeout(stoid)
)
, t
, Math.random()*100 >>> 0
)
}
looper();
}
async function* emitterGen(ms){
let _resolve,
_req = new Promise((resolve,reject) => _resolve = resolve);
server( req => ( _resolve(req)
, _req = new Promise((resolve,reject) => _resolve = resolve)
)
, ms
);
while (true){
yield await _req;
}
}
let reqEmitter = emitterGen(1000);
// since there is no top level await in SO snippets
(async function(){
for await (let req of reqEmitter){
console.log(`Received request is: ${req}`);
}
})();
我创建了一个可以从任何源生成异步生成器的类:
/** Infinite async generator. Iterates messages pushed to it until closed. */
class Machina<T> {
#open = true;
#queue: T[] = [];
#resolve: (() => void) | undefined;
async * stream(): AsyncGenerator<T> {
this.#open = true;
while (this.#open) {
if (this.#queue.length) {
yield this.#queue.shift()!;
continue;
}
await new Promise<void>((_resolve) => {
this.#resolve = _resolve;
});
}
}
push(data: T): void {
this.#queue.push(data);
this.#resolve?.();
}
close(): void {
this.#open = false;
this.#resolve?.();
}
}
export { Machina };
你可以这样使用它:
// Create the Machina instance
const machina = new Machina<string>();
// Async generator loop
async function getMessages() {
for await (const msg of machina.stream()) {
console.log(msg);
}
}
// Start the generator
getMessages();
// Push messages to it
machina.push('hello!');
machina.push('whats up?');
machina.push('greetings');
// Stop the generator
machina.close();
对于您的具体情况,类似这样的事情应该有效:
/** Read each line of the file as an AsyncGenerator. */
function readFileAsync(path: string): AsyncGenerator<string> {
const machina = new Machina<string>();
readFile(path, (line: string, eof: boolean) => {
if (eof) {
machina.close();
} else {
machina.push(line);
}
});
return machina.stream();
}
// Usage
for await (const line of readFileAsync('file.txt')) {
console.log(line);
}
machina.stream()
会启动无限循环,但它会立即暂停(在第一次迭代时),因为它正在等待未解决的承诺。machina.push()
将一个项目添加到缓冲区,然后通过解决承诺来取消暂停它。当它变得取消暂停时,它将缓冲区清空到流中,然后通过等待新的 Promise 来再次暂停它。machina.stream()
的消费者收到推送的商品。您可以重复执行此操作。其他注意事项:
machina.close()
(如果有的话)。仅仅打破循环是不够的!