Skip to content

Commit 190f349

Browse files
kwonojbenlesh
authored andcommitted
feat(forkJoin): accept promise, resultselector as parameter of forkJoin
closes #507
1 parent 46ca1d7 commit 190f349

File tree

6 files changed

+224
-34
lines changed

6 files changed

+224
-34
lines changed

spec/observables/forkJoin-spec.js

+151-10
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,155 @@
1-
/* globals describe, it, expect */
1+
/* globals describe, it, expect, lowerCaseO, hot, expectObservable */
22
var Rx = require('../../dist/cjs/Rx');
33
var Observable = Rx.Observable;
44

55
describe('Observable.forkJoin', function () {
6-
it('should join the last values of the provided observables into an array', function (done) {
7-
Observable.forkJoin(Observable.of(1, 2, 3, 'a'),
8-
Observable.of('b'),
9-
Observable.of(1, 2, 3, 4, 'c'))
10-
.subscribe(function (x) {
11-
expect(x).toEqual(['a', 'b', 'c']);
12-
}, null, done);
13-
});
14-
});
6+
it('should join the last values of the provided observables into an array', function () {
7+
var e1 = Observable.forkJoin(
8+
hot('--a--b--c--d--|'),
9+
hot('(b|)'),
10+
hot('--1--2--3--|')
11+
);
12+
var expected = '--------------(x|)';
13+
14+
expectObservable(e1).toBe(expected, {x: ['d', 'b', '3']});
15+
});
16+
17+
it('should accept lowercase-o observables', function () {
18+
var e1 = Observable.forkJoin(
19+
hot('--a--b--c--d--|'),
20+
hot('(b|)'),
21+
lowerCaseO('1', '2', '3')
22+
);
23+
var expected = '--------------(x|)';
24+
25+
expectObservable(e1).toBe(expected, {x: ['d', 'b', '3']});
26+
});
27+
28+
it('should accept promise', function (done) {
29+
var e1 = Observable.forkJoin(
30+
Observable.of(1),
31+
Promise.resolve(2)
32+
);
33+
34+
e1.subscribe(function (x) {
35+
expect(x).toEqual([1,2]);
36+
},
37+
function (err) {
38+
done.fail('should not be called');
39+
},
40+
done);
41+
});
42+
43+
it('forkJoin n-ary parameters empty', function () {
44+
var e1 = Observable.forkJoin(
45+
hot('--a--b--c--d--|'),
46+
hot('(b|)'),
47+
hot('------------------|')
48+
);
49+
var expected = '------------------|';
50+
51+
expectObservable(e1).toBe(expected);
52+
});
53+
54+
it('forkJoin n-ary parameters empty before end', function () {
55+
var e1 = Observable.forkJoin(
56+
hot('--a--b--c--d--|'),
57+
hot('(b|)'),
58+
hot('---------|')
59+
);
60+
var expected = '---------|';
61+
62+
expectObservable(e1).toBe(expected);
63+
});
64+
65+
it('forkJoin empty empty', function () {
66+
var e1 = Observable.forkJoin(
67+
hot('--------------|'),
68+
hot('---------|')
69+
);
70+
var expected = '---------|';
71+
72+
expectObservable(e1).toBe(expected);
73+
});
74+
75+
it('forkJoin none', function () {
76+
var e1 = Observable.forkJoin();
77+
var expected = '|';
78+
79+
expectObservable(e1).toBe(expected);
80+
});
81+
82+
it('forkJoin empty return', function () {
83+
function selector(x, y) {
84+
return x + y;
85+
}
86+
87+
var e1 = Observable.forkJoin(
88+
hot('--a--b--c--d--|'),
89+
hot('---------|'),
90+
selector);
91+
var expected = '---------|';
92+
93+
expectObservable(e1).toBe(expected);
94+
});
95+
96+
it('forkJoin return return', function () {
97+
function selector(x, y) {
98+
return x + y;
99+
}
100+
101+
var e1 = Observable.forkJoin(
102+
hot('--a--b--c--d--|'),
103+
hot('---2-----|'),
104+
selector);
105+
var expected = '--------------(x|)';
106+
107+
expectObservable(e1).toBe(expected, {x: 'd2'});
108+
});
109+
110+
it('forkJoin empty throw', function () {
111+
var e1 = Observable.forkJoin(
112+
hot('------#'),
113+
hot('---------|'));
114+
var expected = '------#';
115+
116+
expectObservable(e1).toBe(expected);
117+
});
118+
119+
it('forkJoin empty throw', function () {
120+
function selector(x, y) {
121+
return x + y;
122+
}
123+
124+
var e1 = Observable.forkJoin(
125+
hot('------#'),
126+
hot('---------|'),
127+
selector);
128+
var expected = '------#';
129+
130+
expectObservable(e1).toBe(expected);
131+
});
132+
133+
it('forkJoin return throw', function () {
134+
var e1 = Observable.forkJoin(
135+
hot('------#'),
136+
hot('---a-----|'));
137+
var expected = '------#';
138+
139+
expectObservable(e1).toBe(expected);
140+
});
141+
142+
it('forkJoin return throw', function () {
143+
function selector(x, y) {
144+
return x + y;
145+
}
146+
147+
var e1 = Observable.forkJoin(
148+
hot('------#'),
149+
hot('-------b-|'),
150+
selector);
151+
var expected = '------#';
152+
153+
expectObservable(e1).toBe(expected);
154+
});
155+
});

src/Observable.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -138,7 +138,7 @@ export class Observable<T> implements CoreOperators<T> {
138138
static concat: <T>(...observables: Array<Observable<any> | Scheduler>) => Observable<T>;
139139
static defer: <T>(observableFactory: () => Observable<T>) => Observable<T>;
140140
static empty: <T>(scheduler?: Scheduler) => Observable<T>;
141-
static forkJoin: <T>(...observables: Observable<any>[]) => Observable<T>;
141+
static forkJoin: (...sources: Array<Observable<any> | Promise<any> | ((...values: Array<any>) => any)>) => Observable<any>;
142142
static from: <T>(iterable: any, scheduler?: Scheduler) => Observable<T>;
143143
static fromArray: <T>(array: T[], scheduler?: Scheduler) => Observable<T>;
144144
static fromEvent: <T>(element: any, eventName: string, selector?: (...args: Array<any>) => T) => Observable<T>;

src/observables/ForkJoinObservable.ts

+65-19
Original file line numberDiff line numberDiff line change
@@ -1,50 +1,96 @@
11
import {Observable} from '../Observable';
22
import {Subscriber} from '../Subscriber';
3+
import {PromiseObservable} from './PromiseObservable';
4+
import {EmptyObservable} from './EmptyObservable';
5+
import {isPromise} from '../util/isPromise';
36

47
export class ForkJoinObservable<T> extends Observable<T> {
5-
constructor(private observables: Observable<any>[]) {
6-
super();
8+
constructor(private sources: Array<Observable<any> |
9+
Promise<any> |
10+
((...values: Array<any>) => any)>) {
11+
super();
12+
}
13+
14+
static create(...sources: Array<Observable<any> |
15+
Promise<any> |
16+
((...values: Array<any>) => any)>)
17+
: Observable<any> {
18+
if (sources === null || sources.length === 0) {
19+
return new EmptyObservable();
20+
}
21+
return new ForkJoinObservable(sources);
722
}
823

9-
static create<R>(...observables: Observable<any>[]): Observable<R> {
10-
return new ForkJoinObservable(observables);
24+
private getResultSelector(): (...values: Array<any>) => any {
25+
const sources = this.sources;
26+
27+
let resultSelector = sources[sources.length - 1];
28+
if (typeof resultSelector !== 'function') {
29+
return null;
30+
}
31+
this.sources.pop();
32+
return <(...values: Array<any>) => any>resultSelector;
1133
}
1234

1335
_subscribe(subscriber: Subscriber<any>) {
14-
const observables = this.observables;
15-
const len = observables.length;
16-
let context = { complete: 0, total: len, values: emptyArray(len) };
36+
let resultSelector = this.getResultSelector();
37+
const sources = this.sources;
38+
const len = sources.length;
39+
40+
const context = { completed: 0, total: len, values: emptyArray(len), selector: resultSelector };
1741
for (let i = 0; i < len; i++) {
18-
observables[i].subscribe(new AllSubscriber(subscriber, this, i, context));
42+
let source = sources[i];
43+
if (isPromise(source)) {
44+
source = new PromiseObservable(<Promise<any>>source);
45+
}
46+
(<Observable<any>>source).subscribe(new AllSubscriber(subscriber, i, context));
1947
}
2048
}
2149
}
2250

2351
class AllSubscriber<T> extends Subscriber<T> {
24-
private _value: T;
52+
private _value: any = null;
2553

26-
constructor(destination: Subscriber<T>,
27-
private parent: ForkJoinObservable<T>,
54+
constructor(destination: Subscriber<any>,
2855
private index: number,
29-
private context: { complete: number, total: number, values: any[] }) {
56+
private context: { completed: number,
57+
total: number,
58+
values: any[],
59+
selector: (...values: Array<any>) => any }) {
3060
super(destination);
3161
}
3262

33-
_next(value: T) {
63+
_next(value: any): void {
3464
this._value = value;
3565
}
3666

37-
_complete() {
67+
_complete(): void {
68+
const destination = this.destination;
69+
70+
if (this._value == null) {
71+
destination.complete();
72+
}
73+
3874
const context = this.context;
75+
context.completed++;
3976
context.values[this.index] = this._value;
40-
if (context.values.every(hasValue)) {
41-
this.destination.next(context.values);
42-
this.destination.complete();
77+
const values = context.values;
78+
79+
if (context.completed !== values.length) {
80+
return;
81+
}
82+
83+
if (values.every(hasValue)) {
84+
let value = context.selector ? context.selector.apply(this, values) :
85+
values;
86+
destination.next(value);
4387
}
88+
89+
destination.complete();
4490
}
4591
}
4692

47-
function hasValue(x) {
93+
function hasValue(x: any): boolean {
4894
return x !== null;
4995
}
5096

@@ -54,4 +100,4 @@ function emptyArray(len: number): any[] {
54100
arr.push(null);
55101
}
56102
return arr;
57-
}
103+
}

src/observables/PromiseObservable.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ export class PromiseObservable<T> extends Observable<T> {
1313
return new PromiseObservable(promise, scheduler);
1414
}
1515

16-
constructor(private promise: Promise<T>, public scheduler: Scheduler) {
16+
constructor(private promise: Promise<T>, public scheduler: Scheduler = immediate) {
1717
super();
1818
}
1919

src/operators/debounce.ts

+3-3
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import {Subscriber} from '../Subscriber';
55
import {Subscription} from '../Subscription';
66

77
import {tryCatch} from '../util/tryCatch';
8+
import {isPromise} from '../util/isPromise';
89
import {errorObject} from '../util/errorObject';
910

1011
export function debounce<T>(durationSelector: (value: T) => Observable<any> | Promise<any>): Observable<T> {
@@ -41,8 +42,7 @@ class DebounceSubscriber<T> extends Subscriber<T> {
4142
if (debounce === errorObject) {
4243
destination.error(errorObject.e);
4344
} else {
44-
if (typeof debounce.subscribe !== 'function'
45-
&& typeof debounce.then === 'function') {
45+
if (isPromise(debounce)) {
4646
debounce = PromiseObservable.create(debounce);
4747
}
4848

@@ -102,4 +102,4 @@ class DurationSelectorSubscriber<T> extends Subscriber<T> {
102102
_complete() {
103103
this.debounceNext();
104104
}
105-
}
105+
}

src/util/isPromise.ts

+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
export function isPromise(value: any): boolean {
2+
return value && typeof value.subscribe !== 'function' && typeof value.then === 'function';
3+
}

0 commit comments

Comments
 (0)