@@ -36,6 +36,7 @@ function from(Readable, iterable, opts) {
36
36
throw new ERR_INVALID_ARG_TYPE ( 'iterable' , [ 'Iterable' ] , iterable ) ;
37
37
}
38
38
39
+
39
40
const readable = new Readable ( {
40
41
objectMode : true ,
41
42
highWaterMark : 1 ,
@@ -46,11 +47,19 @@ function from(Readable, iterable, opts) {
46
47
// Flag to protect against _read
47
48
// being called before last iteration completion.
48
49
let reading = false ;
50
+ let isAsyncValues = false ;
49
51
50
52
readable . _read = function ( ) {
51
53
if ( ! reading ) {
52
54
reading = true ;
53
- next ( ) ;
55
+
56
+ if ( isAsync ) {
57
+ nextAsync ( ) ;
58
+ } else if ( isAsyncValues ) {
59
+ nextSyncWithAsyncValues ( ) ;
60
+ } else {
61
+ nextSyncWithSyncValues ( ) ;
62
+ }
54
63
}
55
64
} ;
56
65
@@ -78,29 +87,115 @@ function from(Readable, iterable, opts) {
78
87
}
79
88
}
80
89
81
- async function next ( ) {
90
+ // There are a lot of duplication here, it's done on purpose for performance
91
+ // reasons - avoid await when not needed.
92
+
93
+ function nextSyncWithSyncValues ( ) {
94
+ for ( ; ; ) {
95
+ try {
96
+ const { value, done } = iterator . next ( ) ;
97
+
98
+ if ( done ) {
99
+ readable . push ( null ) ;
100
+ return ;
101
+ }
102
+
103
+ if ( value &&
104
+ typeof value . then === 'function' ) {
105
+ return changeToAsyncValues ( value ) ;
106
+ }
107
+
108
+ if ( value === null ) {
109
+ reading = false ;
110
+ throw new ERR_STREAM_NULL_VALUES ( ) ;
111
+ }
112
+
113
+ if ( readable . push ( value ) ) {
114
+ continue ;
115
+ }
116
+
117
+ reading = false ;
118
+ } catch ( err ) {
119
+ readable . destroy ( err ) ;
120
+ }
121
+ break ;
122
+ }
123
+ }
124
+
125
+ async function changeToAsyncValues ( value ) {
126
+ isAsyncValues = true ;
127
+
128
+ try {
129
+ const res = await value ;
130
+
131
+ if ( res === null ) {
132
+ reading = false ;
133
+ throw new ERR_STREAM_NULL_VALUES ( ) ;
134
+ }
135
+
136
+ if ( readable . push ( res ) ) {
137
+ nextSyncWithAsyncValues ( ) ;
138
+ return ;
139
+ }
140
+
141
+ reading = false ;
142
+ } catch ( err ) {
143
+ readable . destroy ( err ) ;
144
+ }
145
+ }
146
+
147
+ async function nextSyncWithAsyncValues ( ) {
82
148
for ( ; ; ) {
83
149
try {
84
- const { value, done } = isAsync ?
85
- await iterator . next ( ) :
86
- iterator . next ( ) ;
150
+ const { value, done } = iterator . next ( ) ;
87
151
88
152
if ( done ) {
89
153
readable . push ( null ) ;
90
- } else {
91
- const res = ( value &&
92
- typeof value . then === 'function' ) ?
93
- await value :
94
- value ;
95
- if ( res === null ) {
96
- reading = false ;
97
- throw new ERR_STREAM_NULL_VALUES ( ) ;
98
- } else if ( readable . push ( res ) ) {
99
- continue ;
100
- } else {
101
- reading = false ;
102
- }
154
+ return ;
155
+ }
156
+
157
+ const res = ( value &&
158
+ typeof value . then === 'function' ) ?
159
+ await value :
160
+ value ;
161
+
162
+ if ( res === null ) {
163
+ reading = false ;
164
+ throw new ERR_STREAM_NULL_VALUES ( ) ;
103
165
}
166
+
167
+ if ( readable . push ( res ) ) {
168
+ continue ;
169
+ }
170
+
171
+ reading = false ;
172
+ } catch ( err ) {
173
+ readable . destroy ( err ) ;
174
+ }
175
+ break ;
176
+ }
177
+ }
178
+
179
+ async function nextAsync ( ) {
180
+ for ( ; ; ) {
181
+ try {
182
+ const { value, done } = await iterator . next ( ) ;
183
+
184
+ if ( done ) {
185
+ readable . push ( null ) ;
186
+ return ;
187
+ }
188
+
189
+ if ( value === null ) {
190
+ reading = false ;
191
+ throw new ERR_STREAM_NULL_VALUES ( ) ;
192
+ }
193
+
194
+ if ( readable . push ( value ) ) {
195
+ continue ;
196
+ }
197
+
198
+ reading = false ;
104
199
} catch ( err ) {
105
200
readable . destroy ( err ) ;
106
201
}
0 commit comments