@@ -2,6 +2,7 @@ import {Subject, AnonymousSubject} from '../../Subject';
2
2
import { Subscriber } from '../../Subscriber' ;
3
3
import { Observable } from '../../Observable' ;
4
4
import { Subscription } from '../../Subscription' ;
5
+ import { Operator } from '../../Operator' ;
5
6
import { root } from '../../util/root' ;
6
7
import { ReplaySubject } from '../../ReplaySubject' ;
7
8
import { Observer , NextObserver } from '../../Observer' ;
@@ -25,14 +26,16 @@ export interface WebSocketSubjectConfig {
25
26
* @hide true
26
27
*/
27
28
export class WebSocketSubject < T > extends AnonymousSubject < T > {
29
+
28
30
url : string ;
29
31
protocol : string | Array < string > ;
30
32
socket : WebSocket ;
31
33
openObserver : NextObserver < Event > ;
32
34
closeObserver : NextObserver < CloseEvent > ;
33
35
closingObserver : NextObserver < void > ;
34
36
WebSocketCtor : { new ( url : string , protocol ?: string | Array < string > ) : WebSocket } ;
35
- private _output : Subject < T > = new Subject < T > ( ) ;
37
+
38
+ private _output : Subject < T > ;
36
39
37
40
resultSelector ( e : MessageEvent ) {
38
41
return JSON . parse ( e . data ) ;
@@ -50,21 +53,29 @@ export class WebSocketSubject<T> extends AnonymousSubject<T> {
50
53
}
51
54
52
55
constructor ( urlConfigOrSource : string | WebSocketSubjectConfig | Observable < T > , destination ?: Observer < T > ) {
53
- super ( ) ;
54
- this . WebSocketCtor = root . WebSocket ;
55
-
56
- if ( typeof urlConfigOrSource === 'string' ) {
57
- this . url = urlConfigOrSource ;
56
+ if ( urlConfigOrSource instanceof Observable ) {
57
+ super ( destination , < Observable < T > > urlConfigOrSource ) ;
58
58
} else {
59
- // WARNING: config object could override important members here.
60
- assign ( this , urlConfigOrSource ) ;
61
- }
62
-
63
- if ( ! this . WebSocketCtor ) {
64
- throw new Error ( 'no WebSocket constructor can be found' ) ;
59
+ super ( ) ;
60
+ this . WebSocketCtor = root . WebSocket ;
61
+ this . _output = new Subject < T > ( ) ;
62
+ if ( typeof urlConfigOrSource === 'string' ) {
63
+ this . url = urlConfigOrSource ;
64
+ } else {
65
+ // WARNING: config object could override important members here.
66
+ assign ( this , urlConfigOrSource ) ;
67
+ }
68
+ if ( ! this . WebSocketCtor ) {
69
+ throw new Error ( 'no WebSocket constructor can be found' ) ;
70
+ }
71
+ this . destination = new ReplaySubject ( ) ;
65
72
}
73
+ }
66
74
67
- this . destination = new ReplaySubject ( ) ;
75
+ lift < R > ( operator : Operator < T , R > ) : WebSocketSubject < R > {
76
+ const sock = new WebSocketSubject < R > ( this , < any > this . destination ) ;
77
+ sock . operator = operator ;
78
+ return sock ;
68
79
}
69
80
70
81
// TODO: factor this out to be a proper Operator/Subscriber implementation and eliminate closures
@@ -102,7 +113,10 @@ export class WebSocketSubject<T> extends AnonymousSubject<T> {
102
113
}
103
114
104
115
private _connectSocket ( ) {
105
- const socket = this . protocol ? new WebSocket ( this . url , this . protocol ) : new WebSocket ( this . url ) ;
116
+ const { WebSocketCtor } = this ;
117
+ const socket = this . protocol ?
118
+ new WebSocketCtor ( this . url , this . protocol ) :
119
+ new WebSocketCtor ( this . url ) ;
106
120
this . socket = socket ;
107
121
const subscription = new Subscription ( ( ) => {
108
122
this . socket = null ;
@@ -178,6 +192,10 @@ export class WebSocketSubject<T> extends AnonymousSubject<T> {
178
192
}
179
193
180
194
protected _subscribe ( subscriber : Subscriber < T > ) : Subscription {
195
+ const { source } = this ;
196
+ if ( source ) {
197
+ return source . subscribe ( subscriber ) ;
198
+ }
181
199
if ( ! this . socket ) {
182
200
this . _connectSocket ( ) ;
183
201
}
@@ -194,12 +212,14 @@ export class WebSocketSubject<T> extends AnonymousSubject<T> {
194
212
}
195
213
196
214
unsubscribe ( ) {
197
- const { socket } = this ;
215
+ const { source , socket } = this ;
198
216
if ( socket && socket . readyState === 1 ) {
199
217
socket . close ( ) ;
200
218
this . socket = null ;
201
219
}
202
220
super . unsubscribe ( ) ;
203
- this . destination = new ReplaySubject ( ) ;
221
+ if ( ! source ) {
222
+ this . destination = new ReplaySubject ( ) ;
223
+ }
204
224
}
205
225
}
0 commit comments