Concurrent iterable

This is an in-depth explanation of concurrent iterables. For simple usage examples, check out the "Getting started" page.

What is it?

A concurrent iterable, also known as a "concur iterable", is a lazy collection of values that can be iterated concurrently.

It is implemented as a function, represented by the ConcurIterable type, that:

  • Does nothing until it is called
  • Takes a callback that processes each emitted value
  • Returns a promise that resolves when every value has been handled or rejects when an error occurs

A concur iterable is effectively a cold push-based observable backed by some asynchronous operations.
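
For a concrete sense of that shape, here is a minimal hand-written concur iterable. This is only a sketch to illustrate the contract above; it is not how lfi constructs concur iterables.

const delay = ms => new Promise(resolve => setTimeout(resolve, ms))

// Does nothing until called, invokes the callback for each value, and returns
// a promise that resolves once every callback has settled
const concurIterable = apply =>
  Promise.all([
    delay(30).then(() => apply(`sleep`)),
    delay(10).then(() => apply(`climb`)),
    delay(20).then(() => apply(`eat`)),
  ])

await concurIterable(value => console.log(value))
//=> climb
//=> eat
//=> sleep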

How is it different from an AsyncIterable?

An async iterable is neither concurrent nor push-based. It is sequential and pull-based.

For example, consider the following code:

import { asConcur, forEachConcur, mapConcur, pipe, rangeTo } from 'lfi'

const delay = ms => new Promise(resolve => setTimeout(resolve, ms))

const createConcurIterable = () =>
  pipe(
    rangeTo(0, 3),
    asConcur,
    mapConcur(() => delay(100)),
  )

console.time(`concur iterable iteration`)
await forEachConcur(unused => console.log(`Iterated!`), createConcurIterable())
console.timeEnd(`concur iterable iteration`)

async function* createAsyncIterable() {
  yield delay(100)
  yield delay(100)
  yield delay(100)
}

console.time(`async iterable iteration`)
for await (const unused of createAsyncIterable()) {
  console.log(`Iterated!`)
}
console.timeEnd(`async iterable iteration`)
Playground

Iterating over the concur iterable takes just 100 milliseconds while iterating over the async iterable takes 300 milliseconds.

This is because the delay calls are awaited sequentially in the async iterable, and the async generator doesn't resume until the next time for await...of calls next() on the async iterator. The caller is responsible for pulling the next value from the iterator.

However, the delay calls are awaited concurrently in the concur iterable, and the caller is not responsible for pulling the next value. The concur iterable pushes out values as fast as it can. This also means the iteration order of concur iterables is not deterministic; it depends on how quickly each value progresses through the iterable's operations.
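
To make the pull-based behavior explicit, here is a sketch that drives the async iterator from the example above by hand, assuming the same createAsyncIterable and delay:

const asyncIterator = createAsyncIterable()[Symbol.asyncIterator]()

// Nothing runs until a value is pulled; each `next()` resumes the generator
// and waits for one `delay(100)` before resolving
await asyncIterator.next() // resolves after ~100ms
await asyncIterator.next() // ~100ms later
await asyncIterator.next() // ~100ms after that
await asyncIterator.next() //=> { value: undefined, done: true }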

note

The async iterable protocol's behavior is a feature, not an oversight. Sequential iteration is required when the next value depends on previous values. Plus, pull-based iteration manages backpressure well. We can't always process values as quickly as they can be produced.

This is why lfi exports functions for async iterables in addition to concur iterables.
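
For example, a sequential version of the pipeline above could be written with the async-iterable variants. This sketch assumes lfi's asAsync, mapAsync, and forEachAsync, which pull and process one value at a time:

import { asAsync, forEachAsync, mapAsync, pipe, rangeTo } from 'lfi'

const delay = ms => new Promise(resolve => setTimeout(resolve, ms))

// Each value is pulled only after the previous one finishes, so the delays
// run one after another rather than concurrently
await pipe(
  rangeTo(0, 3),
  asAsync,
  mapAsync(async value => {
    await delay(100)
    return value
  }),
  forEachAsync(() => console.log(`Iterated!`)),
)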

How is it different from chaining p-map, p-filter, and others?

Concur iterables are lazy, so they don't create an intermediate array for each operation:

import {
  asConcur,
  filterConcur,
  mapConcur,
  pipe,
  reduceConcur,
  toArray,
} from 'lfi'
import pFilter from 'p-filter'
import pMap from 'p-map'

// No intermediate arrays created
const finalArrayWithLfi = await pipe(
  asConcur(someArray),
  mapConcur(someFunction),
  filterConcur(someOtherFunction),
  // ...
  // No processing happens until this call
  reduceConcur(toArray()),
)

// N - 1 intermediate arrays created for N operations
const intermediateArray1 = await pMap(someArray, someFunction)
const intermediateArray2 = await pFilter(intermediateArray1, someOtherFunction)
// ...
const finalArrayWithoutLfi = await pMap(intermediateArrayN, lastFunction)

Concur iterables allow each item to flow through the operations independently of other items:

import {
  asConcur,
  filterConcur,
  mapConcur,
  pipe,
  reduceConcur,
  toArray,
} from 'lfi'
import pFilter from 'p-filter'
import pMap from 'p-map'

// Hypothetical delays for each item
const mapDelays = [5, 1, 1]
const filterDelays = [1, 1, 5]

const delay = ms => new Promise(resolve => setTimeout(resolve, ms))
const mapFn = i => delay(mapDelays[i] * 1000).then(() => i)
const filterFn = i => delay(filterDelays[i] * 1000).then(() => true)

// Takes 6 seconds! Each item flows through the operations independently of other items
console.time(`with lfi`)
const withLfi = await pipe(
  asConcur([0, 1, 2]),
  mapConcur(mapFn),
  filterConcur(filterFn),
  reduceConcur(toArray()),
)
console.timeEnd(`with lfi`)

// Takes 10 seconds! The first item is a bottleneck because `p-map` waits for all callbacks
console.time(`without lfi`)
const withoutLfi = await pFilter(await pMap([0, 1, 2], mapFn), filterFn)
console.timeEnd(`without lfi`)
Playground
note

p-map, p-filter, and others are still great lightweight utilities suitable for many use cases.

How does it work?

The asConcur function constructs a concur iterable from a sync iterable. Here is a simplified implementation:

const asConcur = iterable => apply =>
  Promise.all(Array.from(iterable, value => apply(value)))

The implementation returns a function that calls the apply callback for each value in the iterable and returns a promise that resolves once all values have been processed, all while accounting for any async processing in apply.

We can iterate over concur iterables:

import { asConcur } from 'lfi'

const concurIterable = asConcur([`sleep`, `climb`, `eat`])

await concurIterable(console.log)
//=> sleep
//=> climb
//=> eat
Playground
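
And, as described at the top of this page, the promise returned by a concur iterable rejects when an error occurs. Here is a sketch of what that looks like when a callback throws:

import { asConcur } from 'lfi'

const concurIterable = asConcur([1, 2, 3])

await concurIterable(async value => {
  if (value === 2) {
    // The rejected promise from this callback rejects the promise returned
    // by the concur iterable
    throw new Error(`Boom!`)
  }
}).catch(error => console.log(error.message))
//=> Boom!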

We can manually filter and map them:

import { asConcur } from 'lfi'

const concurIterable = asConcur([`sleep`, `climb`, `eat`])

const transformedConcurIterable = apply =>
  concurIterable(async verb => {
    const [adjective] = await (
      await fetch(`https://random-word-form.herokuapp.com/random/adjective`)
    ).json()
    if (adjective.length <= 5) {
      // Too short!
      return
    }

    await apply(`${adjective} ${verb}`)
  })

await transformedConcurIterable(console.log)
Playground

Or we can use lfi's functions to filter and map them!

import { asConcur, filterMapConcur, forEachConcur, pipe } from 'lfi'

await pipe(
  asConcur([`sleep`, `climb`, `eat`]),
  filterMapConcur(async verb => {
    const [adjective] = await (
      await fetch(`https://random-word-form.herokuapp.com/random/adjective`)
    ).json()
    return adjective.length <= 5 ? null : `${adjective} ${verb}`
  }),
  forEachConcur(console.log),
)
Playground