user11845830

Reputation:

Parallel asynchronous iterator - is it possible?

Right now I have the following code:

import axios from 'axios'

const urls = ['https://google.com', 'https://yahoo.com']

async function* requests() {
  for (const url of urls) {
    yield axios.get(url)
  }
}

;(async () => {
  for await (const n of requests()) {
    console.log(n.config.url) // prints https://google.com and then https://yahoo.com
  }
})()

As is, the requests won't block Node's single thread, but they will run in sequence. I'm wondering if it's possible to change the code so the requests run in parallel.

Upvotes: 3

Views: 1585

Answers (3)

FraiseVache

Reputation: 1

You can do this with a normal iterator over Promises, without any external library.

Here is a simple example in TypeScript, so the typing is explicit:

import axios, { AxiosResponse } from 'axios'

const urls = ['https://google.com', 'https://yahoo.com']

function* requests(): IterableIterator<Promise<AxiosResponse<any>>> {
  for (const url of urls) {
    yield axios.get(url)
  }
}

async function* parallelize<T>(
  it: IterableIterator<Promise<T>>,
  parallelSize = 100,
): AsyncIterableIterator<T> {
  // Tag each promise with itself so that, after each race, we remove
  // exactly the promise whose value we consumed. Deleting in a bare
  // `.then` can silently drop values when two promises settle in the
  // same tick: one race observes a single winner, but both deletions run.
  const processing = new Map<Promise<T>, Promise<[Promise<T>, T]>>();

  async function raceNext(): Promise<T> {
    const [winner, value] = await Promise.race(processing.values());
    processing.delete(winner);
    return value;
  }

  let val = it.next();
  while (!val.done) {
    const promise = val.value;
    processing.set(promise, promise.then((value): [Promise<T>, T] => [promise, value]));

    if (processing.size >= parallelSize) {
      yield raceNext();
    }

    val = it.next();
  }

  while (processing.size) {
    yield raceNext();
  }
}

;(async () => {
  for await (const res of parallelize(requests())) {
    console.log(res.config.url); // prints https://google.com and https://yahoo.com (order depends on which completes first)
  }
})()

What is even nicer about this approach is that it starts a new request as soon as one finishes instead of serializing batches of x requests, so it constantly keeps x requests in flight.
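
For a quick way to see the concurrency cap in action, here is a hypothetical demo (the delay helper, the fake durations, and the limit of 2 are illustrative, not part of the code above):

// Five fake "requests" with different durations, capped at 2 in flight.
const delay = (ms: number) =>
  new Promise<number>(resolve => setTimeout(() => resolve(ms), ms));

function* fakeRequests(): IterableIterator<Promise<number>> {
  for (const ms of [300, 100, 200, 50, 150]) {
    yield delay(ms);
  }
}

;(async () => {
  for await (const ms of parallelize(fakeRequests(), 2)) {
    console.log(`finished after ${ms}ms`); // completion order, not source order
  }
})()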

Upvotes: 0

Tomer Aberbach

Reputation: 646

You can use lfi, which supports "concurrent iterables":

import { asConcur, forEachConcur, mapConcur, pipe } from 'lfi'

pipe(
  asConcur(urls),
  mapConcur(url => axios.get(url)),
  forEachConcur(response => console.log(response.config.url)),
)

Each URL will move through the pipeline of operations without being blocked by other URLs.
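
If you need to know when every URL has been handled, the final consumer can be awaited. A minimal sketch, assuming forEachConcur returns a promise that resolves once the whole pipeline has drained:

import axios from 'axios'
import { asConcur, forEachConcur, mapConcur, pipe } from 'lfi'

const urls = ['https://google.com', 'https://yahoo.com']

;(async () => {
  // Assumption: forEachConcur's result settles only after every
  // response has been logged.
  await pipe(
    asConcur(urls),
    mapConcur(url => axios.get(url)),
    forEachConcur(response => console.log(response.config.url)),
  )
  console.log('all requests finished')
})()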

Upvotes: 0

SrThompson

Reputation: 5748

The "simpler" no-deps way would be to batch them and yield every batch with Promise.all

import axios from 'axios'

const urls = [
  'https://jsonplaceholder.typicode.com/todos/1', 
  'https://jsonplaceholder.typicode.com/posts/1',
  'https://jsonplaceholder.typicode.com/users/1',
  'https://jsonplaceholder.typicode.com/comments/1'
]

async function* requests(batchSize = 1) {
  let batchedRequests = [];
  for (const url of urls) {
    batchedRequests.push(axios.get(url));
    if (batchedRequests.length === batchSize) {
      yield Promise.all(batchedRequests);
      batchedRequests = [];
    }
  }
  if (batchedRequests.length) { //if there are requests left in batch
    yield Promise.all(batchedRequests);
  }
}

;(async () => {
  for await (const batch of requests(2)) {
    batch.forEach(n => console.log(n.config.url)) // prints the four jsonplaceholder URLs, one batch of 2 at a time
  }
})()

You can use rxjs to achieve similar results, with the flexibility that observables bring, but it's another library and can be more complex if you're not familiar with reactive streams. Here is a detailed post I found on the topic: https://medium.com/@ravishivt/batch-processing-with-rxjs-6408b0761f39
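
For completeness, a minimal sketch of the rxjs route (assuming rxjs v7, where mergeMap's second argument caps concurrency; the limit of 2 is illustrative):

import axios from 'axios'
import { from, mergeMap } from 'rxjs'

const urls = [
  'https://jsonplaceholder.typicode.com/todos/1',
  'https://jsonplaceholder.typicode.com/posts/1',
  'https://jsonplaceholder.typicode.com/users/1',
  'https://jsonplaceholder.typicode.com/comments/1'
]

// At most 2 requests in flight; results are emitted as each completes,
// so a slow request never stalls the rest of its batch.
from(urls)
  .pipe(mergeMap(url => axios.get(url), 2))
  .subscribe(response => console.log(response.config.url))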

Upvotes: 5
