1
1
import { Observable , ObservableInput } from '../Observable' ;
2
2
import { IScheduler } from '../Scheduler' ;
3
- import { ArrayObservable } from '../observable/ArrayObservable' ;
4
- import { mergeAll } from './mergeAll' ;
5
- import { isScheduler } from '../util/isScheduler' ;
6
3
import { OperatorFunction , MonoTypeOperatorFunction } from '../interfaces' ;
4
+ import { merge as mergeStatic } from '../observable/merge' ;
5
+
6
+ export { merge as mergeStatic } from '../observable/merge' ;
7
7
8
8
/* tslint:disable:max-line-length */
9
9
export function merge < T > ( scheduler ?: IScheduler ) : MonoTypeOperatorFunction < T > ;
@@ -21,27 +21,6 @@ export function merge<T, T2, T3, T4, T5, T6>(v2: ObservableInput<T2>, v3: Observ
21
21
export function merge < T > ( ...observables : Array < ObservableInput < T > | IScheduler | number > ) : MonoTypeOperatorFunction < T > ;
22
22
export function merge < T , R > ( ...observables : Array < ObservableInput < any > | IScheduler | number > ) : OperatorFunction < T , R > ;
23
23
/* tslint:enable:max-line-length */
24
-
25
- export function merge < T , R > ( ...observables : Array < ObservableInput < any > | IScheduler | number > ) : OperatorFunction < T , R > {
26
- return ( source : Observable < T > ) => source . lift . call ( mergeStatic ( source , ...observables ) ) ;
27
- }
28
-
29
- /* tslint:disable:max-line-length */
30
- export function mergeStatic < T > ( v1 : ObservableInput < T > , scheduler ?: IScheduler ) : Observable < T > ;
31
- export function mergeStatic < T > ( v1 : ObservableInput < T > , concurrent ?: number , scheduler ?: IScheduler ) : Observable < T > ;
32
- export function mergeStatic < T , T2 > ( v1 : ObservableInput < T > , v2 : ObservableInput < T2 > , scheduler ?: IScheduler ) : Observable < T | T2 > ;
33
- export function mergeStatic < T , T2 > ( v1 : ObservableInput < T > , v2 : ObservableInput < T2 > , concurrent ?: number , scheduler ?: IScheduler ) : Observable < T | T2 > ;
34
- export function mergeStatic < T , T2 , T3 > ( v1 : ObservableInput < T > , v2 : ObservableInput < T2 > , v3 : ObservableInput < T3 > , scheduler ?: IScheduler ) : Observable < T | T2 | T3 > ;
35
- export function mergeStatic < T , T2 , T3 > ( v1 : ObservableInput < T > , v2 : ObservableInput < T2 > , v3 : ObservableInput < T3 > , concurrent ?: number , scheduler ?: IScheduler ) : Observable < T | T2 | T3 > ;
36
- export function mergeStatic < T , T2 , T3 , T4 > ( v1 : ObservableInput < T > , v2 : ObservableInput < T2 > , v3 : ObservableInput < T3 > , v4 : ObservableInput < T4 > , scheduler ?: IScheduler ) : Observable < T | T2 | T3 | T4 > ;
37
- export function mergeStatic < T , T2 , T3 , T4 > ( v1 : ObservableInput < T > , v2 : ObservableInput < T2 > , v3 : ObservableInput < T3 > , v4 : ObservableInput < T4 > , concurrent ?: number , scheduler ?: IScheduler ) : Observable < T | T2 | T3 | T4 > ;
38
- export function mergeStatic < T , T2 , T3 , T4 , T5 > ( v1 : ObservableInput < T > , v2 : ObservableInput < T2 > , v3 : ObservableInput < T3 > , v4 : ObservableInput < T4 > , v5 : ObservableInput < T5 > , scheduler ?: IScheduler ) : Observable < T | T2 | T3 | T4 | T5 > ;
39
- export function mergeStatic < T , T2 , T3 , T4 , T5 > ( v1 : ObservableInput < T > , v2 : ObservableInput < T2 > , v3 : ObservableInput < T3 > , v4 : ObservableInput < T4 > , v5 : ObservableInput < T5 > , concurrent ?: number , scheduler ?: IScheduler ) : Observable < T | T2 | T3 | T4 | T5 > ;
40
- export function mergeStatic < T , T2 , T3 , T4 , T5 , T6 > ( v1 : ObservableInput < T > , v2 : ObservableInput < T2 > , v3 : ObservableInput < T3 > , v4 : ObservableInput < T4 > , v5 : ObservableInput < T5 > , v6 : ObservableInput < T6 > , scheduler ?: IScheduler ) : Observable < T | T2 | T3 | T4 | T5 | T6 > ;
41
- export function mergeStatic < T , T2 , T3 , T4 , T5 , T6 > ( v1 : ObservableInput < T > , v2 : ObservableInput < T2 > , v3 : ObservableInput < T3 > , v4 : ObservableInput < T4 > , v5 : ObservableInput < T5 > , v6 : ObservableInput < T6 > , concurrent ?: number , scheduler ?: IScheduler ) : Observable < T | T2 | T3 | T4 | T5 | T6 > ;
42
- export function mergeStatic < T > ( ...observables : ( ObservableInput < T > | IScheduler | number ) [ ] ) : Observable < T > ;
43
- export function mergeStatic < T , R > ( ...observables : ( ObservableInput < any > | IScheduler | number ) [ ] ) : Observable < R > ;
44
- /* tslint:enable:max-line-length */
45
24
/**
46
25
* Creates an output Observable which concurrently emits all values from every
47
26
* given input Observable.
@@ -51,73 +30,43 @@ export function mergeStatic<T, R>(...observables: (ObservableInput<any> | ISched
51
30
*
52
31
* <img src="./img/merge.png" width="100%">
53
32
*
54
- * `merge` subscribes to each given input Observable (as arguments), and simply
55
- * forwards (without doing any transformation) all the values from all the input
56
- * Observables to the output Observable. The output Observable only completes
57
- * once all input Observables have completed. Any error delivered by an input
58
- * Observable will be immediately emitted on the output Observable.
33
+ * `merge` subscribes to each given input Observable (either the source or an
34
+ * Observable given as argument), and simply forwards (without doing any
35
+ * transformation) all the values from all the input Observables to the output
36
+ * Observable. The output Observable only completes once all input Observables
37
+ * have completed. Any error delivered by an input Observable will be immediately
38
+ * emitted on the output Observable.
59
39
*
60
40
* @example <caption>Merge together two Observables: 1s interval and clicks</caption>
61
41
* var clicks = Rx.Observable.fromEvent(document, 'click');
62
42
* var timer = Rx.Observable.interval(1000);
63
- * var clicksOrTimer = Rx.Observable. merge(clicks, timer);
43
+ * var clicksOrTimer = clicks. merge(timer);
64
44
* clicksOrTimer.subscribe(x => console.log(x));
65
45
*
66
- * // Results in the following:
67
- * // timer will emit ascending values, one every second(1000ms) to console
68
- * // clicks logs MouseEvents to console everytime the "document" is clicked
69
- * // Since the two streams are merged you see these happening
70
- * // as they occur.
71
- *
72
46
* @example <caption>Merge together 3 Observables, but only 2 run concurrently</caption>
73
47
* var timer1 = Rx.Observable.interval(1000).take(10);
74
48
* var timer2 = Rx.Observable.interval(2000).take(6);
75
49
* var timer3 = Rx.Observable.interval(500).take(10);
76
50
* var concurrent = 2; // the argument
77
- * var merged = Rx.Observable. merge(timer1, timer2, timer3, concurrent);
51
+ * var merged = timer1. merge(timer2, timer3, concurrent);
78
52
* merged.subscribe(x => console.log(x));
79
53
*
80
- * // Results in the following:
81
- * // - First timer1 and timer2 will run concurrently
82
- * // - timer1 will emit a value every 1000ms for 10 iterations
83
- * // - timer2 will emit a value every 2000ms for 6 iterations
84
- * // - after timer1 hits it's max iteration, timer2 will
85
- * // continue, and timer3 will start to run concurrently with timer2
86
- * // - when timer2 hits it's max iteration it terminates, and
87
- * // timer3 will continue to emit a value every 500ms until it is complete
88
- *
89
54
* @see {@link mergeAll }
90
55
* @see {@link mergeMap }
91
56
* @see {@link mergeMapTo }
92
57
* @see {@link mergeScan }
93
58
*
94
- * @param {...ObservableInput } observables Input Observables to merge together.
59
+ * @param {ObservableInput } other An input Observable to merge with the source
60
+ * Observable. More than one input Observables may be given as argument.
95
61
* @param {number } [concurrent=Number.POSITIVE_INFINITY] Maximum number of input
96
62
* Observables being subscribed to concurrently.
97
63
* @param {Scheduler } [scheduler=null] The IScheduler to use for managing
98
64
* concurrency of input Observables.
99
- * @return {Observable } an Observable that emits items that are the result of
65
+ * @return {Observable } An Observable that emits items that are the result of
100
66
* every input Observable.
101
- * @static true
102
- * @name merge
67
+ * @method merge
103
68
* @owner Observable
104
69
*/
105
- export function mergeStatic < T , R > ( ...observables : Array < ObservableInput < any > | IScheduler | number > ) : Observable < R > {
106
- let concurrent = Number . POSITIVE_INFINITY ;
107
- let scheduler : IScheduler = null ;
108
- let last : any = observables [ observables . length - 1 ] ;
109
- if ( isScheduler ( last ) ) {
110
- scheduler = < IScheduler > observables . pop ( ) ;
111
- if ( observables . length > 1 && typeof observables [ observables . length - 1 ] === 'number' ) {
112
- concurrent = < number > observables . pop ( ) ;
113
- }
114
- } else if ( typeof last === 'number' ) {
115
- concurrent = < number > observables . pop ( ) ;
116
- }
117
-
118
- if ( scheduler === null && observables . length === 1 && observables [ 0 ] instanceof Observable ) {
119
- return < Observable < R > > observables [ 0 ] ;
120
- }
121
-
122
- return mergeAll ( concurrent ) ( new ArrayObservable ( < any > observables , scheduler ) ) as Observable < R > ;
70
+ export function merge < T , R > ( ...observables : Array < ObservableInput < any > | IScheduler | number > ) : OperatorFunction < T , R > {
71
+ return ( source : Observable < T > ) => source . lift . call ( mergeStatic ( source , ...observables ) ) ;
123
72
}
0 commit comments