1
1
import Operator from '../Operator' ;
2
- import Observer from '../Observer' ;
3
2
import Scheduler from '../Scheduler' ;
4
3
import Subscriber from '../Subscriber' ;
5
4
import Notification from '../Notification' ;
6
5
import immediate from '../schedulers/immediate' ;
6
+ import isDate from '../util/isDate' ;
7
7
8
- export default function delay < T > ( delay : number , scheduler : Scheduler = immediate ) {
9
- return this . lift ( new DelayOperator ( delay , scheduler ) ) ;
8
+ export default function delay < T > ( delay : number | Date ,
9
+ scheduler : Scheduler = immediate ) {
10
+ let absoluteDelay = isDate ( delay ) ;
11
+ let delayFor = absoluteDelay ? ( + delay - scheduler . now ( ) ) : < number > delay ;
12
+ return this . lift ( new DelayOperator ( delayFor , scheduler ) ) ;
10
13
}
11
14
12
15
class DelayOperator < T , R > implements Operator < T , R > {
13
-
14
- delay : number ;
15
- scheduler : Scheduler ;
16
-
17
- constructor ( delay : number , scheduler : Scheduler ) {
18
- this . delay = delay ;
19
- this . scheduler = scheduler ;
16
+ constructor ( private delay : number ,
17
+ private scheduler : Scheduler ) {
20
18
}
21
19
22
20
call ( subscriber : Subscriber < T > ) : Subscriber < T > {
@@ -25,21 +23,20 @@ class DelayOperator<T, R> implements Operator<T, R> {
25
23
}
26
24
27
25
class DelaySubscriber < T > extends Subscriber < T > {
26
+ private queue : Array < any > = [ ] ;
27
+ private active : boolean = false ;
28
+ private errored : boolean = false ;
28
29
29
- protected delay : number ;
30
- protected queue : Array < any > = [ ] ;
31
- protected scheduler : Scheduler ;
32
- protected active : boolean = false ;
33
- protected errored : boolean = false ;
34
-
35
- static dispatch ( state ) {
30
+ private static dispatch ( state ) : void {
36
31
const source = state . source ;
37
32
const queue = source . queue ;
38
33
const scheduler = state . scheduler ;
39
34
const destination = state . destination ;
35
+
40
36
while ( queue . length > 0 && ( queue [ 0 ] . time - scheduler . now ( ) ) <= 0 ) {
41
37
queue . shift ( ) . notification . observe ( destination ) ;
42
38
}
39
+
43
40
if ( queue . length > 0 ) {
44
41
let delay = Math . max ( 0 , queue [ 0 ] . time - scheduler . now ( ) ) ;
45
42
( < any > this ) . schedule ( state , delay ) ;
@@ -48,56 +45,50 @@ class DelaySubscriber<T> extends Subscriber<T> {
48
45
}
49
46
}
50
47
51
- constructor ( destination : Subscriber < T > , delay : number , scheduler : Scheduler ) {
48
+ constructor ( destination : Subscriber < T > ,
49
+ private delay : number ,
50
+ private scheduler : Scheduler ) {
52
51
super ( destination ) ;
53
- this . delay = delay ;
54
- this . scheduler = scheduler ;
55
52
}
56
53
57
- _next ( x ) {
58
- if ( this . errored ) {
54
+ private _schedule ( scheduler : Scheduler ) : void {
55
+ this . active = true ;
56
+ this . add ( scheduler . schedule ( DelaySubscriber . dispatch , this . delay , {
57
+ source : this , destination : this . destination , scheduler : scheduler
58
+ } ) ) ;
59
+ }
60
+
61
+ private scheduleNotification ( notification : Notification < any > ) : void {
62
+ if ( this . errored === true ) {
59
63
return ;
60
64
}
65
+
61
66
const scheduler = this . scheduler ;
62
- this . queue . push ( new DelayMessage < T > ( scheduler . now ( ) + this . delay , Notification . createNext ( x ) ) ) ;
67
+ let message = new DelayMessage < T > ( scheduler . now ( ) + this . delay , notification ) ;
68
+ this . queue . push ( message ) ;
69
+
63
70
if ( this . active === false ) {
64
71
this . _schedule ( scheduler ) ;
65
72
}
66
73
}
67
74
68
- _error ( e ) {
69
- const scheduler = this . scheduler ;
70
- this . errored = true ;
71
- this . queue = [ new DelayMessage < T > ( scheduler . now ( ) + this . delay , Notification . createError ( e ) ) ] ;
72
- if ( this . active === false ) {
73
- this . _schedule ( scheduler ) ;
74
- }
75
+ _next ( value : T ) {
76
+ this . scheduleNotification ( Notification . createNext ( value ) ) ;
75
77
}
76
78
77
- _complete ( ) {
78
- if ( this . errored ) {
79
- return ;
80
- }
81
- const scheduler = this . scheduler ;
82
- this . queue . push ( new DelayMessage < T > ( scheduler . now ( ) + this . delay , Notification . createComplete ( ) ) ) ;
83
- if ( this . active === false ) {
84
- this . _schedule ( scheduler ) ;
85
- }
79
+ _error ( err ) {
80
+ this . errored = true ;
81
+ this . queue = [ ] ;
82
+ this . destination . error ( err ) ;
86
83
}
87
84
88
- _schedule ( scheduler ) {
89
- this . active = true ;
90
- this . add ( scheduler . schedule ( DelaySubscriber . dispatch , this . delay , {
91
- source : this , destination : this . destination , scheduler : scheduler
92
- } ) ) ;
85
+ _complete ( ) {
86
+ this . scheduleNotification ( Notification . createComplete ( ) ) ;
93
87
}
94
88
}
95
89
96
90
class DelayMessage < T > {
97
- time : number ;
98
- notification : any ;
99
- constructor ( time : number , notification : any ) {
100
- this . time = time ;
101
- this . notification = notification ;
91
+ constructor ( private time : number ,
92
+ private notification : any ) {
102
93
}
103
94
}
0 commit comments