millimoose

Reputation: 39980

How to wrap a Node `EventEmitter` to be able to consume it as an `AsyncIterator` that terminates?

I'm trying to use an XML stream parsing API (xml-flow) that exposes a Node EventEmitter. It emits a bunch of tag events for the tags I'm interested in, and an end event when it finishes reading the document.

I'd like to be able to munge this using Interactive Extensions (ix), but I can't figure out how to convert it to an async iterable that ends. ix only has fromEvent/fromEventPattern, which don't seem to have a way to handle an "end" event.

Trying just:

import * as aix from 'ix/asynciterable';
import flow from 'xml-flow';

const iterTags = aix.fromEvent(flow(...), 'tag:foo');
console.log('max', aix.max(iterTags));

produces no output, while adding a .pipe(tap(console.debug)) to print the values being iterated over shows me the stream is actually being processed correctly.

Is there a way I can wire up the end event to cause the iterator to return so this works correctly?

Upvotes: 1

Views: 719

Answers (1)

Marinos An

Reputation: 10828

I managed to create an async iterable that terminates by using rxjs's fromEvent and takeUntil instead of the corresponding ix versions.

Whatever you build with rxjs, you can always pass the resulting observable to ix's from to convert it into an async iterable.

Input:

  <root>
      <foo>
        <name>Bill</name>
        <id>1</id>
        <age>27</age>
      </foo>
      <foo>
        <name>Sally</name>
        <id>2</id>
        <age>40</age>
      </foo>
      <foo>
        <name>Kelly</name>
        <id>3</id>
        <age>37</age>
      </foo>
    </root>

Code:

const {fromEvent} = require('rxjs');
const {takeUntil, tap, map} = require('rxjs/operators');
const {from: aiFrom, max: aiMax} = require('ix/asynciterable');

const flow = require('xml-flow');
const fs = require('fs');
const path = require('path');

const inFile = fs.createReadStream(path.join(__dirname, 'test.xml'));
const eventEmitter = flow(inFile);

// Build the pipeline with rxjs, then convert the observable to an async iterable.
const iterTags = aiFrom(
    fromEvent(eventEmitter, 'tag:foo').pipe(
        // Complete the observable (and so the iterable) when 'end' fires.
        takeUntil(fromEvent(eventEmitter, 'end')),
        tap(console.log),
        // age arrives as a string (see the output), so convert it to compare numerically.
        map(o => Number(o.age))
    )
);

aiMax(iterTags).then((v) => console.log("max age:", v));

// Iterating directly also works:
// (async () => {
//     for await (const el of iterTags) {
//         console.log(el);
//     }
// })();

Output:

{ '$name': 'foo', name: 'Bill', id: '1', age: '27' }
{ '$name': 'foo', name: 'Sally', id: '2', age: '40' }
{ '$name': 'foo', name: 'Kelly', id: '3', age: '37' }
max age: 40
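
If pulling in rxjs just for takeUntil feels heavy, the same idea can be sketched by hand with an async generator and nothing beyond Node's events module. This is only a minimal sketch, not something provided by xml-flow or ix: iterateUntilEnd and maxAge are hypothetical names, and it assumes the emitter signals completion with an 'end' event and failures with an 'error' event. Listeners are attached as soon as iterateUntilEnd is called, so events emitted before the consumer starts looping are buffered rather than lost.

```javascript
const { EventEmitter } = require('events');

// Hypothetical helper: an async iterable over `eventName` payloads that
// finishes when `endName` fires and throws if 'error' fires.
function iterateUntilEnd(emitter, eventName, endName = 'end') {
  const queue = [];    // payloads received but not yet consumed
  let done = false;    // set once `endName` or 'error' has fired
  let failure = null;  // error passed to the 'error' event, if any
  let wake = null;     // resolves the consumer's pending wait

  const notify = () => { if (wake) { wake(); wake = null; } };
  const onValue = (value) => { queue.push(value); notify(); };
  const onEnd = () => { done = true; notify(); };
  const onError = (err) => { failure = err; done = true; notify(); };

  // Attach eagerly so nothing emitted before the first next() is missed.
  emitter.on(eventName, onValue);
  emitter.once(endName, onEnd);
  emitter.once('error', onError);

  return (async function* () {
    try {
      while (true) {
        // Drain everything buffered so far before checking for completion.
        while (queue.length > 0) yield queue.shift();
        if (failure) throw failure;
        if (done) return;
        // Sleep until the next event arrives.
        await new Promise((resolve) => { wake = resolve; });
      }
    } finally {
      // Runs on normal completion, on error, and on an early `break`.
      emitter.off(eventName, onValue);
      emitter.off(endName, onEnd);
      emitter.off('error', onError);
    }
  })();
}

// Hypothetical consumer: the largest numeric `age` among the emitted tags.
async function maxAge(emitter) {
  let max = -Infinity;
  for await (const tag of iterateUntilEnd(emitter, 'tag:foo')) {
    max = Math.max(max, Number(tag.age));
  }
  return max;
}

// Demo with a plain EventEmitter standing in for the xml-flow emitter.
const demo = new EventEmitter();
maxAge(demo).then((v) => console.log('max age:', v)); // prints "max age: 40"
demo.emit('tag:foo', { age: '27' });
demo.emit('tag:foo', { age: '40' });
demo.emit('tag:foo', { age: '37' });
demo.emit('end');
```

With xml-flow, the call would presumably look like maxAge(flow(inFile)). The buffer is unbounded, so a consumer that is much slower than the parser will accumulate tags in memory; the rxjs version above is the better fit if you need further operators.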

Upvotes: 2
