Concurrent iterable
This is an in-depth explanation of concurrent iterables. For simple usage examples, check out the "Getting started" page.
What is it?
A concurrent iterable, also known as a "concur iterable", is a lazy collection of values that can be iterated concurrently.
It is implemented as a function, represented by the ConcurIterable type, that:
- Does nothing until it is called
- Takes a callback that processes each emitted value
- Returns a promise that resolves when every value has been handled or rejects when an error occurs
A concur iterable is effectively a cold push-based observable backed by some asynchronous operations.
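For example, here is a minimal hand-written concur iterable. This is only a sketch of the shape described above, not lfi's actual implementation:
// A function that calls `apply` for each value and returns a promise
// that resolves once every value has been handled
const numbersConcur = apply => Promise.all([1, 2, 3].map(value => apply(value)))
// Nothing happens until the function is called with a callback
await numbersConcur(value => console.log(value))
//=> 1
//=> 2
//=> 3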
How is it different from an AsyncIterable?
An async iterable is neither concurrent nor push-based. It is sequential and pull-based.
For example, consider the following code:
import { asConcur, forEachConcur, mapConcur, pipe, rangeTo } from 'lfi'
const delay = ms => new Promise(resolve => setTimeout(resolve, ms))
const createConcurIterable = () =>
  pipe(
    rangeTo(0, 3),
    asConcur,
    mapConcur(() => delay(100)),
  )
console.time(`concur iterable iteration`)
await forEachConcur(unused => console.log(`Iterated!`), createConcurIterable())
console.timeEnd(`concur iterable iteration`)
async function* createAsyncIterable() {
  yield delay(100)
  yield delay(100)
  yield delay(100)
}
console.time(`async iterable iteration`)
for await (const unused of createAsyncIterable()) {
  console.log(`Iterated!`)
}
console.timeEnd(`async iterable iteration`)
Iterating over the concur iterable takes just 100 milliseconds, while iterating over the async iterable takes 300 milliseconds.
This is because the delay calls are awaited sequentially in the async iterable: the async generator doesn't resume until the next time for await...of calls next() on the async iterator. The caller is responsible for pulling the next value from the iterator.
However, the delay calls are awaited concurrently in the concur iterable, and the caller is not responsible for pulling the next value. The concur iterable pushes out values as fast as it can. This also means the iteration order of concur iterables is not deterministic; it depends on how quickly each value progresses through the iterable's operations.
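For example, the following sketch (using the same functions as above) logs values in the order their delays finish, not the order they appear in the input:
import { asConcur, forEachConcur, mapConcur, pipe } from 'lfi'
const delay = ms => new Promise(resolve => setTimeout(resolve, ms))
await pipe(
  asConcur([300, 100, 200]),
  // Each value waits for its own delay, so faster values finish first
  mapConcur(async ms => {
    await delay(ms)
    return ms
  }),
  forEachConcur(ms => console.log(ms)),
)
//=> 100
//=> 200
//=> 300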
The async iterable protocol's behavior is a feature, not an oversight. Sequential iteration is required when the next value depends on previous values. Plus, pull-based iteration manages backpressure well. We can't always process values as quickly as they can be produced.
This is why lfi exports functions for async iterables in addition to concur iterables.
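For example, the following sketch processes values sequentially with lfi's async variants. It assumes asAsync, mapAsync, and forEachAsync, the async counterparts of the concur functions used above:
import { asAsync, forEachAsync, mapAsync, pipe } from 'lfi'
const delay = ms => new Promise(resolve => setTimeout(resolve, ms))
await pipe(
  asAsync([1, 2, 3]),
  // Each value is awaited before the next one is pulled, so order is preserved
  mapAsync(async value => {
    await delay(100)
    return value * 2
  }),
  forEachAsync(value => console.log(value)),
)
//=> 2
//=> 4
//=> 6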
How is it different from chaining p-map, p-filter, and others?
Concur iterables are lazy, so they don't create an intermediate array for each operation:
import {
  asConcur,
  filterConcur,
  mapConcur,
  pipe,
  reduceConcur,
  toArray,
} from 'lfi'
import pFilter from 'p-filter'
import pMap from 'p-map'
// No intermediate arrays created
const finalArrayWithLfi = await pipe(
  asConcur(someArray),
  mapConcur(someFunction),
  filterConcur(someOtherFunction),
  // ...
  // No processing happens until this call
  reduceConcur(toArray()),
)
// N - 1 intermediate arrays created for N operations
const intermediateArray1 = await pMap(someArray, someFunction)
const intermediateArray2 = await pFilter(intermediateArray1, someOtherFunction)
// ...
const finalArrayWithoutLfi = await pMap(intermediateArrayN, lastFunction)
Concur iterables allow each item to flow through the operations independently of other items:
import {
  asConcur,
  filterConcur,
  mapConcur,
  pipe,
  reduceConcur,
  toArray,
} from 'lfi'
import pFilter from 'p-filter'
import pMap from 'p-map'
// Hypothetical delays for each item
const mapDelays = [5, 1, 1]
const filterDelays = [1, 1, 5]
const delay = ms => new Promise(resolve => setTimeout(resolve, ms))
const mapFn = i => delay(mapDelays[i] * 1000).then(() => i)
const filterFn = i => delay(filterDelays[i] * 1000).then(() => true)
// Takes 6 seconds! Each item flows through the operations independently of other items
console.time(`with lfi`)
const withLfi = await pipe(
  asConcur([0, 1, 2]),
  mapConcur(mapFn),
  filterConcur(filterFn),
  reduceConcur(toArray()),
)
console.timeEnd(`with lfi`)
// Takes 10 seconds! The first item is a bottleneck because `p-map` waits for all callbacks
console.time(`without lfi`)
const withoutLfi = await pFilter(await pMap([0, 1, 2], mapFn), filterFn)
console.timeEnd(`without lfi`)
p-map, p-filter, and others are still great lightweight utilities suitable for many use cases.
How does it work?
The asConcur function constructs a concur iterable from a sync iterable. Here is a simplified implementation:
const asConcur = iterable => apply =>
  Promise.all(Array.from(iterable, value => apply(value)))
The implementation returns a function that calls the apply callback for each value in the iterable and returns a promise that resolves once all values have been processed, all while accounting for any async processing in apply.
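Even this simplified version runs callbacks concurrently: every apply call starts before any of them is awaited, so async work in the callbacks overlaps. For example, using the simplified asConcur above:
const delay = ms => new Promise(resolve => setTimeout(resolve, ms))
const concurIterable = asConcur([1, 2, 3])
console.time(`iteration`)
await concurIterable(async value => {
  // Each callback waits 100 ms, but the waits overlap
  await delay(100)
  console.log(value)
})
console.timeEnd(`iteration`)
//=> 1
//=> 2
//=> 3
// Roughly 100 milliseconds in total, not 300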
We can iterate over concur iterables:
import { asConcur } from 'lfi'
const concurIterable = asConcur([`sleep`, `climb`, `eat`])
await concurIterable(console.log)
//=> sleep
//=> climb
//=> eat
We can manually filter and map them:
import { asConcur } from 'lfi'
const concurIterable = asConcur([`sleep`, `climb`, `eat`])
const transformedConcurIterable = apply =>
  concurIterable(async verb => {
    const [adjective] = await (
      await fetch(`https://random-word-form.herokuapp.com/random/adjective`)
    ).json()
    if (adjective.length <= 5) {
      // Too short!
      return
    }
    await apply(`${adjective} ${verb}`)
  })
await transformedConcurIterable(console.log)
Or we can use lfi's functions to filter and map them!
import { asConcur, filterMapConcur, forEachConcur, pipe } from 'lfi'
await pipe(
  asConcur([`sleep`, `climb`, `eat`]),
  filterMapConcur(async verb => {
    const [adjective] = await (
      await fetch(`https://random-word-form.herokuapp.com/random/adjective`)
    ).json()
    return adjective.length <= 5 ? null : `${adjective} ${verb}`
  }),
  forEachConcur(console.log),
)
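And if the goal is to collect results rather than perform side effects, the same pipeline can end with a collecting reducer instead of forEachConcur, just like the earlier examples. A sketch reusing the functions shown above:
import { asConcur, filterMapConcur, pipe, reduceConcur, toArray } from 'lfi'
const phrases = await pipe(
  asConcur([`sleep`, `climb`, `eat`]),
  filterMapConcur(async verb => {
    const [adjective] = await (
      await fetch(`https://random-word-form.herokuapp.com/random/adjective`)
    ).json()
    return adjective.length <= 5 ? null : `${adjective} ${verb}`
  }),
  // Collect the remaining values into an array (in completion order)
  reduceConcur(toArray()),
)
console.log(phrases)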