使用回调将函数转换为异步迭代器变体

问题描述 投票:0回答:4

场景

我得到了一个带有异步回调的函数,例如

let readFile:   (path: string, callback: (line: string, eof: boolean) => void) => void

虽然我更喜欢使用 AsyncIterable/AsyncGenerator 签名的函数:

let readFileV2: (path: string) => AsyncIterable<string>

问题

没有

readFileV2
,我必须读取像

这样的文件
let file = await new Promise((res, err) => {
    let file = ''
    readFile('./myfile.txt', (line, eof) => {
        if (eof) { return res(file) }
        file += line + '\n'
    })
})

..而

readFileV2
让我可以做得更干净,就像

let file = '';
for await (let line of readFileV2('./myfile.txt')) {
    file += line + '\n'
}

问题

有没有办法让我把

readFile
变成
readFileV2

更新澄清:

是否有一种通用方法可以将带有异步回调参数的函数转换为 AsyncGenerator/AsyncIterable 变体?

这种方法可以在上面的

readFile
函数上演示吗?

参考文献

我在这里看到两个相关问题:

但是,他们似乎没有给出明确的答案。

javascript typescript asynccallback async-iterator
4个回答
4
投票

首先免责声明:我正在回答以下问题:

给定一个数据提供函数

fn
,其形式类似于
(...args: A, callback: (data: T, done: boolean) => void) => void
,用于某些初始参数类型
A
和数据类型
T
的列表,我们如何转换这个函数
transform(fn)
以产生以下形式的新函数
(...args: A) => AsyncIterable<T>

这很可能不是一般的正确做法,因为

AsyncIterable<T>
的消费者可能会缓慢处理数据或提前中止,而
(...args: [...A, (data: T, done: boolean) => void]) => void
类型的函数不可能对此做出反应;它会在需要时对每条数据调用
callback
一次,并且直到需要时才会停止。


不过,这是一种可能的实现:

const transform = <A extends any[], T>(
    fn: (...args: [...args: A, callback: (val: T, done: boolean) => void]) => void
) => (...args: A): AsyncIterable<T> => {
    let values: Promise<[T, boolean]>[] = [];
    let resolve: (x: [T, boolean]) => void;
    values.push(new Promise(r => { resolve = r; }));
    fn(...args, (val: T, done: boolean) => {
        resolve([val, done]);
        values.push(new Promise(r => { resolve = r; }));
    });
    return async function* () {
        let val: T;
        for (let i = 0, done = false; !done; i++) {
            [val, done] = await values[i];
            delete values[i];
            yield val;
        }
    }();
}

本质上,我们提供了一个数据值的queue

values
,它被写入传递给
fn
的回调内部,并从生成器函数内部读取。这是通过一系列承诺来完成的;第一个 Promise 是手动创建的,每次数据可用时,它都会解析当前的 Promise 并将带有新 Promise 的新值推送到队列中。生成器函数等待这些承诺,从队列中提取数据,并删除消耗的数据。


要测试它,需要有人提供

fn
。这是一种可能性:

function sleep(ms: number) {
    return new Promise<void>(r => setTimeout(r, ms));
}

const provideData = async (name: string, callback: (line: string, eof: boolean) => void) => {
    const contents = [
        "This is line 1 of " + name, "and this is line 2",
        "and line 3", "and 4", "5",
        "and that's the end of " + name + "."
    ];
    for (const [line, eof] of contents.map((l, i, a) => [l, i >= a.length - 1] as const)) {
        await sleep(1000); // I guess it takes a second to read each line
        callback(line, eof);
    }
}

provideData
函数接受回调并每秒使用数组的连续行调用一次。现在我们改造它:

const provideDataV2 = transform(provideData);
// let provideDataV2: (name: string) => AsyncIterable<string>

让我们测试一下变压器:

async function foo() {
    console.log(new Date().toLocaleTimeString(), "starting")
    const iter = provideDataV2("my data");
    await sleep(2500); // not ready to read yet, I guess    
    for await (let line of iter) {
        console.log(new Date().toLocaleTimeString(), line)
    }
    console.log(new Date().toLocaleTimeString(), "done")
}
foo()

/* 
[LOG]: "2:48:36 PM",  "starting" 
[LOG]: "2:48:37 PM",  "This is line 1 of my data" 
[LOG]: "2:48:38 PM",  "and this is line 2" 
[LOG]: "2:48:39 PM",  "and line 3" 
[LOG]: "2:48:40 PM",  "and 4" 
[LOG]: "2:48:41 PM",  "5" 
[LOG]: "2:48:42 PM",  "and that's the end of my data." 
[LOG]: "2:48:42 PM",  "done" 
*/

看起来不错。

完美吗?它是否会针对奇怪的情况产生奇怪的副作用(例如,您要多次迭代它)?它应该以特定的方式处理错误吗?其他地方有推荐的解决方案吗?没有把握。这只是

transform
的一种可能实现,它遵守问题中所提出的合同。

Playground 代码链接


1
投票

从 v10 开始,这就是 NodeJS 原生 API,无需重新发明它:

const {createReadStream} = require('fs');
const {createInterface} = require('readline');

function readFileLines(fileName: string): AsyncIterable<string> {
    const input = createReadStream(fileName);
    return createInterface({input, crlfDelay: Infinity});
}

测试:

const lines = readFileLines('./test1.js');
for await(const l of lines) {
    console.log(l);
}

0
投票

是的。

我为

Deno.serve
执行此操作,它是一个 HTTP 服务器,它接受回调和像
Deno.serve(req => respondWith(req), {port: 3000})
这样的选项对象。

基本上代码是;

async function* emitterGen(opts){
  let _resolve,
      _req = new Promise((resolve,reject) => _resolve = resolve);
  Deno.serve( req => ( _resolve(req)
                     , _req = new Promise((resolve,reject) => _resolve = resolve)
                     )
            , opts
            );
  while (true){
    yield await _req;
  }
}

let reqEmitter = emitterGen({port: 3000});

for await (let req of reqEmitter){
  respondWith(req);
}

显然上面的代码被简化了,没有异常处理。但这应该足以回答你的问题了。

这是一个工作模拟服务器,它在每次随机 (0-999)

req
时创建一个随机数 (0-99) 作为请求 (
ms
),并使用
cb
调用
req
(处理程序)。 5 次迭代后停止。

function server(cb,ms){
  let count  = 5,
      looper = function(c = count,t = ms){
                 let stoid = setTimeout( req => ( cb(req)
                                                , --c && looper(c, Math.random()*1000 >>> 0)
                                                , clearTimeout(stoid)
                                                )
                                       , t
                                       , Math.random()*100 >>> 0
                                       )
               }
  looper();
}

async function* emitterGen(ms){
  let _resolve,
      _req = new Promise((resolve,reject) => _resolve = resolve);
  server( req => ( _resolve(req)
                 , _req = new Promise((resolve,reject) => _resolve = resolve)
                 )
        , ms
        );
  while (true){
    yield await _req;
  }
}

let reqEmitter = emitterGen(1000);

// since there is no top level await in SO snippets
(async function(){
  for await (let req of reqEmitter){
    console.log(`Received request is: ${req}`);
  }
})();


0
投票

我创建了一个可以从任何源生成异步生成器的类:

/** Infinite async generator. Iterates messages pushed to it until closed. */
class Machina<T> {

  #open = true;
  #queue: T[] = [];
  #resolve: (() => void) | undefined;

  async * stream(): AsyncGenerator<T> {
    this.#open = true;

    while (this.#open) {
      if (this.#queue.length) {
        yield this.#queue.shift()!;
        continue;
      }

      await new Promise<void>((_resolve) => {
        this.#resolve = _resolve;
      });
    }
  }

  push(data: T): void {
    this.#queue.push(data);
    this.#resolve?.();
  }

  close(): void {
    this.#open = false;
    this.#resolve?.();
  }

}

export { Machina };

你可以这样使用它:

// Create the Machina instance
const machina = new Machina<string>();

// Async generator loop
async function getMessages() {
  for await (const msg of machina.stream()) {
    console.log(msg);
  }
}

// Start the generator
getMessages();

// Push messages to it
machina.push('hello!');
machina.push('whats up?');
machina.push('greetings');

// Stop the generator
machina.close();

对于您的具体情况,类似这样的事情应该有效:

/** Read each line of the file as an AsyncGenerator. */
function readFileAsync(path: string): AsyncGenerator<string> {
  const machina = new Machina<string>();

  readFile(path, (line: string, eof: boolean) => {
    if (eof) {
      machina.close();
    } else {
      machina.push(line);
    }
  });

  return machina.stream();
}

// Usage
for await (const line of readFileAsync('file.txt')) {
  console.log(line);
}

如何运作

  1. 调用
    machina.stream()
    会启动无限循环,但它会立即暂停(在第一次迭代时),因为它正在等待未解决的承诺。
  2. 调用
    machina.push()
    将一个项目添加到缓冲区,然后通过解决承诺来取消暂停它。当它变得取消暂停时,它将缓冲区清空到流中,然后通过等待新的 Promise 来再次暂停它。
  3. machina.stream()
    的消费者收到推送的商品。您可以重复执行此操作。

其他注意事项:

  • 即使 Machina 实例超出范围(即准备好被垃圾收集),它的 Promise 仍然会永远运行。因此,当您完成流式传输时,您需要手动调用
    machina.close()
    (如果有的话)。仅仅打破循环是不够的!
  • 我一开始尝试了稍微不同的设计,没有使用缓冲区。事实证明,缓冲区是必需的,否则您无法在同一个tick中推送多个项目(第一个之后的所有项目都将被删除)。但是,如果您每个周期仅推送一项并主动消耗流,则缓冲区将仅包含一项。这基本上只是一个小的内存考虑。
© www.soinside.com 2019 - 2024. All rights reserved.