Skip to content

Commit d45f672

Browse files
kwonojbenlesh
authored andcommitted
feat(forkJoin): accept array of observable as parameter
relates to #594
1 parent 24086c0 commit d45f672

File tree

3 files changed

+126
-34
lines changed

3 files changed

+126
-34
lines changed

spec/observables/forkJoin-spec.js

+104-15
Original file line numberDiff line numberDiff line change
@@ -8,18 +8,80 @@ describe('Observable.forkJoin', function () {
88
hot('--a--b--c--d--|'),
99
hot('(b|)'),
1010
hot('--1--2--3--|')
11-
);
11+
);
1212
var expected = '--------------(x|)';
1313

1414
expectObservable(e1).toBe(expected, {x: ['d', 'b', '3']});
1515
});
1616

17+
it('should join the last values of the provided observables with selector', function () {
18+
function selector(x, y, z) {
19+
return x + y + z;
20+
}
21+
22+
var e1 = Observable.forkJoin(
23+
hot('--a--b--c--d--|'),
24+
hot('(b|)'),
25+
hot('--1--2--3--|'),
26+
selector
27+
);
28+
var expected = '--------------(x|)';
29+
30+
expectObservable(e1).toBe(expected, {x: 'db3'});
31+
});
32+
33+
it('should accept single observable', function () {
34+
var e1 = Observable.forkJoin(
35+
hot('--a--b--c--d--|')
36+
);
37+
var expected = '--------------(x|)';
38+
39+
expectObservable(e1).toBe(expected, {x: ['d']});
40+
});
41+
42+
it('should accept array of observable contains single', function () {
43+
var e1 = Observable.forkJoin(
44+
[hot('--a--b--c--d--|')]
45+
);
46+
var expected = '--------------(x|)';
47+
48+
expectObservable(e1).toBe(expected, {x: ['d']});
49+
});
50+
51+
it('should accept single observable with selector', function () {
52+
function selector(x) {
53+
return x + x;
54+
}
55+
56+
var e1 = Observable.forkJoin(
57+
hot('--a--b--c--d--|'),
58+
selector
59+
);
60+
var expected = '--------------(x|)';
61+
62+
expectObservable(e1).toBe(expected, {x: 'dd'});
63+
});
64+
65+
it('should accept array of observable contains single with selector', function () {
66+
function selector(x) {
67+
return x + x;
68+
}
69+
70+
var e1 = Observable.forkJoin(
71+
[hot('--a--b--c--d--|')],
72+
selector
73+
);
74+
var expected = '--------------(x|)';
75+
76+
expectObservable(e1).toBe(expected, {x: 'dd'});
77+
});
78+
1779
it('should accept lowercase-o observables', function () {
1880
var e1 = Observable.forkJoin(
1981
hot('--a--b--c--d--|'),
2082
hot('(b|)'),
2183
lowerCaseO('1', '2', '3')
22-
);
84+
);
2385
var expected = '--------------(x|)';
2486

2587
expectObservable(e1).toBe(expected, {x: ['d', 'b', '3']});
@@ -29,7 +91,7 @@ describe('Observable.forkJoin', function () {
2991
var e1 = Observable.forkJoin(
3092
Observable.of(1),
3193
Promise.resolve(2)
32-
);
94+
);
3395

3496
e1.subscribe(function (x) {
3597
expect(x).toEqual([1,2]);
@@ -40,18 +102,45 @@ describe('Observable.forkJoin', function () {
40102
done);
41103
});
42104

43-
it('forkJoin n-ary parameters empty', function () {
105+
it('should accept array of observables', function () {
106+
var e1 = Observable.forkJoin(
107+
[hot('--a--b--c--d--|'),
108+
hot('(b|)'),
109+
hot('--1--2--3--|')]
110+
);
111+
var expected = '--------------(x|)';
112+
113+
expectObservable(e1).toBe(expected, {x: ['d', 'b', '3']});
114+
});
115+
116+
it('should accept array of observables with selector', function () {
117+
function selector(x, y, z) {
118+
return x + y + z;
119+
}
120+
121+
var e1 = Observable.forkJoin(
122+
[hot('--a--b--c--d--|'),
123+
hot('(b|)'),
124+
hot('--1--2--3--|')],
125+
selector
126+
);
127+
var expected = '--------------(x|)';
128+
129+
expectObservable(e1).toBe(expected, {x: 'db3'});
130+
});
131+
132+
it('should not emit if any of source observable is empty', function () {
44133
var e1 = Observable.forkJoin(
45134
hot('--a--b--c--d--|'),
46135
hot('(b|)'),
47136
hot('------------------|')
48-
);
137+
);
49138
var expected = '------------------|';
50139

51140
expectObservable(e1).toBe(expected);
52141
});
53142

54-
it('forkJoin n-ary parameters empty before end', function () {
143+
it('should complete early if any of source is empty and completes before than others', function () {
55144
var e1 = Observable.forkJoin(
56145
hot('--a--b--c--d--|'),
57146
hot('(b|)'),
@@ -62,7 +151,7 @@ describe('Observable.forkJoin', function () {
62151
expectObservable(e1).toBe(expected);
63152
});
64153

65-
it('forkJoin empty empty', function () {
154+
it('should complete when all sources are empty', function () {
66155
var e1 = Observable.forkJoin(
67156
hot('--------------|'),
68157
hot('---------|')
@@ -72,14 +161,14 @@ describe('Observable.forkJoin', function () {
72161
expectObservable(e1).toBe(expected);
73162
});
74163

75-
it('forkJoin none', function () {
164+
it('should complete if source is not provided', function () {
76165
var e1 = Observable.forkJoin();
77166
var expected = '|';
78167

79168
expectObservable(e1).toBe(expected);
80169
});
81170

82-
it('forkJoin empty return', function () {
171+
it('should complete when any of source is empty with selector', function () {
83172
function selector(x, y) {
84173
return x + y;
85174
}
@@ -93,7 +182,7 @@ describe('Observable.forkJoin', function () {
93182
expectObservable(e1).toBe(expected);
94183
});
95184

96-
it('forkJoin return return', function () {
185+
it('should emit results by resultselector', function () {
97186
function selector(x, y) {
98187
return x + y;
99188
}
@@ -107,7 +196,7 @@ describe('Observable.forkJoin', function () {
107196
expectObservable(e1).toBe(expected, {x: 'd2'});
108197
});
109198

110-
it('forkJoin empty throw', function () {
199+
it('should raise error when any of source raises error with empty observable', function () {
111200
var e1 = Observable.forkJoin(
112201
hot('------#'),
113202
hot('---------|'));
@@ -116,7 +205,7 @@ describe('Observable.forkJoin', function () {
116205
expectObservable(e1).toBe(expected);
117206
});
118207

119-
it('forkJoin empty throw', function () {
208+
it('should raise error when any of source raises error with selector with empty observable', function () {
120209
function selector(x, y) {
121210
return x + y;
122211
}
@@ -130,7 +219,7 @@ describe('Observable.forkJoin', function () {
130219
expectObservable(e1).toBe(expected);
131220
});
132221

133-
it('forkJoin return throw', function () {
222+
it('should raise error when source raises error', function () {
134223
var e1 = Observable.forkJoin(
135224
hot('------#'),
136225
hot('---a-----|'));
@@ -139,7 +228,7 @@ describe('Observable.forkJoin', function () {
139228
expectObservable(e1).toBe(expected);
140229
});
141230

142-
it('forkJoin return throw', function () {
231+
it('should raise error when source raises error with selector', function () {
143232
function selector(x, y) {
144233
return x + y;
145234
}
@@ -152,4 +241,4 @@ describe('Observable.forkJoin', function () {
152241

153242
expectObservable(e1).toBe(expected);
154243
});
155-
});
244+
});

src/Observable.ts

+4-1
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,10 @@ export class Observable<T> implements CoreOperators<T> {
138138
static concat: <T>(...observables: Array<Observable<any> | Scheduler>) => Observable<T>;
139139
static defer: <T>(observableFactory: () => Observable<T>) => Observable<T>;
140140
static empty: <T>(scheduler?: Scheduler) => Observable<T>;
141-
static forkJoin: (...sources: Array<Observable<any> | Promise<any> | ((...values: Array<any>) => any)>) => Observable<any>;
141+
static forkJoin: (...sources: Array<Observable<any> |
142+
Array<Observable<any>> |
143+
Promise<any> |
144+
((...values: Array<any>) => any)>) => Observable<any>;
142145
static from: <T>(iterable: any, scheduler?: Scheduler) => Observable<T>;
143146
static fromArray: <T>(array: T[], scheduler?: Scheduler) => Observable<T>;
144147
static fromEvent: <T>(element: any, eventName: string, selector?: (...args: Array<any>) => T) => Observable<T>;

src/observables/ForkJoinObservable.ts

+18-18
Original file line numberDiff line numberDiff line change
@@ -3,41 +3,41 @@ import {Subscriber} from '../Subscriber';
33
import {PromiseObservable} from './PromiseObservable';
44
import {EmptyObservable} from './EmptyObservable';
55
import {isPromise} from '../util/isPromise';
6+
import {isArray} from '../util/isArray';
67

78
export class ForkJoinObservable<T> extends Observable<T> {
8-
constructor(private sources: Array<Observable<any> |
9-
Promise<any> |
10-
((...values: Array<any>) => any)>) {
11-
super();
9+
constructor(private sources: Array<Observable<any> | Promise<any>>,
10+
private resultSelector?: (...values: Array<any>) => any) {
11+
super();
1212
}
1313

1414
static create(...sources: Array<Observable<any> |
15+
Array<Observable<any>> |
1516
Promise<any> |
16-
((...values: Array<any>) => any)>)
17-
: Observable<any> {
18-
if (sources === null || sources.length === 0) {
17+
((...values: Array<any>) => any)>): Observable<any> {
18+
if (sources === null || arguments.length === 0) {
1919
return new EmptyObservable();
2020
}
21-
return new ForkJoinObservable(sources);
22-
}
2321

24-
private getResultSelector(): (...values: Array<any>) => any {
25-
const sources = this.sources;
22+
let resultSelector: (...values: Array<any>) => any = null;
23+
if (typeof sources[sources.length - 1] === 'function') {
24+
resultSelector = <(...values: Array<any>) => any>sources.pop();
25+
}
2626

27-
let resultSelector = sources[sources.length - 1];
28-
if (typeof resultSelector !== 'function') {
29-
return null;
27+
// if the first and only other argument besides the resultSelector is an array
28+
// assume it's been called with `forkJoin([obs1, obs2, obs3], resultSelector)`
29+
if (sources.length === 1 && isArray(sources[0])) {
30+
sources = <Array<Observable<any>>>sources[0];
3031
}
31-
this.sources.pop();
32-
return <(...values: Array<any>) => any>resultSelector;
32+
33+
return new ForkJoinObservable(<Array<Observable<any> | Promise<any>>>sources, resultSelector);
3334
}
3435

3536
_subscribe(subscriber: Subscriber<any>) {
36-
let resultSelector = this.getResultSelector();
3737
const sources = this.sources;
3838
const len = sources.length;
3939

40-
const context = { completed: 0, total: len, values: emptyArray(len), selector: resultSelector };
40+
const context = { completed: 0, total: len, values: emptyArray(len), selector: this.resultSelector };
4141
for (let i = 0; i < len; i++) {
4242
let source = sources[i];
4343
if (isPromise(source)) {

0 commit comments

Comments
 (0)