Reducer
This is an in-depth explanation of reducers. For simple usage examples, check out the "Getting started" page.
What is it?
A reducer defines how to combine a lazy collection of values into a single final value, synchronously or asynchronously. As the values are iteratively combined, the in-progress final value is known as the "accumulator".
A reducer is primarily represented by the OptionalReducer
,
AsyncOptionalReducer
, Reducer
, AsyncReducer
, KeyedReducer
, and
AsyncKeyedReducer
types, depending on the async work required, the possibility
of not producing a final value, and the need to access accumulator values by
key.
There are also other types that represent reducers, such as FunctionReducer
and AsyncFunctionReducer
, but they all normalize to one of the aforementioned
types using normalizeReducer
.
lfi
functions with names that start with to
, like toArray
and toSum
, are
reducers.
How do I use it?
A sync reducer can be passed to the reduce
, reduceAsync
, and reduceConcur
functions while an async reducer can be passed to the reduceAsync
and
reduceConcur
functions.
Reducers can also be composed together to create more complex and powerful ones
using higher-order reducers such toGrouped
and toMultiple
.
FunctionReducer
and AsyncFunctionReducer
- Sync
- Async
The simplest reducer is the function reducer. It defines how to combine two values of the same type into a single value, also of that type.
For example, consider the following code:
import { map, or, pipe, reduce } from 'lfi'
const sumSquares = numbers =>
pipe(
numbers,
map(number => number * number),
// Reduce the iterable to one value by repeatedly calling the function with
// pairs of values until only one remains
reduce((number1, number2) => {
console.log(`add(${number1}, ${number2})`)
return number1 + number2
}),
// But what if the iterable is empty? `reduce` returns an optional when
// passed a function reducer because it doesn't define what to do for zero
// values
or(() => 0),
)
console.log(sumSquares([1, 2, 3]))
//=> add(1, 4)
//=> add(5, 9)
//=> 14
console.log(sumSquares([]))
//=> 0
The simplest async reducer is the async function reducer. It defines how to asynchronously combine two values of the same type into a single value, also of that type.
For example, consider the following code:
import { asAsync, mapAsync, orAsync, pipe, reduceAsync } from 'lfi'
const delay = ms => new Promise(resolve => setTimeout(resolve, ms))
const sumSquares = async numbers => {
console.time(`sumSquares`)
const sum = await pipe(
// This is an async iterable, but async reducers also work with concur iterables
asAsync(numbers),
mapAsync(number => number * number),
// Reduce the async iterable to one value by repeatedly calling the function
// with pairs of values until only one remains
reduceAsync(async (number1, number2) => {
console.log(`add(${number1}, ${number2})`)
// Async work is allowed in async reducers
await delay(100)
return number1 + number2
}),
// But what if the async iterable is empty? `reduceAsync` returns an async
// optional when passed an async function reducer because it doesn't define
// what to do for zero values
orAsync(() => 0),
)
console.timeEnd(`sumSquares`)
return sum
}
console.log(await sumSquares([1, 2, 3]))
//=> add(4, 1)
//=> add(5, 9)
// NOTE: This duration may vary slightly between runs
//=> sumSquares: 200 ms
//=> 14
console.log(await sumSquares([]))
// NOTE: This duration may vary slightly between runs
//=> sumSquares: 0 ms
//=> 0
OptionalReducer
and AsyncOptionalReducer
- Sync
- Async
A slightly more complex reducer is the optional reducer. It's an object with:
- An
add
function, matching the behavior of a function reducer1 - An optional
finish
function, which transforms the final value
For example, consider the following code:
import { map, or, pipe, reduce } from 'lfi'
const sumSquares = numbers =>
pipe(
numbers,
map(number => number * number),
// This behaves identically to passing a function reducer
reduce({
add: (number1, number2) => {
console.log(`add(${number1}, ${number2})`)
return number1 + number2
},
}),
or(() => 0),
)
console.log(sumSquares([1, 2, 3]))
//=> add(1, 4)
//=> add(5, 9)
//=> 14
console.log(sumSquares([]))
//=> 0
const sumSquaresMessage = numbers =>
pipe(
numbers,
map(number => number * number),
// This behaves identically to passing a function reducer, except the final
// value is transformed using `finish`
reduce({
add: (number1, number2) => {
console.log(`add(${number1}, ${number2})`)
return number1 + number2
},
finish: sum => {
console.log(`finish(${sum})`)
return `The sum of the squares is ${sum}`
},
}),
or(() => `There were no numbers!`),
)
console.log(sumSquaresMessage([1, 2, 3]))
//=> add(1, 4)
//=> add(5, 9)
//=> finish(14)
//=> The sum of the squares is 14
console.log(sumSquaresMessage([]))
//=> There were no numbers!
A slightly more complex async reducer is the async optional reducer. It's an object with:
- An
add
function, matching the behavior of an async function reducer1 - An optional
finish
function, which asynchronously transforms the final value
For example, consider the following code:
import { asAsync, mapAsync, orAsync, pipe, reduceAsync } from 'lfi'
const delay = ms => new Promise(resolve => setTimeout(resolve, ms))
const sumSquares = async numbers => {
console.time(`sumSquares`)
const sum = await pipe(
// This is an async iterable, but async reducers also work with concur iterables
asAsync(numbers),
mapAsync(number => number * number),
// This behaves identically to passing an async function reducer
reduceAsync({
add: async (number1, number2) => {
console.log(`add(${number1}, ${number2})`)
await delay(100)
return number1 + number2
},
}),
orAsync(() => 0),
)
console.timeEnd(`sumSquares`)
return sum
}
console.log(await sumSquares([1, 2, 3]))
//=> add(4, 1)
//=> add(5, 9)
// NOTE: This duration may vary slightly between runs
//=> sumSquares: 200 ms
//=> 14
console.log(await sumSquares([]))
// NOTE: This duration may vary slightly between runs
//=> sumSquares: 0 ms
//=> 0
const sumSquaresMessage = async numbers => {
console.time(`sumSquaresMessage`)
const message = await pipe(
// This is an async iterable, but async reducers also work with concur iterables
asAsync(numbers),
mapAsync(number => number * number),
// This behaves identically to passing an async function reducer, except the
// final value is transformed using `finish`
reduceAsync({
add: async (number1, number2) => {
console.log(`add(${number1}, ${number2})`)
await delay(100)
return number1 + number2
},
finish: async sum => {
console.log(`finish(${sum})`)
await delay(1000)
return `The sum of the squares is ${sum}`
},
}),
orAsync(() => `There were no numbers!`),
)
console.timeEnd(`sumSquaresMessage`)
return message
}
console.log(await sumSquaresMessage([1, 2, 3]))
//=> add(4, 1)
//=> add(5, 9)
//=> finish(14)
// NOTE: This duration may vary slightly between runs
//=> sumSquaresMessage: 1200 ms
//=> The sum of the squares is 14
console.log(await sumSquaresMessage([]))
// NOTE: This duration may vary slightly between runs
//=> sumSquaresMessage: 0 ms
//=> There were no numbers!
Reducer
and AsyncReducer
- Sync
- Async
An even more complex reducer is the reducer. It's an object with:
- A
create
function, which produces the initial accumulator value - An
add
function, which produces a new accumulator by combining the current accumulator with a value from the iterable - An optional
finish
function, which transforms the final value
Unlike an optional reducer, the accumulator's type can differ from the type of the values in the iterable.
For example, consider the following code:
import { map, or, pipe, reduce } from 'lfi'
const sumSquares = numbers =>
pipe(
numbers,
map(number => number * number),
// Reduce the iterable to one value by calling `create`, and then calling
// `add` with the current accumulator and the next iterable value to get the
// new accumulator until the iterable is exhausted
reduce(
// This is exactly how `lfi`'s `toSum` function is implemented!
{
create: () => {
console.log(`create()`)
return 0
},
add: (sum, number) => {
console.log(`add(${sum}, ${number})`)
return sum + number
},
},
),
// No `or` needed here! `reduce` doesn't return an optional for a full
// reducer because it defines what to do for zero values using `create`
)
console.log(sumSquares([1, 2, 3]))
//=> create()
//=> add(0, 1)
//=> add(1, 4)
//=> add(5, 9)
//=> 14
console.log(sumSquares([]))
//=> create()
//=> 0
const meanSquares = numbers =>
pipe(
numbers,
map(number => number * number),
// The accumulator's type can differ from the type of the values in the
// iterable. This is useful for maintaining multiple pieces of state while
// reducing and then aggregating the final state in `finish`
reduce(
// This _isn't_ how `lfi`'s `toMean` function is implemented. Read on to
// learn more!
{
create: () => {
console.log(`create()`)
return { sum: 0, count: 0 }
},
add: ({ sum, count }, number) => {
console.log(`add({ sum: ${sum}, count: ${count} }, ${number})`)
return { sum: sum + number, count: count + 1 }
},
finish: ({ sum, count }) => {
console.log(`finish({ sum: ${sum}, count: ${count} })`)
return count === 0 ? NaN : sum / count
},
},
),
)
console.log(meanSquares([1, 2, 3]))
//=> create()
//=> add({ sum: 0, count: 0 }, 1)
//=> add({ sum: 1, count: 1 }, 4)
//=> add({ sum: 5, count: 2 }, 9)
//=> finish({ sum: 14, count: 3 })
//=> 4.666666666666667
console.log(meanSquares([]))
//=> create()
//=> finish({ sum: 0, count: 0 })
//=> NaN
An even more complex async reducer is the async reducer. It's an object with:
- A
create
function, which asynchronously produces the initial accumulator value - An
add
function, which asynchronously produces a new accumulator by combining the current accumulator with a value from the iterable - An optional
combine
function, which asynchronously produces a new accumulator by combining two accumulators - An optional
finish
function, which asynchronously transforms the final value
Unlike an async optional reducer, the accumulator's type can differ from the type of the values in the async iterable.
For example, consider the following code:
import { asAsync, mapAsync, orAsync, pipe, reduceAsync } from 'lfi'
const delay = ms => new Promise(resolve => setTimeout(resolve, ms))
const sumSquares = async numbers => {
console.time(`sumSquares`)
const sum = await pipe(
// This is an async iterable, but async reducers also work with concur iterables
asAsync(numbers),
mapAsync(number => number * number),
// Reduce the async iterable to one value by calling `create`, and then
// calling `add` with the current accumulator and the next iterable value to
// get the new accumulator or calling `combine` with two accumulators to get
// a new accumulator, until the async iterable is exhausted
reduceAsync({
create: async () => {
console.log(`create()`)
await delay(5)
return 0
},
add: async (sum, number) => {
console.log(`add(${sum}, ${number})`)
await delay(100)
return sum + number
},
combine: async (sum1, sum2) => {
console.log(`combine(${sum1}, ${sum2})`)
await delay(10)
return sum1 + sum2
},
}),
// No `or` needed here! `reduceAsync` doesn't return an async optional for a
// full async reducer because it defines what to do for zero values using `create`
)
console.timeEnd(`sumSquares`)
return sum
}
console.log(await sumSquares([1, 2, 3]))
//=> create()
//=> create()
//=> create()
//=> add(0, 1)
//=> add(0, 4)
//=> add(0, 9)
//=> combine(4, 1)
//=> combine(5, 9)
// NOTE: This duration may vary slightly between runs
//=> sumSquares: 125 ms
//=> 14
console.log(await sumSquares([]))
//=> create()
// NOTE: This duration may vary slightly between runs
//=> sumSquares: 5 ms
//=> 0
const meanSquares = async numbers => {
console.time(`meanSquares`)
const mean = await pipe(
// This is an async iterable, but async reducers also work with concur iterables
asAsync(numbers),
mapAsync(number => number * number),
// The accumulator's type can differ from the type of the values in the
// async iterable. This is useful for maintaining multiple pieces of state
// while reducing and then aggregating the final state in `finish`
reduceAsync({
create: async () => {
console.log(`create()`)
await delay(5)
return { sum: 0, count: 0 }
},
add: async ({ sum, count }, number) => {
console.log(`add({ sum: ${sum}, count: ${count} }, ${number})`)
await delay(100)
return { sum: sum + number, count: count + 1 }
},
combine: async (acc1, acc2) => {
console.log(
`combine({`,
`sum: ${acc1.sum}, count: ${acc1.count} }, {`,
`sum: ${acc2.sum}, count: ${acc2.count} })`,
)
await delay(10)
return { sum: acc1.sum + acc2.sum, count: acc1.count + acc2.count }
},
finish: async ({ sum, count }) => {
console.log(`finish({ sum: ${sum}, count: ${count} })`)
await delay(1000)
return count === 0 ? NaN : sum / count
},
}),
)
console.timeEnd(`meanSquares`)
return mean
}
console.log(await meanSquares([1, 2, 3]))
//=> create()
//=> create()
//=> create()
//=> add({ sum: 0, count: 0 }, 1)
//=> add({ sum: 0, count: 0 }, 4)
//=> add({ sum: 0, count: 0 }, 9)
//=> combine({ sum: 4, count: 1 }, { sum: 1, count: 1 })
//=> combine({ sum: 5, count: 2 }, { sum: 9, count: 1 })
//=> finish({ sum: 14, count: 3 })
// NOTE: This duration may vary slightly between runs
//=> sumSquares: 1125 ms
//=> 4.666666666666667
console.log(await meanSquares([]))
//=> create()
//=> finish({ sum: 0, count: 0 })
// NOTE: This duration may vary slightly between runs
//=> sumSquares: 1005 ms
//=> NaN
If the console logs in the playground are surprising to you, then read about the order async reducers values combine values!
KeyedReducer
and AsyncKeyedReducer
- Sync
- Async
The most complex reducer is the keyed reducer. It is essentially a full reducer that:
- Only works on iterables containing key-value pairs, arrays containing two values
- Has a
get
function, which produces the value in the accumulator associated with the given key, or theNO_ENTRY
sentinel value2 if the accumulator does not contain the key
The get
function is currently only called by the higher-order toGrouped
reducer.
It's unlikely you'd need to implement a KeyedReducer
unless you're
implementing a reducer for your own custom keyed collection type.
For example, consider the following code:
import { pipe, reduce, toArray, toGrouped, NO_ENTRY } from 'lfi'
/** A map that always has a sloth! 🦥 */
class SlothMap extends Map {
get(key) {
if (!super.has(key) && key === `sloth`) {
// Throw to avoid admitting there's no sloth.
throw new Error(`Something went wrong...`)
}
return super.get(key)
}
has(key) {
return key === `sloth` || super.has(key)
}
}
const toSlothMap = () => ({
create: () => {
console.log(`create()`)
return new SlothMap()
},
add: (acc, [key, value]) => {
console.log(
`add(${acc}, [${JSON.stringify(key)}, ${JSON.stringify(value)}])`,
)
return acc.set(key, value)
},
get: (acc, key) => {
console.log(
`get(${JSON.stringify(Object.fromEntries(acc))}, ${JSON.stringify(key)})`,
)
return acc.has(key) ? acc.get(key) : NO_ENTRY
},
})
const slothMap1 = pipe(
[
[`dog`, `bark`],
[`cat`, `meow`],
[`bunny`, `purr`],
],
// This reduces the pairs to a `SlothMap`, but doesn't make use of the `get` function
reduce(toSlothMap()),
)
console.log(slothMap1.get(`dog`))
//=> bark
console.log(slothMap1.has(`sloth`))
//=> true
const slothMap2 = pipe(
[
[`dog`, `bark`],
[`cat`, `meow`],
[`dog`, `woof`],
[`bunny`, `purr`],
[`cat`, `purr`],
],
// This reduces the pairs to a `SlothMap` that maps each key to an array of the
// values it was associated with. This _does_ make use of the `get` function
reduce(toGrouped(toArray(), toSlothMap())),
)
console.log(slothMap2.get(`dog`))
//=> [ 'bark', 'woof' ]
console.log(slothMap2.has(`sloth`))
//=> true
The most complex async reducer is the async keyed reducer. It is essentially a full async reducer that:
- Only works on iterables containing key-value pairs, arrays containing two values
- Has a
get
function, which asynchronously produces the value in the accumulator associated with the given key, or theNO_ENTRY
sentinel value2 if the accumulator does not contain the key
The get
function is currently only called by the higher-order toGrouped
reducer.
It's unlikely you'd need to implement an AsyncKeyedReducer
unless you're
implementing an async reducer for your own custom async keyed collection type.
Implementing an async keyed reducer is so niche that we've opted not to provide an example. If you have a use case, then please edit this page!
Composing
Probably the most powerful aspect of reducers is that they can be composed together to create more complex ones using higher-order reducers.
toGrouped
toGrouped
accepts an "inner" reducer and an "outer" keyed reducer to create a
reducer that:
- Accepts an iterable of key-value pairs
- Groups values by key using the outer reducer
- Reduces each group using the inner reducer
Plus, toGrouped
does not unnecessarily buffer each group's values. It streams
each group through the inner reducer!
For example, consider the following code:
- Sync
- Async
- Concur
import {
map,
pipe,
reduce,
toArray,
toGrouped,
toJoin,
toMap,
toMinBy,
} from 'lfi'
console.log(
pipe(
[`sloth`, `lazy`, `sleep`],
map(word => [word.length, word]),
reduce(toGrouped(toArray(), toMap())),
),
)
//=> Map(2) {
//=> 5 => [ 'sloth', 'sleep' ],
//=> 4 => [ 'lazy' ]
//=> }
console.log(
pipe(
[`sloth`, `lazy`, `sleep`],
map(word => [word.length, word]),
reduce(toGrouped(toJoin(` and `), toMap())),
),
)
//=> Map(2) {
//=> 5 => 'sloth and sleep',
//=> 4 => 'lazy'
//=> }
console.log(
pipe(
[`sloth`, `lazy`, `sleep`],
map(word => [word.length, word]),
reduce(
toGrouped(
toMinBy((word1, word2) => word1.localeCompare(word2)),
toMap(),
),
),
),
)
//=> Map(2) {
//=> 5 => 'sleep',
//=> 4 => 'lazy'
//=> }
import {
asAsync,
mapAsync,
pipe,
reduceAsync,
toArray,
toGrouped,
toJoin,
toMap,
toMinBy,
} from 'lfi'
const API_URL = `https://api.dictionaryapi.dev/api/v2/entries/en`
const getPartOfSpeech = async word => {
const response = await fetch(`${API_URL}/${word}`)
const [{ meanings }] = await response.json()
return meanings[0].partOfSpeech
}
console.log(
await pipe(
asAsync([`sloth`, `lazy`, `sleep`]),
mapAsync(async word => [await getPartOfSpeech(word), word]),
reduceAsync(toGrouped(toArray(), toMap())),
),
)
//=> Map(2) {
//=> 'noun' => [ 'sloth', 'lazy' ],
//=> 'verb' => [ 'sleep' ]
//=> }
console.log(
await pipe(
asAsync([`sloth`, `lazy`, `sleep`]),
mapAsync(async word => [await getPartOfSpeech(word), word]),
reduceAsync(toGrouped(toJoin(` and `), toMap())),
),
)
//=> Map(2) {
//=> 'noun' => 'sloth and lazy',
//=> 'verb' => 'sleep'
//=> }
console.log(
await pipe(
asAsync([`sloth`, `lazy`, `sleep`]),
mapAsync(async word => [await getPartOfSpeech(word), word]),
reduceAsync(
toGrouped(
toMinBy((word1, word2) => word1.localeCompare(word2)),
toMap(),
),
),
),
)
//=> Map(2) {
//=> 'noun' => 'lazy',
//=> 'verb' => 'sleep'
//=> }
import {
asConcur,
mapConcur,
pipe,
reduceConcur,
toArray,
toGrouped,
toJoin,
toMap,
toMinBy,
} from 'lfi'
const API_URL = `https://api.dictionaryapi.dev/api/v2/entries/en`
const getPartOfSpeech = async word => {
const response = await fetch(`${API_URL}/${word}`)
const [{ meanings }] = await response.json()
return meanings[0].partOfSpeech
}
console.log(
await pipe(
asConcur([`sloth`, `lazy`, `sleep`]),
mapConcur(async word => [await getPartOfSpeech(word), word]),
reduceConcur(toGrouped(toArray(), toMap())),
),
)
// NOTE: This order may change between runs
//=> Map(2) {
//=> 'noun' => [ 'sloth', 'lazy' ],
//=> 'verb' => [ 'sleep' ]
//=> }
console.log(
await pipe(
asConcur([`sloth`, `lazy`, `sleep`]),
mapConcur(async word => [await getPartOfSpeech(word), word]),
reduceConcur(toGrouped(toJoin(` and `), toMap())),
),
)
// NOTE: This order may change between runs
//=> Map(2) {
//=> 'noun' => 'sloth and lazy',
//=> 'verb' => 'sleep'
//=> }
console.log(
await pipe(
asConcur([`sloth`, `lazy`, `sleep`]),
mapConcur(async word => [await getPartOfSpeech(word), word]),
reduceConcur(
toGrouped(
toMinBy((word1, word2) => word1.localeCompare(word2)),
toMap(),
),
),
),
)
// NOTE: This order may change between runs
//=> Map(2) {
//=> 'noun' => 'lazy',
//=> 'verb' => 'sleep'
//=> }
toMultiple
toMultiple
accepts an array of reducers or an object with reducers for values
to create a reducer that reduces using all of the given reducers in parallel.
- Sync
- Async
- Concur
import { map, pipe, reduce, toCount, toJoin, toMultiple, toSet } from 'lfi'
console.log(
pipe(
[`sloth`, `lazy`, `sleep`],
map(word => word.length),
reduce(toMultiple([toSet(), toCount(), toJoin(`,`)])),
),
)
//=> [
//=> Set(2) { 5, 4 },
//=> 3,
//=> '5,4,5'
//=> ]
console.log(
pipe(
[`sloth`, `lazy`, `sleep`],
map(word => word.length),
reduce(
toMultiple({
set: toSet(),
count: toCount(),
string: toJoin(`,`),
}),
),
),
)
//=> {
//=> set: Set(2) { 5, 4 },
//=> count: 3,
//=> string: '5,4,5'
//=> }
import {
asAsync,
flatMapAsync,
pipe,
reduceAsync,
toCount,
toJoin,
toMultiple,
toSet,
} from 'lfi'
const API_URL = `https://api.dictionaryapi.dev/api/v2/entries/en`
const getPartsOfSpeech = async word => {
const response = await fetch(`${API_URL}/${word}`)
const [{ meanings }] = await response.json()
return meanings.map(meaning => meaning.partOfSpeech)
}
console.log(
await pipe(
asAsync([`sloth`, `lazy`, `sleep`]),
flatMapAsync(getPartsOfSpeech),
reduceAsync(toMultiple([toSet(), toCount(), toJoin(`,`)])),
),
)
//=> [
//=> Set(3) { 'noun', 'verb', 'adjective' },
//=> 6,
//=> 'noun,verb,noun,verb,adjective,verb'
//=> ]
console.log(
await pipe(
asAsync([`sloth`, `lazy`, `sleep`]),
flatMapAsync(getPartsOfSpeech),
reduceAsync(
toMultiple({
set: toSet(),
count: toCount(),
string: toJoin(`,`),
}),
),
),
)
//=> {
//=> set: Set(3) { 'noun', 'verb', 'adjective' },
//=> count: 6,
//=> string: 'noun,verb,noun,verb,adjective,verb'
//=> }
import {
asConcur,
mapConcur,
pipe,
reduceConcur,
toCount,
toJoin,
toMultiple,
toSet,
} from 'lfi'
const API_URL = `https://api.dictionaryapi.dev/api/v2/entries/en`
const getPartsOfSpeech = async word => {
const response = await fetch(`${API_URL}/${word}`)
const [{ meanings }] = await response.json()
return meanings.map(meaning => meaning.partOfSpeech)
}
console.log(
await pipe(
asConcur([`sloth`, `lazy`, `sleep`]),
mapConcur(getPartsOfSpeech),
reduceConcur(toMultiple([toSet(), toCount(), toJoin(`,`)])),
),
)
// NOTE: This order may change between runs
//=> [
//=> Set(3) { 'noun', 'verb', 'adjective' },
//=> 6,
//=> 'noun,verb,noun,verb,adjective,verb'
//=> ]
console.log(
await pipe(
asConcur([`sloth`, `lazy`, `sleep`]),
mapConcur(getPartsOfSpeech),
reduceConcur(
toMultiple({
set: toSet(),
count: toCount(),
string: toJoin(`,`),
}),
),
),
)
// NOTE: This order may change between runs
//=> {
//=> set: Set(3) { 'noun', 'verb', 'adjective' },
//=> count: 6,
//=> string: 'noun,verb,noun,verb,adjective,verb'
//=> }
mapReducer
and mapAsyncReducer
mapReducer
and mapAsyncReducer
are to reducers what map
, mapAsync
, and
mapConcur
are to iterables. mapReducer
and mapAsyncReducer
accept a
callback and a reducer to create a reducer identical to the input one, except
it's final value is transformed using the callback.
For example, consider the following code:
- Sync
- Async
import { mapReducer, pipe, reduce, toCount, toMultiple, toSum } from 'lfi'
const computeMean = numbers =>
pipe(
numbers,
reduce(
// This is exactly how `lfi`'s `toMean` function is implemented!
mapReducer(
([sum, count]) => (count === 0 ? NaN : sum / count),
toMultiple([toSum(), toCount()]),
),
),
)
console.log(computeMean([1, 2, 3, 4, 5]))
//=> 3
console.log(computeMean([]))
//=> NaN
import {
asAsync,
mapAsyncReducer,
pipe,
reduceAsync,
toCount,
toMultiple,
toSum,
} from 'lfi'
const delay = ms => new Promise(resolve => setTimeout(resolve, ms))
const computeMean = numbers =>
pipe(
asAsync(numbers),
reduceAsync(
mapAsyncReducer(
async ([sum, count]) => {
// Async work is allowed in async reducers
await delay(100)
return count === 0 ? NaN : sum / count
},
toMultiple([toSum(), toCount()]),
),
),
)
console.log(await computeMean([1, 2, 3, 4, 5]))
//=> 3
console.log(await computeMean([]))
//=> NaN
In what order are values combined?
When passing a sync reducer, reduce
, reduceAsync
, and reduceConcur
combine
values in iteration order. For example, consider the following code:
- Sync
- Async
// Assume this gets the next value in the iterable
const next = () => {
/* ... */
}
// The first value in the iterable becomes the accumulator
let acc = next()
// Each subsequent value in the iterable combines with the current accumulator
// to become the new accumulator
acc = add(acc, next())
acc = add(acc, next())
// ...
// Then the accumulator goes through a final transformation, if applicable
acc = finish(acc)
// Assume this gets the next value in the async iterable
const next = async () => {
/* ... */
}
// The first value in the async iterable becomes the accumulator
let acc = await next()
// Each subsequent value in the async iterable combines with the current
// accumulator to become the new accumulator
acc = add(acc, await next())
acc = add(acc, await next())
// ...
// Then the accumulator goes through a final transformation, if applicable
acc = finish(acc)
However, when passing an async reducer, reduceAsync
does not necessarily
combine in this same order, even though an async iterable has a clear iteration
order. Why is that?
Consider the following code:
// Assume this gets the next value in the async iterable
const next = async () => {
/* ... */
}
let acc = await next()
// What if awaiting this `add` call takes a long time? 🤔
acc = await add(acc, await next())
acc = await add(
acc,
// We're unnecessarily waiting to kick off this async work!
await next(),
)
// ...
acc = await finish(acc)
Each await next()
or await add(...)
may take a long time, and in the
meantime no other async work is being kicked off. It's inefficient to wait on
the first add(...)
promise to resolve before kicking off another next()
call
because that's not maximizing the async work happening concurrently.
Instead of alternating between awaiting next()
and add(...)
sequentially,
lfi
maximizes the async work happening concurrently by:
- Iterating through the async iterable as quickly as possible
- Kicking off an
add(...)
call that doesn't block the async iterable iteration whenever there's an accumulator and async iterable value available to combine - Kicking off a
combine(...)
call that doesn't block the async iterable iteration whenever there are two accumulators available to combine - Kicking off an
add(await create(), ...)
call that doesn't block the async iterable iteration whenever an async iterable value is available, but there is no accumulator to combine it with. This makes progress because an accumulator can be combined with an async iterable value or another accumulator, but two async iterable values cannot be combined
This continues until one accumulator remains, at which point it is transformed
with finish
and returned.
reduceConcur
works the same way, but the observed behavior is less surprising
because the iteration order of a concur iterable is already not deterministic.
How is it different from Array.prototype.reduce
's callback function?
A sync function reducer is almost identical to Array.prototype.reduce
's
callback function, but reducers in general are more powerful because they can:
- Define additional functions that enable encapsulation, composition, and improved performance in the async case
- Be async while
Array.prototype.reduce
's callback function cannot be without making the accumulator a promise
How is defining finish
different from transforming after reduce
, reduceAsync
, or reduceConcur
?
In some cases it's not! However, the finish
function is still useful for:
- Encapsulating reducer logic, which is especially useful when composing reducers
- Producing a final value whose type differs from the input types, in the case
of an
OptionalReducer
orAsyncOptionalReducer