如何包装一个Node`EventEmitter`,使其能够作为一个终止的`AsyncIterator`来消费?

问题描述 投票:0回答:1

我正在尝试使用一个XML流解析API (xml-flow),暴露了一个Node EventEmitter 发出一连串的 tag 我感兴趣的标签的事件,以及我感兴趣的标签的 end 事件。

我希望能够使用以下方法来实现这个功能 互动式扩展但我不知道如何将其转换为一个结束的异步迭代。ix 的异步迭代表,只有fromEventfromEventPattern 似乎没有办法处理 "结束 "事件。

试着只用:

import * as aix from 'ix/asynciterable';
import flow from 'xml-flow';

const iterTags = aix.fromEvent(flow(...), 'tag:foo');
console.log('max', aix.max(iterTags));

不会产生任何输出,而添加一个 .pipe(tap(console.debug)) 来打印正在迭代的值,这说明流的处理是正确的。

有什么方法可以让我把 end 事件,使迭代器 return 所以这能正常工作吗?

javascript node.js async-await generator ixjs
1个回答
2
投票

我设法创建了一个异步迭代器,通过使用一些来自 rxjs而不是相应的 ixjs 版本。

无论你用rxjs做什么,你总是可以从 ix.from 将observable转换为async iterable。

输入。

  <root>
      <foo>
        <name>Bill</name>
        <id>1</id>
        <age>27</age>
      </foo>
      <foo>
        <name>Sally</name>
        <id>2</id>
        <age>40</age>
      </foo>
      <foo>
        <name>Kelly</name>
        <id>3</id>
        <age>37</age>
      </foo>
    </root>

code:

const {fromEvent} = require('rxjs');
const {takeUntil, tap, map} = require('rxjs/operators');
const ai = require('ix/asynciterable');

const [aiFrom, aiMax] = [ai.from, ai.max];


const flow = require('xml-flow');
const fs = require('fs');
const path = require('path');

const inFile = fs.createReadStream(path.join(__dirname, 'test.xml'));


const eventEmitter = flow(inFile);
const iterTags = aiFrom(
        fromEvent(eventEmitter, 'tag:foo')
            .pipe(takeUntil(fromEvent(eventEmitter, 'end')))
            .pipe(tap(console.log), 
                  map(o=>o.age))
    );


aiMax(iterTags).then((v)=>console.log("max age:", v));

// This works also
// (async()=>{
//     for await(el of iterTags){
//         console.log(el)
//     }
// })();

输出:

{ '$name': 'foo', name: 'Bill', id: '1', age: '27' }
{ '$name': 'foo', name: 'Sally', id: '2', age: '40' }
{ '$name': 'foo', name: 'Kelly', id: '3', age: '37' }
max age: 40
© www.soinside.com 2019 - 2024. All rights reserved.