Skip to content

Commit b22e95d

Browse files
TimothyGuprog1dev
authored andcommitted
readline: add support for async iteration
Co-authored-by: Ivan Filenko <ivan.filenko@protonmail.com> Fixes: #18603 Refs: #18904 PR-URL: #23916 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Gus Caplan <me@gus.host>
1 parent 33b5242 commit b22e95d

6 files changed

+315
-3
lines changed

doc/api/readline.md

+68-2
Original file line numberDiff line numberDiff line change
@@ -309,6 +309,43 @@ rl.write(null, { ctrl: true, name: 'u' });
309309
The `rl.write()` method will write the data to the `readline` `Interface`'s
310310
`input` *as if it were provided by the user*.
311311

312+
### rl\[Symbol.asyncIterator\]()
313+
<!-- YAML
314+
added: REPLACEME
315+
-->
316+
317+
> Stability: 1 - Experimental
318+
319+
* Returns: {AsyncIterator}
320+
321+
Create an `AsyncIterator` object that iterates through each line in the input
322+
stream as a string. This method allows asynchronous iteration of
323+
`readline.Interface` objects through `for`-`await`-`of` loops.
324+
325+
Errors in the input stream are not forwarded.
326+
327+
If the loop is terminated with `break`, `throw`, or `return`,
328+
[`rl.close()`][] will be called. In other words, iterating over a
329+
`readline.Interface` will always consume the input stream fully.
330+
331+
A caveat with using this experimental API is that the performance is
332+
currently not on par with the traditional `'line'` event API, and thus it is
333+
not recommended for performance-sensitive applications. We expect this
334+
situation to improve in the future.
335+
336+
```js
337+
async function processLineByLine() {
338+
const rl = readline.createInterface({
339+
// ...
340+
});
341+
342+
for await (const line of rl) {
343+
// Each line in the readline input will be successively available here as
344+
// `line`.
345+
}
346+
}
347+
```
348+
312349
## readline.clearLine(stream, dir)
313350
<!-- YAML
314351
added: v0.7.7
@@ -517,12 +554,38 @@ rl.on('line', (line) => {
517554

518555
## Example: Read File Stream Line-by-Line
519556

520-
A common use case for `readline` is to consume input from a filesystem
521-
[Readable][] stream one line at a time:
557+
A common use case for `readline` is to consume an input file one line at a
558+
time. The easiest way to do so is leveraging the [`fs.ReadStream`][] API as
559+
well as a `for`-`await`-`of` loop:
522560

523561
```js
562+
const fs = require('fs');
524563
const readline = require('readline');
564+
565+
async function processLineByLine() {
566+
const fileStream = fs.createReadStream('input.txt');
567+
568+
const rl = readline.createInterface({
569+
input: fileStream,
570+
crlfDelay: Infinity
571+
});
572+
// Note: we use the crlfDelay option to recognize all instances of CR LF
573+
// ('\r\n') in input.txt as a single line break.
574+
575+
for await (const line of rl) {
576+
// Each line in input.txt will be successively available here as `line`.
577+
console.log(`Line from file: ${line}`);
578+
}
579+
}
580+
581+
processLineByLine();
582+
```
583+
584+
Alternatively, one could use the [`'line'`][] event:
585+
586+
```js
525587
const fs = require('fs');
588+
const readline = require('readline');
526589

527590
const rl = readline.createInterface({
528591
input: fs.createReadStream('sample.txt'),
@@ -536,8 +599,11 @@ rl.on('line', (line) => {
536599

537600
[`'SIGCONT'`]: readline.html#readline_event_sigcont
538601
[`'SIGTSTP'`]: readline.html#readline_event_sigtstp
602+
[`'line'`]: #readline_event_line
603+
[`fs.ReadStream`]: fs.html#fs_class_fs_readstream
539604
[`process.stdin`]: process.html#process_process_stdin
540605
[`process.stdout`]: process.html#process_process_stdout
606+
[`rl.close()`]: #readline_rl_close
541607
[Readable]: stream.html#stream_readable_streams
542608
[TTY]: tty.html
543609
[Writable]: stream.html#stream_writable_streams

lib/readline.js

+43
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ const {
3333
ERR_INVALID_OPT_VALUE
3434
} = require('internal/errors').codes;
3535
const { debug, inherits } = require('util');
36+
const { emitExperimentalWarning } = require('internal/util');
3637
const { Buffer } = require('buffer');
3738
const EventEmitter = require('events');
3839
const {
@@ -54,11 +55,16 @@ const {
5455
// Lazy load StringDecoder for startup performance.
5556
let StringDecoder;
5657

58+
// Lazy load Readable for startup performance.
59+
let Readable;
60+
5761
const kHistorySize = 30;
5862
const kMincrlfDelay = 100;
5963
// \r\n, \n, or \r followed by something other than \n
6064
const lineEnding = /\r?\n|\r(?!\n)/;
6165

66+
const kLineObjectStream = Symbol('line object stream');
67+
6268
const KEYPRESS_DECODER = Symbol('keypress-decoder');
6369
const ESCAPE_DECODER = Symbol('escape-decoder');
6470

@@ -190,6 +196,8 @@ function Interface(input, output, completer, terminal) {
190196
self._refreshLine();
191197
}
192198

199+
this[kLineObjectStream] = undefined;
200+
193201
if (!this.terminal) {
194202
function onSelfCloseWithoutTerminal() {
195203
input.removeListener('data', ondata);
@@ -1019,6 +1027,41 @@ Interface.prototype._ttyWrite = function(s, key) {
10191027
}
10201028
};
10211029

1030+
Interface.prototype[Symbol.asyncIterator] = function() {
1031+
emitExperimentalWarning('readline Interface [Symbol.asyncIterator]');
1032+
1033+
if (this[kLineObjectStream] === undefined) {
1034+
if (Readable === undefined) {
1035+
Readable = require('stream').Readable;
1036+
}
1037+
const readable = new Readable({
1038+
objectMode: true,
1039+
read: () => {
1040+
this.resume();
1041+
},
1042+
destroy: (err, cb) => {
1043+
this.off('line', lineListener);
1044+
this.off('close', closeListener);
1045+
this.close();
1046+
cb(err);
1047+
}
1048+
});
1049+
const lineListener = (input) => {
1050+
if (!readable.push(input)) {
1051+
this.pause();
1052+
}
1053+
};
1054+
const closeListener = () => {
1055+
readable.push(null);
1056+
};
1057+
this.on('line', lineListener);
1058+
this.on('close', closeListener);
1059+
this[kLineObjectStream] = readable;
1060+
}
1061+
1062+
return this[kLineObjectStream][Symbol.asyncIterator]();
1063+
};
1064+
10221065
/**
10231066
* accepts a readable Stream instance and makes it emit "keypress" events
10241067
*/
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const assert = require('assert');
5+
const { Readable } = require('stream');
6+
const readline = require('readline');
7+
8+
const CONTENT = 'content';
9+
const TOTAL_LINES = 18;
10+
11+
(async () => {
12+
const readable = new Readable({ read() {} });
13+
readable.push(`${CONTENT}\n`.repeat(TOTAL_LINES));
14+
15+
const rli = readline.createInterface({
16+
input: readable,
17+
crlfDelay: Infinity
18+
});
19+
20+
const it = rli[Symbol.asyncIterator]();
21+
const highWaterMark = it.stream.readableHighWaterMark;
22+
23+
// For this test to work, we have to queue up more than the number of
24+
// highWaterMark items in rli. Make sure that is the case.
25+
assert(TOTAL_LINES > highWaterMark);
26+
27+
let iterations = 0;
28+
let readableEnded = false;
29+
for await (const line of it) {
30+
assert.strictEqual(readableEnded, false);
31+
32+
assert.strictEqual(line, CONTENT);
33+
34+
const expectedPaused = TOTAL_LINES - iterations > highWaterMark;
35+
assert.strictEqual(readable.isPaused(), expectedPaused);
36+
37+
iterations += 1;
38+
39+
// We have to end the input stream asynchronously for back pressure to work.
40+
// Only end when we have reached the final line.
41+
if (iterations === TOTAL_LINES) {
42+
readable.push(null);
43+
readableEnded = true;
44+
}
45+
}
46+
47+
assert.strictEqual(iterations, TOTAL_LINES);
48+
})().then(common.mustCall());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const fs = require('fs');
5+
const { join } = require('path');
6+
const readline = require('readline');
7+
const assert = require('assert');
8+
9+
const tmpdir = require('../common/tmpdir');
10+
tmpdir.refresh();
11+
12+
const filename = join(tmpdir.path, 'test.txt');
13+
14+
const testContents = [
15+
'',
16+
'\n',
17+
'line 1',
18+
'line 1\nline 2 南越国是前203年至前111年存在于岭南地区的一个国家\nline 3\ntrailing',
19+
'line 1\nline 2\nline 3 ends with newline\n'
20+
];
21+
22+
async function testSimpleDestroy() {
23+
for (const fileContent of testContents) {
24+
fs.writeFileSync(filename, fileContent);
25+
26+
const readable = fs.createReadStream(filename);
27+
const rli = readline.createInterface({
28+
input: readable,
29+
crlfDelay: Infinity
30+
});
31+
32+
const iteratedLines = [];
33+
for await (const k of rli) {
34+
iteratedLines.push(k);
35+
break;
36+
}
37+
38+
const expectedLines = fileContent.split('\n');
39+
if (expectedLines[expectedLines.length - 1] === '') {
40+
expectedLines.pop();
41+
}
42+
expectedLines.splice(1);
43+
44+
assert.deepStrictEqual(iteratedLines, expectedLines);
45+
}
46+
}
47+
48+
async function testMutualDestroy() {
49+
for (const fileContent of testContents) {
50+
fs.writeFileSync(filename, fileContent);
51+
52+
const readable = fs.createReadStream(filename);
53+
const rli = readline.createInterface({
54+
input: readable,
55+
crlfDelay: Infinity
56+
});
57+
58+
const expectedLines = fileContent.split('\n');
59+
if (expectedLines[expectedLines.length - 1] === '') {
60+
expectedLines.pop();
61+
}
62+
expectedLines.splice(2);
63+
64+
const iteratedLines = [];
65+
for await (const k of rli) {
66+
iteratedLines.push(k);
67+
for await (const l of rli) {
68+
iteratedLines.push(l);
69+
break;
70+
}
71+
assert.deepStrictEqual(iteratedLines, expectedLines);
72+
}
73+
74+
assert.deepStrictEqual(iteratedLines, expectedLines);
75+
}
76+
}
77+
78+
testSimpleDestroy().then(testMutualDestroy).then(common.mustCall());
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
'use strict';
2+
3+
const common = require('../common');
4+
const fs = require('fs');
5+
const { join } = require('path');
6+
const readline = require('readline');
7+
const assert = require('assert');
8+
9+
const tmpdir = require('../common/tmpdir');
10+
tmpdir.refresh();
11+
12+
const filename = join(tmpdir.path, 'test.txt');
13+
14+
const testContents = [
15+
'',
16+
'\n',
17+
'line 1',
18+
'line 1\nline 2 南越国是前203年至前111年存在于岭南地区的一个国家\nline 3\ntrailing',
19+
'line 1\nline 2\nline 3 ends with newline\n'
20+
];
21+
22+
async function testSimple() {
23+
for (const fileContent of testContents) {
24+
fs.writeFileSync(filename, fileContent);
25+
26+
const readable = fs.createReadStream(filename);
27+
const rli = readline.createInterface({
28+
input: readable,
29+
crlfDelay: Infinity
30+
});
31+
32+
const iteratedLines = [];
33+
for await (const k of rli) {
34+
iteratedLines.push(k);
35+
}
36+
37+
const expectedLines = fileContent.split('\n');
38+
if (expectedLines[expectedLines.length - 1] === '') {
39+
expectedLines.pop();
40+
}
41+
assert.deepStrictEqual(iteratedLines, expectedLines);
42+
assert.strictEqual(iteratedLines.join(''), fileContent.replace(/\n/gm, ''));
43+
}
44+
}
45+
46+
async function testMutual() {
47+
for (const fileContent of testContents) {
48+
fs.writeFileSync(filename, fileContent);
49+
50+
const readable = fs.createReadStream(filename);
51+
const rli = readline.createInterface({
52+
input: readable,
53+
crlfDelay: Infinity
54+
});
55+
56+
const expectedLines = fileContent.split('\n');
57+
if (expectedLines[expectedLines.length - 1] === '') {
58+
expectedLines.pop();
59+
}
60+
const iteratedLines = [];
61+
let iterated = false;
62+
for await (const k of rli) {
63+
// This outer loop should only iterate once.
64+
assert.strictEqual(iterated, false);
65+
iterated = true;
66+
67+
iteratedLines.push(k);
68+
for await (const l of rli) {
69+
iteratedLines.push(l);
70+
}
71+
assert.deepStrictEqual(iteratedLines, expectedLines);
72+
}
73+
assert.deepStrictEqual(iteratedLines, expectedLines);
74+
}
75+
}
76+
77+
testSimple().then(testMutual).then(common.mustCall());

tools/doc/type-parser.js

+1-1
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ const customTypesMap = {
2626

2727
'this': `${jsDocPrefix}Reference/Operators/this`,
2828

29-
'AsyncIterator': 'https://github.com/tc39/proposal-async-iteration',
29+
'AsyncIterator': 'https://tc39.github.io/ecma262/#sec-asynciterator-interface',
3030

3131
'bigint': 'https://github.com/tc39/proposal-bigint',
3232

0 commit comments

Comments
 (0)