@@ -25,29 +25,34 @@ class WindowCountOperator<T, R> implements Operator<T, R> {
25
25
}
26
26
27
27
class WindowCountSubscriber < T > extends Subscriber < T > {
28
- private windows : { count : number , window : Subject < T > } [ ] = [ ] ;
28
+ private windows : { count : number , notified : boolean , window : Subject < T > } [ ] = [ { count : 0 , notified : false , window : new Subject < T > ( ) } ] ;
29
29
private count : number = 0 ;
30
30
31
31
constructor ( destination : Observer < T > , private windowSize : number , private startWindowEvery : number ) {
32
- super ( destination ) ;
32
+ super ( destination ) ;
33
33
}
34
34
35
35
_next ( value : T ) {
36
36
const count = ( this . count += 1 ) ;
37
- const startWindowEvery = this . startWindowEvery ;
37
+ const startWindowEvery = ( this . startWindowEvery > 0 ) ? this . startWindowEvery : this . windowSize ;
38
38
const windowSize = this . windowSize ;
39
39
const windows = this . windows ;
40
+ const len = windows . length ;
40
41
41
- if ( startWindowEvery && count % this . startWindowEvery === 0 ) {
42
+ if ( count % startWindowEvery === 0 ) {
42
43
let window = new Subject < T > ( ) ;
43
- windows . push ( { count : 0 , window } ) ;
44
- this . destination . next ( window ) ;
44
+ windows . push ( { count : 0 , notified : false , window : window } ) ;
45
45
}
46
46
47
- const len = windows . length ;
48
47
for ( let i = 0 ; i < len ; i ++ ) {
49
48
let w = windows [ i ] ;
50
49
const window = w . window ;
50
+
51
+ if ( ! w . notified ) {
52
+ w . notified = true ;
53
+ this . destination . next ( window ) ;
54
+ }
55
+
51
56
window . next ( value ) ;
52
57
if ( windowSize === ( w . count += 1 ) ) {
53
58
window . complete ( ) ;
0 commit comments