Skip to content

Commit e327747

Browse files
trxcllntwesm
authored andcommitted
ARROW-1990: [JS] C++ Refactor, Add DataFrame
This PR moves the `Table` class out of the Vector hierarchy and adds optimized dataframe operations to it. Currently implements an optimized `scan()` method, `filter(predicate)`, `count()`, and `countBy(column_name)` (only works on dictionary-encoded columns). Some usage examples, based on the file generated by `js/test/data/tables/generate.py`: ``` js > let table = Table.from(...); > table.count() 1000000 > table.filter(col('lat').gteq(0)).count() 499718 > table.countBy('origin').toJSON() { Charlottesville: 166839, 'New York': 166251, 'San Francisco': 166642, Seattle: 166659, 'Terre Haute': 166756, 'Washington, DC': 166853 } > table.filter(col('lng').gteq(0)).countBy('origin').toJSON() { Charlottesville: 83109, 'New York': 83221, 'San Francisco': 83515, Seattle: 83362, 'Terre Haute': 83314, 'Washington, DC': 83479 } ``` There are performance tests for the dataframe operations, to run them you must first generate the test data by running `npm run create:perfdata`. The PR also includes @trxcllnt's refactor of the JS implementation to make it more closely resemble the C++ implementation. This refactor resolves multiple JIRAs: ARROW-1903, ARROW-1898, ARROW-1502, ARROW-1952 (partially), and ARROW-1985 Author: Paul Taylor <paul.e.taylor@me.com> Author: Brian Hulette <brian.hulette@ccri.com> Author: Brian Hulette <hulettbh@gmail.com> Closes apache#1482 from TheNeuralBit/table-scan-perf and squashes the following commits: 52f1e0e [Brian Hulette] <, > are not commutative, misc cleanup 04b1838 [Brian Hulette] even more table tests 16b9ccb [Brian Hulette] Merge pull request #4 from trxcllnt/js-cpp-refactor fe300df [Paul Taylor] fix closure es5/umd toString() iterator 3d5240a [Paul Taylor] fix more externs 10c48ad [Paul Taylor] Merge branch 'table-scan-perf' of github.com:ccri/arrow into js-cpp-refactor dbe7f81 [Brian Hulette] Add more Table unit tests 1910962 [Brian Hulette] Add optional bind callback to scan 5bdf17f [Brian Hulette] Fix perf 8cf2473 [Brian Hulette] Merge remote-tracking branch 'origin/master' into table-scan-perf 4a41b18 [Paul Taylor] add src/predicate to the list of exports we should save from uglify 5a91fab [Paul Taylor] add more view, predicate externs f6adfb3 [Brian Hulette] Create predicate namespace f7bb0ed [Paul Taylor] Merge branch 'table-scan-perf' of github.com:ccri/arrow into js-cpp-refactor e148ee4 [Paul Taylor] Merge branch 'extern-woes' into js-cpp-refactor 25cdc4a [Paul Taylor] add src/predicate to the list of exports we should save from uglify dc7c728 [Paul Taylor] add more view, predicate externs 25e6af7 [Brian Hulette] Create predicate namespace 579ab1f [Brian Hulette] Merge pull request #2 from trxcllnt/js-cpp-refactor f3cde1a [Paul Taylor] fix lint 9769773 [Paul Taylor] fix vector perf tests 016ba78 [Brian Hulette] Merge pull request #1 from trxcllnt/js-cpp-refactor 272d293 [Paul Taylor] Merge pull request #4 from ccri/empty-table 7bc7363 [Brian Hulette] Fix exception for empty Table 8ddce0a [Paul Taylor] check bounds in getChildAt(i) to avoid NPEs f1dead0 [Paul Taylor] compute chunked nested childData list correctly 18807c6 [Paul Taylor] rename ChunkData's fields so it's more clear they're not semantically similar to other similarly named fields 7e43b78 [Paul Taylor] add test:integration npm script a5f200f [Paul Taylor] Merge pull request #3 from ccri/table-from-struct c8cd286 [Brian Hulette] Add Table.fromStruct a00415e [Brian Hulette] Fix perf 54d4f5b [Paul Taylor] lazily allocate table and recordbatch columns, support NestedView's getChildAt(i) method in ChunkedView 40b3638 [Paul Taylor] run integration tests with local data for coverage stats fe31ee0 [Paul Taylor] slice the flat data values before returning an iterator of them e537789 [Paul Taylor] make it easier to run all integration tests from local data c0fd2f9 [Paul Taylor] use the dictionary of the last chunked vector list for chunked dictionary vectors e33c068 [Paul Taylor] Merge pull request #2 from ccri/fixed-size-list 5bb63af [Brian Hulette] Don't read OFFSET vector for FixedSizeList 614b688 [Paul Taylor] add asEpochMs to date and timestamp vectors 87334a5 [Paul Taylor] Merge branch 'table-scan-perf' of github.com:ccri/arrow into js-cpp-refactor b7f5bfb [Paul Taylor] rename numRows to length, add table.getColumn() e81082f [Paul Taylor] export vector views, allow cloning data as another type 700a47c [Paul Taylor] export visitors e859e13 [Paul Taylor] fix package.json bin entry 0620cfd [Brian Hulette] use Math.fround 0126dc4 [Brian Hulette] Don't recompute total length e761eee [Brian Hulette] Rename asJSON to toJSON 6c91ed4 [Paul Taylor] Merge branch 'master' of github.com:apache/arrow into js-cpp-refactor-merge_with-table-scan-perf d2b18d5 [Paul Taylor] Merge remote-tracking branch 'ccri/table-scan-perf' into js-cpp-refactor-merge_with-table-scan-perf f3f3b86 [Paul Taylor] rename table.ts to recordbatch.ts in preparation for merging latest e3f629d [Paul Taylor] fix rest of the mangling issues fa7c17a [Paul Taylor] passing all tests except es5 umd mangler ones e20decd [Brian Hulette] Add license headers edcbdbe [Brian Hulette] cleanup 20717d5 [Brian Hulette] Fixed countBy(string) 7244887 [Brian Hulette] Add table unit tests... 6719147 [Brian Hulette] Add DataFrame.countBy operation 2f4a349 [Brian Hulette] Minor tweaks 2e118ab [Brian Hulette] linter a788db3 [Brian Hulette] Cleanup a9fff89 [Brian Hulette] Move Table out of the Vector hierarchy 1d60aa1 [Brian Hulette] Moved DataFrame ops to Table. DataFrame is now an interface e8979ba [Brian Hulette] Refactor DataFrame to extend Vector<StructRow> 6a41d68 [Brian Hulette] clean up table benchmarks 2744c63 [Brian Hulette] Remove Chunked/Simple DataFrame distinction aa999f8 [Brian Hulette] Add DictionaryVector optimization for equals predicate 4d9e8c0 [Brian Hulette] Add concept of predicates for filtering dataframes 796f45d [Brian Hulette] add DataFrame filter and count ops 30f0330 [Brian Hulette] Add basic DataFrame impl ... a1edac2 [Brian Hulette] Add perf tests for table scans d18d915 [Paul Taylor] fix struct and map rows 61dc699 [Paul Taylor] WIP -- refactor types to closer match arrow-cpp 62db338 [Paul Taylor] update dependencies and add es6+ umd targets to jest transform ignore patterns to fix ci 6ff18e9 [Paul Taylor] ship es2015 commonJS in main package to avoid confusion 74e828a [Paul Taylor] fix typings issues (ARROW-1903)
1 parent f84af8f commit e327747

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

67 files changed

+6025
-3065
lines changed

js/bin/integration.js

+50-6
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
// specific language governing permissions and limitations
1818
// under the License.
1919

20+
var fs = require('fs');
21+
var glob = require('glob');
2022
var path = require('path');
2123
var gulp = require.resolve(path.join(`..`, `node_modules/gulp/bin/gulp.js`));
2224
var child_process = require(`child_process`);
@@ -29,12 +31,14 @@ var optionList = [
2931
{
3032
type: String,
3133
name: 'arrow', alias: 'a',
32-
description: 'The Arrow file to read/write'
34+
multiple: true, defaultValue: [],
35+
description: 'The Arrow file[s] to read/write'
3336
},
3437
{
3538
type: String,
3639
name: 'json', alias: 'j',
37-
description: 'The JSON file to read/write'
40+
multiple: true, defaultValue: [],
41+
description: 'The JSON file[s] to read/write'
3842
}
3943
];
4044

@@ -66,20 +70,60 @@ function print_usage() {
6670
process.exit(1);
6771
}
6872

69-
if (!argv.arrow || !argv.json || !argv.mode) {
73+
let jsonPaths = argv.json;
74+
let arrowPaths = argv.arrow;
75+
76+
if (!argv.mode) {
77+
return print_usage();
78+
}
79+
80+
let mode = argv.mode.toUpperCase();
81+
if (mode === 'VALIDATE' && !jsonPaths.length) {
82+
jsonPaths = glob.sync(path.resolve(__dirname, `../test/data/json/`, `*.json`));
83+
if (!arrowPaths.length) {
84+
[jsonPaths, arrowPaths] = jsonPaths.reduce(([jsonPaths, arrowPaths], jsonPath) => {
85+
const { name } = path.parse(jsonPath);
86+
for (const source of ['cpp', 'java']) {
87+
for (const format of ['file', 'stream']) {
88+
const arrowPath = path.resolve(__dirname, `../test/data/${source}/${format}/${name}.arrow`);
89+
if (fs.existsSync(arrowPath)) {
90+
jsonPaths.push(jsonPath);
91+
arrowPaths.push(arrowPath);
92+
console.log('-j', jsonPath, '-a', arrowPath, '\\');
93+
}
94+
}
95+
}
96+
return [jsonPaths, arrowPaths];
97+
}, [[], []]);
98+
}
99+
} else if (!jsonPaths.length) {
70100
return print_usage();
71101
}
72102

73-
switch (argv.mode.toUpperCase()) {
103+
switch (mode) {
74104
case 'VALIDATE':
105+
const args = [`test`, `-i`].concat(argv._unknown || []);
106+
jsonPaths.forEach((p, i) => {
107+
args.push('-j', p, '-a', arrowPaths[i]);
108+
});
75109
child_process.spawnSync(
76-
gulp,
77-
[`test`, `-i`].concat(process.argv.slice(2)),
110+
gulp, args,
78111
{
79112
cwd: path.resolve(__dirname, '..'),
80113
stdio: ['ignore', 'inherit', 'inherit']
81114
}
82115
);
116+
// for (let i = -1, n = jsonPaths.length; ++i < n;) {
117+
// const jsonPath = jsonPaths[i];
118+
// const arrowPath = arrowPaths[i];
119+
// child_process.spawnSync(
120+
// gulp, args.concat(['-j', jsonPath, '-a', arrowPath]),
121+
// {
122+
// cwd: path.resolve(__dirname, '..'),
123+
// stdio: ['ignore', 'inherit', 'inherit']
124+
// }
125+
// );
126+
// }
83127
break;
84128
default:
85129
print_usage();

js/gulp/argv.js

+27-4
Original file line numberDiff line numberDiff line change
@@ -15,20 +15,22 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
const fs = require('fs');
19+
const glob = require('glob');
20+
const path = require('path');
21+
1822
const argv = require(`command-line-args`)([
1923
{ name: `all`, type: Boolean },
2024
{ name: 'update', alias: 'u', type: Boolean },
2125
{ name: 'verbose', alias: 'v', type: Boolean },
2226
{ name: `target`, type: String, defaultValue: `` },
2327
{ name: `module`, type: String, defaultValue: `` },
2428
{ name: `coverage`, type: Boolean, defaultValue: false },
25-
{ name: `json_file`, alias: `j`, type: String, defaultValue: null },
26-
{ name: `arrow_file`, alias: `a`, type: String, defaultValue: null },
2729
{ name: `integration`, alias: `i`, type: Boolean, defaultValue: false },
2830
{ name: `targets`, alias: `t`, type: String, multiple: true, defaultValue: [] },
2931
{ name: `modules`, alias: `m`, type: String, multiple: true, defaultValue: [] },
30-
{ name: `sources`, alias: `s`, type: String, multiple: true, defaultValue: [`cpp`, `java`] },
31-
{ name: `formats`, alias: `f`, type: String, multiple: true, defaultValue: [`file`, `stream`] },
32+
{ name: `json_files`, alias: `j`, type: String, multiple: true, defaultValue: [] },
33+
{ name: `arrow_files`, alias: `a`, type: String, multiple: true, defaultValue: [] },
3234
], { partial: true });
3335

3436
const { targets, modules } = argv;
@@ -38,4 +40,25 @@ argv.module && !modules.length && modules.push(argv.module);
3840
(argv.all || !targets.length) && targets.push(`all`);
3941
(argv.all || !modules.length) && modules.push(`all`);
4042

43+
if (argv.coverage && (!argv.json_files || !argv.json_files.length)) {
44+
45+
let [jsonPaths, arrowPaths] = glob
46+
.sync(path.resolve(__dirname, `../test/data/json/`, `*.json`))
47+
.reduce((paths, jsonPath) => {
48+
const { name } = path.parse(jsonPath);
49+
const [jsonPaths, arrowPaths] = paths;
50+
['cpp', 'java'].forEach((source) => ['file', 'stream'].forEach((format) => {
51+
const arrowPath = path.resolve(__dirname, `../test/data/${source}/${format}/${name}.arrow`);
52+
if (fs.existsSync(arrowPath)) {
53+
jsonPaths.push(jsonPath);
54+
arrowPaths.push(arrowPath);
55+
}
56+
}));
57+
return paths;
58+
}, [[], []]);
59+
60+
argv.json_files = jsonPaths;
61+
argv.arrow_files = arrowPaths;
62+
}
63+
4164
module.exports = { argv, targets, modules };

js/gulp/closure-task.js

+4-4
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ const closureTask = ((cache) => memoizeTask(cache, function closure(target, form
3636
const src = targetDir(target, `cls`);
3737
const out = targetDir(target, format);
3838
const entry = path.join(src, mainExport);
39-
const externs = path.join(src, `${mainExport}.externs`);
39+
const externs = path.join(`src/Arrow.externs.js`);
4040
return observableFromStreams(
4141
gulp.src([
4242
/* external libs first --> */ `node_modules/tslib/package.json`,
@@ -46,7 +46,6 @@ const closureTask = ((cache) => memoizeTask(cache, function closure(target, form
4646
`node_modules/text-encoding-utf-8/package.json`,
4747
`node_modules/text-encoding-utf-8/src/encoding.js`,
4848
/* then sources globs --> */ `${src}/**/*.js`,
49-
/* and exclusions last --> */ `!${src}/Arrow.externs.js`,
5049
], { base: `./` }),
5150
sourcemaps.init(),
5251
closureCompiler(createClosureArgs(entry, externs)),
@@ -60,14 +59,15 @@ const closureTask = ((cache) => memoizeTask(cache, function closure(target, form
6059
}))({});
6160

6261
const createClosureArgs = (entry, externs) => ({
62+
externs,
6363
third_party: true,
6464
warning_level: `QUIET`,
6565
dependency_mode: `STRICT`,
6666
rewrite_polyfills: false,
67-
externs: `${externs}.js`,
6867
entry_point: `${entry}.js`,
6968
module_resolution: `NODE`,
70-
// formatting: `PRETTY_PRINT`, debug: true,
69+
// formatting: `PRETTY_PRINT`,
70+
// debug: true,
7171
compilation_level: `ADVANCED`,
7272
allow_method_call_decomposing: true,
7373
package_json_entry_names: `module,jsnext:main,main`,

js/gulp/package-task.js

+13-10
Original file line numberDiff line numberDiff line change
@@ -45,10 +45,11 @@ const createMainPackageJson = (target, format) => (orig) => ({
4545
...createTypeScriptPackageJson(target, format)(orig),
4646
name: npmPkgName,
4747
main: mainExport,
48+
types: `${mainExport}.d.ts`,
4849
module: `${mainExport}.mjs`,
4950
dist: `${mainExport}.es5.min.js`,
5051
[`dist:es2015`]: `${mainExport}.es2015.min.js`,
51-
[`@std/esm`]: { esm: `mjs` }
52+
[`@std/esm`]: { esm: `mjs`, warnings: false, sourceMap: true }
5253
});
5354

5455
const createTypeScriptPackageJson = (target, format) => (orig) => ({
@@ -63,18 +64,20 @@ const createTypeScriptPackageJson = (target, format) => (orig) => ({
6364

6465
const createScopedPackageJSON = (target, format) => (({ name, ...orig }) =>
6566
conditionallyAddStandardESMEntry(target, format)(
66-
packageJSONFields.reduce(
67-
(xs, key) => ({ ...xs, [key]: xs[key] || orig[key] }),
68-
{ name: `${npmOrgName}/${packageName(target, format)}`,
69-
version: undefined, main: `${mainExport}.js`, types: `${mainExport}.d.ts`,
70-
dist: undefined, [`dist:es2015`]: undefined, module: undefined, [`@std/esm`]: undefined }
71-
)
67+
packageJSONFields.reduce(
68+
(xs, key) => ({ ...xs, [key]: xs[key] || orig[key] }),
69+
{
70+
name: `${npmOrgName}/${packageName(target, format)}`,
71+
version: undefined, main: `${mainExport}.js`, types: `${mainExport}.d.ts`,
72+
dist: undefined, [`dist:es2015`]: undefined, module: undefined, [`@std/esm`]: undefined
73+
}
74+
)
7275
)
7376
);
7477

7578
const conditionallyAddStandardESMEntry = (target, format) => (packageJSON) => (
76-
format !== `esm`
77-
? packageJSON
78-
: { ...packageJSON, [`@std/esm`]: { esm: `js` } }
79+
format !== `esm` && format !== `cls`
80+
? packageJSON
81+
: { ...packageJSON, [`@std/esm`]: { esm: `js`, warnings: false, sourceMap: true } }
7982
);
8083

js/gulp/test-task.js

+5-5
Original file line numberDiff line numberDiff line change
@@ -44,15 +44,15 @@ const testOptions = {
4444
const testTask = ((cache, execArgv, testOptions) => memoizeTask(cache, function test(target, format, debug = false) {
4545
const opts = { ...testOptions };
4646
const args = !debug ? [...execArgv] : [...debugArgv, ...execArgv];
47-
args.push(`test/${argv.integration ? `integration/*` : `unit/*`}`);
47+
if (!argv.coverage) {
48+
args.push(`test/${argv.integration ? `integration/*` : `unit/*`}`);
49+
}
4850
opts.env = { ...opts.env,
4951
TEST_TARGET: target,
5052
TEST_MODULE: format,
51-
JSON_PATH: argv.json_file,
52-
ARROW_PATH: argv.arrow_file,
5353
TEST_TS_SOURCE: !!argv.coverage,
54-
TEST_SOURCES: JSON.stringify(Array.isArray(argv.sources) ? argv.sources : [argv.sources]),
55-
TEST_FORMATS: JSON.stringify(Array.isArray(argv.formats) ? argv.formats : [argv.formats]),
54+
JSON_PATHS: JSON.stringify(Array.isArray(argv.json_files) ? argv.json_files : [argv.json_files]),
55+
ARROW_PATHS: JSON.stringify(Array.isArray(argv.arrow_files) ? argv.arrow_files : [argv.arrow_files]),
5656
};
5757
return !debug ?
5858
child_process.spawn(jest, args, opts) :

js/gulp/typescript-task.js

+4-4
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ const typescriptTask = ((cache) => memoizeTask(cache, function typescript(target
3434
const tsProject = ts.createProject(path.join(`tsconfig`, tsconfigFile), { typescript: require(`typescript`) });
3535
const { stream: { js, dts } } = observableFromStreams(
3636
tsProject.src(), sourcemaps.init(),
37-
tsProject(ts.reporter.fullReporter(true))
37+
tsProject(ts.reporter.defaultReporter())
3838
);
3939
const writeDTypes = observableFromStreams(dts, gulp.dest(out));
4040
const writeJS = observableFromStreams(js, sourcemaps.write(), gulp.dest(out));
@@ -52,12 +52,12 @@ function maybeCopyRawJSArrowFormatFiles(target, format) {
5252
return Observable.empty();
5353
}
5454
return Observable.defer(async () => {
55-
const outFormatDir = path.join(targetDir(target, format), `format`, `fb`);
55+
const outFormatDir = path.join(targetDir(target, format), `fb`);
5656
await del(path.join(outFormatDir, '*.js'));
5757
await observableFromStreams(
58-
gulp.src(path.join(`src`, `format`, `fb`, `*_generated.js`)),
58+
gulp.src(path.join(`src`, `fb`, `*_generated.js`)),
5959
gulpRename((p) => { p.basename = p.basename.replace(`_generated`, ``); }),
6060
gulp.dest(outFormatDir)
6161
).toPromise();
6262
});
63-
}
63+
}

js/gulp/uglify-task.js

+16-7
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ const webpack = require(`webpack`);
2929
const { memoizeTask } = require('./memoize-task');
3030
const { Observable, ReplaySubject } = require('rxjs');
3131
const UglifyJSPlugin = require(`uglifyjs-webpack-plugin`);
32-
const esmRequire = require(`@std/esm`)(module, { cjs: true, esm: `js` });
32+
const esmRequire = require(`@std/esm`)(module, { cjs: true, esm: `js`, warnings: false });
3333

3434
const uglifyTask = ((cache, commonConfig) => memoizeTask(cache, function uglifyJS(target, format) {
3535

@@ -84,11 +84,20 @@ module.exports = uglifyTask;
8484
module.exports.uglifyTask = uglifyTask;
8585

8686
const reservePublicNames = ((ESKeywords) => function reservePublicNames(target, format) {
87-
const publicModulePath = `../${targetDir(target, format)}/${mainExport}.js`;
88-
return [
89-
...ESKeywords,
90-
...reserveExportedNames(esmRequire(publicModulePath))
87+
const src = targetDir(target, format);
88+
const publicModulePaths = [
89+
`../${src}/data.js`,
90+
`../${src}/type.js`,
91+
`../${src}/table.js`,
92+
`../${src}/vector.js`,
93+
`../${src}/util/int.js`,
94+
`../${src}/predicate.js`,
95+
`../${src}/recordbatch.js`,
96+
`../${src}/${mainExport}.js`,
9197
];
98+
return publicModulePaths.reduce((keywords, publicModulePath) => [
99+
...keywords, ...reserveExportedNames(esmRequire(publicModulePath, { warnings: false }))
100+
], [...ESKeywords]);
92101
})(ESKeywords);
93102

94103
// Reflect on the Arrow modules to come up with a list of keys to save from Uglify's
@@ -104,8 +113,8 @@ const reserveExportedNames = (entryModule) => (
104113
.map((name) => [name, entryModule[name]])
105114
.reduce((reserved, [name, value]) => {
106115
const fn = function() {};
107-
const ownKeys = value && Object.getOwnPropertyNames(value) || [];
108-
const protoKeys = typeof value === `function` && Object.getOwnPropertyNames(value.prototype) || [];
116+
const ownKeys = value && typeof value === 'object' && Object.getOwnPropertyNames(value) || [];
117+
const protoKeys = typeof value === `function` && Object.getOwnPropertyNames(value.prototype || {}) || [];
109118
const publicNames = [...ownKeys, ...protoKeys].filter((x) => x !== `default` && x !== `undefined` && !(x in fn));
110119
return [...reserved, name, ...publicNames];
111120
}, []

js/gulp/util.js

+3-4
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ const ESKeywords = [
8787
// EventTarget
8888
`addListener`, `removeListener`, `addEventListener`, `removeEventListener`,
8989
// Arrow properties
90-
`low`, `high`, `data`, `index`, `field`, `validity`, `columns`, `fieldNode`, `subarray`,
90+
`low`, `high`, `data`, `index`, `field`, `columns`, 'numCols', 'numRows', `values`, `valueOffsets`, `nullBitmap`, `subarray`
9191
];
9292

9393
function taskName(target, format) {
@@ -108,14 +108,13 @@ function targetDir(target, format) {
108108

109109
function logAndDie(e) {
110110
if (e) {
111-
console.error(e);
112111
process.exit(1);
113112
}
114113
}
115114

116115
function observableFromStreams(...streams) {
117-
const pumped = streams.length <= 1 ? streams[0]
118-
: pump(...streams, logAndDie);
116+
if (streams.length <= 0) { return Observable.empty(); }
117+
const pumped = streams.length <= 1 ? streams[0] : pump(...streams, logAndDie);
119118
const fromEvent = Observable.fromEvent.bind(null, pumped);
120119
const streamObs = fromEvent(`data`)
121120
.merge(fromEvent(`error`).flatMap((e) => Observable.throw(e)))

0 commit comments

Comments
 (0)