Skip to content

Commit 5c59528

Browse files
authored
Add pMapIterable export (#63)
1 parent 136b08a commit 5c59528

8 files changed

+324
-11
lines changed

assert-in-range.js

+10
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,10 @@
1+
import chalk from 'chalk';
2+
import inRange from 'in-range';
3+
4+
export default function assertInRange(t, value, {start = 0, end}) {
5+
if (inRange(value, {start, end})) {
6+
t.pass();
7+
} else {
8+
t.fail(`${start} ${start <= value ? '≤' : chalk.red('≰')} ${chalk.yellow(value)} ${value <= end ? '≤' : chalk.red('≰')} ${end}`);
9+
}
10+
}

index.d.ts

+35-1
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
export type Options = {
1+
type BaseOptions = {
22
/**
33
Number of concurrently pending promises returned by `mapper`.
44
@@ -7,7 +7,9 @@ export type Options = {
77
@default Infinity
88
*/
99
readonly concurrency?: number;
10+
};
1011

12+
export type Options = BaseOptions & {
1113
/**
1214
When `true`, the first mapper rejection will be rejected back to the consumer.
1315
@@ -42,6 +44,17 @@ export type Options = {
4244
readonly signal?: AbortSignal;
4345
};
4446

47+
export type IterableOptions = BaseOptions & {
48+
/**
49+
Maximum number of promises returned by `mapper` that have resolved but not yet collected by the consumer of the async iterable. Calls to `mapper` will be limited so that there is never too much backpressure.
50+
51+
Useful whenever you are consuming the iterable slower than what the mapper function can produce concurrently. For example, to avoid making an overwhelming number of HTTP requests if you are saving each of the results to a database.
52+
53+
Default: `options.concurrency`
54+
*/
55+
readonly backpressure?: number;
56+
};
57+
4558
type MaybePromise<T> = T | Promise<T>;
4659

4760
/**
@@ -88,6 +101,27 @@ export default function pMap<Element, NewElement>(
88101
options?: Options
89102
): Promise<Array<Exclude<NewElement, typeof pMapSkip>>>;
90103

104+
/**
105+
@param input - Synchronous or asynchronous iterable that is iterated over concurrently, calling the `mapper` function for each element. Each iterated item is `await`'d before the `mapper` is invoked so the iterable may return a `Promise` that resolves to an item. Asynchronous iterables (different from synchronous iterables that return `Promise` that resolves to an item) can be used when the next item may not be ready without waiting for an asynchronous process to complete and/or the end of the iterable may be reached after the asynchronous process completes. For example, reading from a remote queue when the queue has reached empty, or reading lines from a stream.
106+
@param mapper - Function which is called for every item in `input`. Expected to return a `Promise` or value.
107+
@returns An async iterable that streams each return value from `mapper` in order.
108+
109+
@example
110+
```
111+
import {pMapIterable} from 'p-map';
112+
113+
// Multiple posts are fetched concurrently, with limited concurrency and backpressure
114+
for await (const post of pMapIterable(postIds, getPostMetadata, {concurrency: 8})) {
115+
console.log(post);
116+
};
117+
```
118+
*/
119+
export function pMapIterable<Element, NewElement>(
120+
input: AsyncIterable<Element | Promise<Element>> | Iterable<Element | Promise<Element>>,
121+
mapper: Mapper<Element, NewElement>,
122+
options?: IterableOptions
123+
): AsyncIterable<Exclude<NewElement, typeof pMapSkip>>;
124+
91125
/**
92126
Return this value from a `mapper` function to skip including the value in the returned array.
93127

index.js

+104-1
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ export default async function pMap(
1616
throw new TypeError('Mapper function is required');
1717
}
1818

19-
if (!((Number.isSafeInteger(concurrency) || concurrency === Number.POSITIVE_INFINITY) && concurrency >= 1)) {
19+
if (!((Number.isSafeInteger(concurrency) && concurrency >= 1) || concurrency === Number.POSITIVE_INFINITY)) {
2020
throw new TypeError(`Expected \`concurrency\` to be an integer from 1 and up or \`Infinity\`, got \`${concurrency}\` (${typeof concurrency})`);
2121
}
2222

@@ -162,4 +162,107 @@ export default async function pMap(
162162
});
163163
}
164164

165+
export function pMapIterable(
166+
iterable,
167+
mapper,
168+
{
169+
concurrency = Number.POSITIVE_INFINITY,
170+
backpressure = concurrency,
171+
} = {},
172+
) {
173+
if (iterable[Symbol.iterator] === undefined && iterable[Symbol.asyncIterator] === undefined) {
174+
throw new TypeError(`Expected \`input\` to be either an \`Iterable\` or \`AsyncIterable\`, got (${typeof iterable})`);
175+
}
176+
177+
if (typeof mapper !== 'function') {
178+
throw new TypeError('Mapper function is required');
179+
}
180+
181+
if (!((Number.isSafeInteger(concurrency) && concurrency >= 1) || concurrency === Number.POSITIVE_INFINITY)) {
182+
throw new TypeError(`Expected \`concurrency\` to be an integer from 1 and up or \`Infinity\`, got \`${concurrency}\` (${typeof concurrency})`);
183+
}
184+
185+
if (!((Number.isSafeInteger(backpressure) && backpressure >= concurrency) || backpressure === Number.POSITIVE_INFINITY)) {
186+
throw new TypeError(`Expected \`backpressure\` to be an integer from \`concurrency\` (${concurrency}) and up or \`Infinity\`, got \`${backpressure}\` (${typeof backpressure})`);
187+
}
188+
189+
return {
190+
async * [Symbol.asyncIterator]() {
191+
const iterator = iterable[Symbol.asyncIterator] === undefined ? iterable[Symbol.iterator]() : iterable[Symbol.asyncIterator]();
192+
193+
const promises = [];
194+
let runningMappersCount = 0;
195+
let isDone = false;
196+
197+
function trySpawn() {
198+
if (isDone || !(runningMappersCount < concurrency && promises.length < backpressure)) {
199+
return;
200+
}
201+
202+
const promise = (async () => {
203+
const {done, value} = await iterator.next();
204+
205+
if (done) {
206+
return {done: true};
207+
}
208+
209+
runningMappersCount++;
210+
211+
// Spawn if still below concurrency and backpressure limit
212+
trySpawn();
213+
214+
try {
215+
const returnValue = await mapper(value);
216+
217+
runningMappersCount--;
218+
219+
if (returnValue === pMapSkip) {
220+
const index = promises.indexOf(promise);
221+
222+
if (index > 0) {
223+
promises.splice(index, 1);
224+
}
225+
}
226+
227+
// Spawn if still below backpressure limit and just dropped below concurrency limit
228+
trySpawn();
229+
230+
return {done: false, value: returnValue};
231+
} catch (error) {
232+
isDone = true;
233+
return {error};
234+
}
235+
})();
236+
237+
promises.push(promise);
238+
}
239+
240+
trySpawn();
241+
242+
while (promises.length > 0) {
243+
const {error, done, value} = await promises[0]; // eslint-disable-line no-await-in-loop
244+
245+
promises.shift();
246+
247+
if (error) {
248+
throw error;
249+
}
250+
251+
if (done) {
252+
return;
253+
}
254+
255+
// Spawn if just dropped below backpressure limit and below the concurrency limit
256+
trySpawn();
257+
258+
if (value === pMapSkip) {
259+
continue;
260+
}
261+
262+
yield value;
263+
}
264+
},
265+
};
266+
}
267+
165268
export const pMapSkip = Symbol('skip');

index.test-d.ts

+3-1
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
import {expectType, expectAssignable} from 'tsd';
2-
import pMap, {type Options, type Mapper, pMapSkip} from './index.js';
2+
import pMap, {pMapIterable, type Options, type Mapper, pMapSkip} from './index.js';
33

44
const sites = [
55
'https://sindresorhus.com',
@@ -48,3 +48,5 @@ expectType<Promise<number[]>>(pMap(numbers, (number: number) => {
4848

4949
return pMapSkip;
5050
}));
51+
52+
expectType<AsyncIterable<string>>(pMapIterable(sites, asyncMapper));

package.json

+1
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
],
4343
"devDependencies": {
4444
"ava": "^5.2.0",
45+
"chalk": "^5.3.0",
4546
"delay": "^5.0.0",
4647
"in-range": "^3.0.0",
4748
"random-int": "^3.0.0",

readme.md

+29
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,19 @@ console.log(result);
4141

4242
Returns a `Promise` that is fulfilled when all promises in `input` and ones returned from `mapper` are fulfilled, or rejects if any of the promises reject. The fulfilled value is an `Array` of the fulfilled values returned from `mapper` in `input` order.
4343

44+
### pMapIterable(input, mapper, options?)
45+
46+
Returns an async iterable that streams each return value from `mapper` in order.
47+
48+
```js
49+
import {pMapIterable} from 'p-map';
50+
51+
// Multiple posts are fetched concurrently, with limited concurrency and backpressure
52+
for await (const post of pMapIterable(postIds, getPostMetadata, {concurrency: 8})) {
53+
console.log(post);
54+
};
55+
```
56+
4457
#### input
4558

4659
Type: `AsyncIterable<Promise<unknown> | unknown> | Iterable<Promise<unknown> | unknown>`
@@ -67,8 +80,22 @@ Minimum: `1`
6780

6881
Number of concurrently pending promises returned by `mapper`.
6982

83+
##### backpressure
84+
85+
**Only for `pMapInterable`**
86+
87+
Type: `number` *(Integer)*\
88+
Default: `options.concurrency`\
89+
Minimum: `options.concurrency`
90+
91+
Maximum number of promises returned by `mapper` that have resolved but not yet collected by the consumer of the async iterable. Calls to `mapper` will be limited so that there is never too much backpressure.
92+
93+
Useful whenever you are consuming the iterable slower than what the mapper function can produce concurrently. For example, to avoid making an overwhelming number of HTTP requests if you are saving each of the results to a database.
94+
7095
##### stopOnError
7196

97+
**Only for `pMap`**
98+
7299
Type: `boolean`\
73100
Default: `true`
74101

@@ -80,6 +107,8 @@ Caveat: When `true`, any already-started async mappers will continue to run unti
80107

81108
##### signal
82109

110+
**Only for `pMap`**
111+
83112
Type: [`AbortSignal`](https://developer.mozilla.org/en-US/docs/Web/API/AbortSignal)
84113

85114
You can abort the promises using [`AbortController`](https://developer.mozilla.org/en-US/docs/Web/API/AbortController).

test-multiple-pmapskips-performance.js

+2-2
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import test from 'ava';
2-
import inRange from 'in-range';
32
import timeSpan from 'time-span';
3+
import assertInRange from './assert-in-range.js';
44
import pMap, {pMapSkip} from './index.js';
55

66
function generateSkipPerformanceData(length) {
@@ -32,6 +32,6 @@ test('multiple pMapSkips - algorithmic complexity', async t => {
3232
// shorter test. This is not perfect... there is some fluctuation.
3333
// The idea here is to catch a regression that makes `pMapSkip` handling O(n^2)
3434
// on the number of `pMapSkip` items in the input.
35-
t.true(inRange(longerDuration, {start: 1.2 * smallerDuration, end: 15 * smallerDuration}));
35+
assertInRange(t, longerDuration, {start: 1.2 * smallerDuration, end: 15 * smallerDuration});
3636
}
3737
});

0 commit comments

Comments
 (0)