Skip to content

Commit 9f6312d

Browse files
committed
feat(pipe): add pipe method ot Observable
Also adds type overloads for pipe and for compose NOTE: For some reason TypeScript would not let me call compose(...operations) in the pipe method
1 parent 5281229 commit 9f6312d

File tree

3 files changed

+83
-0
lines changed

3 files changed

+83
-0
lines changed

spec/Observable-spec.ts

+29
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ import * as sinon from 'sinon';
33
import * as Rx from '../dist/cjs/Rx';
44
import {TeardownLogic} from '../dist/cjs/Subscription';
55
import marbleTestingSignature = require('./helpers/marble-testing'); // tslint:disable-line:no-require-imports
6+
import { map } from '../dist/cjs/operators';
67

78
declare const { asDiagram, rxTestScheduler };
89
declare const cold: typeof marbleTestingSignature.cold;
@@ -621,6 +622,34 @@ describe('Observable', () => {
621622
});
622623
});
623624
});
625+
626+
describe('pipe', () => {
627+
it('should exist', () => {
628+
const source = Observable.of('test');
629+
expect(source.pipe).to.be.a('function');
630+
});
631+
632+
it('should pipe multiple operations', (done) => {
633+
Observable.of('test')
634+
.pipe(
635+
map((x: string) => x + x),
636+
map((x: string) => x + '!!!')
637+
)
638+
.subscribe(
639+
x => {
640+
expect(x).to.equal('testtest!!!');
641+
},
642+
null,
643+
done
644+
);
645+
});
646+
647+
it('should return the same observable if there are no arguments', () => {
648+
const source = Observable.of('test');
649+
const result = source.pipe();
650+
expect(result).to.equal(source);
651+
});
652+
});
624653
});
625654

626655
/** @test {Observable} */

src/Observable.ts

+41
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ import { toSubscriber } from './util/toSubscriber';
77
import { IfObservable } from './observable/IfObservable';
88
import { ErrorObservable } from './observable/ErrorObservable';
99
import { observable as Symbol_observable } from './symbol/observable';
10+
import { OperatorFunction } from './interfaces';
11+
import { compose } from './util/compose';
1012

1113
export interface Subscribable<T> {
1214
subscribe(observerOrNext?: PartialObserver<T> | ((value: T) => void),
@@ -286,4 +288,43 @@ export class Observable<T> implements Subscribable<T> {
286288
[Symbol_observable]() {
287289
return this;
288290
}
291+
292+
/* tslint:disable:max-line-length */
293+
pipe(): Observable<T>
294+
pipe<A>(op1: OperatorFunction<T, A>): Observable<A>
295+
pipe<A, B>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>): Observable<B>
296+
pipe<A, B, C>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>): Observable<C>
297+
pipe<A, B, C, D>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>): Observable<D>
298+
pipe<A, B, C, D, E>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>): Observable<E>
299+
pipe<A, B, C, D, E, F>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>, op6: OperatorFunction<E, F>): Observable<F>
300+
pipe<A, B, C, D, E, F, G>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>, op6: OperatorFunction<E, F>, op7: OperatorFunction<F, G>): Observable<G>
301+
pipe<A, B, C, D, E, F, G, H>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>, op6: OperatorFunction<E, F>, op7: OperatorFunction<F, G>, op8: OperatorFunction<G, H>): Observable<H>
302+
pipe<A, B, C, D, E, F, G, H, I>(op1: OperatorFunction<T, A>, op2: OperatorFunction<A, B>, op3: OperatorFunction<B, C>, op4: OperatorFunction<C, D>, op5: OperatorFunction<D, E>, op6: OperatorFunction<E, F>, op7: OperatorFunction<F, G>, op8: OperatorFunction<G, H>, op9: OperatorFunction<H, I>): Observable<I>
303+
/* tslint:enable:max-line-length */
304+
305+
/**
306+
* Used to stitch together functional operators into a chain.
307+
* @method pipe
308+
* @return {Observable} the Observable result of all of the operators having
309+
* been called in the order they were passed in.
310+
*
311+
* @example
312+
*
313+
* import { map, filter, scan } from 'rxjs/operators';
314+
*
315+
* Rx.Observable.interval(1000)
316+
* .pipe(
317+
* filter(x => x % 2 === 0),
318+
* map(x => x + x),
319+
* scan((acc, x) => acc + x)
320+
* )
321+
* .subscribe(x => console.log(x))
322+
*/
323+
pipe<R>(...operations: OperatorFunction<T, R>[]): Observable<R> {
324+
if (operations.length === 0) {
325+
return this as any;
326+
}
327+
328+
return compose.apply(this, operations)(this);
329+
}
289330
}

src/util/compose.ts

+13
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,19 @@
11
import { noop } from './noop';
22
import { UnaryFunction } from '../interfaces';
33

4+
/* tslint:disable:max-line-length */
5+
export function compose<T>(): UnaryFunction<T, T>;
6+
export function compose<T, A>(op1: UnaryFunction<T, A>): UnaryFunction<T, A>;
7+
export function compose<T, A, B>(op1: UnaryFunction<T, A>, op2: UnaryFunction<A, B>): UnaryFunction<T, B>;
8+
export function compose<T, A, B, C>(op1: UnaryFunction<T, A>, op2: UnaryFunction<A, B>, op3: UnaryFunction<B, C>): UnaryFunction<T, C>;
9+
export function compose<T, A, B, C, D>(op1: UnaryFunction<T, A>, op2: UnaryFunction<A, B>, op3: UnaryFunction<B, C>, op4: UnaryFunction<C, D>): UnaryFunction<T, D>;
10+
export function compose<T, A, B, C, D, E>(op1: UnaryFunction<T, A>, op2: UnaryFunction<A, B>, op3: UnaryFunction<B, C>, op4: UnaryFunction<C, D>, op5: UnaryFunction<D, E>): UnaryFunction<T, E>;
11+
export function compose<T, A, B, C, D, E, F>(op1: UnaryFunction<T, A>, op2: UnaryFunction<A, B>, op3: UnaryFunction<B, C>, op4: UnaryFunction<C, D>, op5: UnaryFunction<D, E>, op6: UnaryFunction<E, F>): UnaryFunction<T, F>;
12+
export function compose<T, A, B, C, D, E, F, G>(op1: UnaryFunction<T, A>, op2: UnaryFunction<A, B>, op3: UnaryFunction<B, C>, op4: UnaryFunction<C, D>, op5: UnaryFunction<D, E>, op6: UnaryFunction<E, F>, op7: UnaryFunction<F, G>): UnaryFunction<T, G>;
13+
export function compose<T, A, B, C, D, E, F, G, H>(op1: UnaryFunction<T, A>, op2: UnaryFunction<A, B>, op3: UnaryFunction<B, C>, op4: UnaryFunction<C, D>, op5: UnaryFunction<D, E>, op6: UnaryFunction<E, F>, op7: UnaryFunction<F, G>, op8: UnaryFunction<G, H>): UnaryFunction<T, H>;
14+
export function compose<T, A, B, C, D, E, F, G, H, I>(op1: UnaryFunction<T, A>, op2: UnaryFunction<A, B>, op3: UnaryFunction<B, C>, op4: UnaryFunction<C, D>, op5: UnaryFunction<D, E>, op6: UnaryFunction<E, F>, op7: UnaryFunction<F, G>, op8: UnaryFunction<G, H>, op9: UnaryFunction<H, I>): UnaryFunction<T, I>;
15+
/* tslint:enable:max-line-length */
16+
417
export function compose<T, R>(...fns: Array<UnaryFunction<T, R>>): UnaryFunction<T, R> {
518
if (!fns) {
619
return noop as UnaryFunction<any, any>;

0 commit comments

Comments
 (0)