1
1
use super :: { ShutdownRx , ShutdownTx } ;
2
+ use crate :: topology:: fanout:: ControlChannel ;
2
3
use crate :: {
3
4
event:: { Event , LogEvent } ,
4
5
topology:: { fanout, WatchRx } ,
@@ -93,29 +94,41 @@ async fn send_not_matched(mut tx: TapSender, pattern: &str) -> Result<(), SendEr
93
94
}
94
95
95
96
/// Makes a `RouterSink` that relays `Log` as `TapPayload::Log` to a client.
96
- fn make_router ( mut tx : TapSender , component_name : & str ) -> fanout:: RouterSink {
97
+ fn make_router (
98
+ mut tx : TapSender ,
99
+ control_tx : ControlChannel ,
100
+ sink_id : String ,
101
+ component_name : String ,
102
+ ) -> ( fanout:: RouterSink , ShutdownTx ) {
103
+ let ( shutdown_tx, mut shutdown_rx) = oneshot:: channel ( ) ;
97
104
let ( event_tx, mut event_rx) = futures_mpsc:: unbounded ( ) ;
98
- let component_name = component_name. to_string ( ) ;
99
105
100
106
tokio:: spawn ( async move {
101
107
debug ! ( message = "Spawned event handler." , component_name = ?component_name) ;
102
108
103
- while let Some ( ev) = event_rx. next ( ) . await {
104
- if let Event :: Log ( ev) = ev {
105
- if let Err ( err) = tx. send ( TapPayload :: Log ( component_name. clone ( ) , ev) ) . await {
106
- debug ! (
107
- message = "Couldn't send log event." ,
108
- error = ?err,
109
- component_name = ?component_name) ;
109
+ loop {
110
+ tokio:: select! {
111
+ _ = & mut shutdown_rx => {
112
+ let _ = control_tx. send( fanout:: ControlMessage :: Remove ( sink_id. to_string( ) ) ) ;
110
113
break ;
114
+ } ,
115
+ Some ( ev) = event_rx. next( ) => {
116
+ if let Event :: Log ( ev) = ev {
117
+ if let Err ( err) = tx. send( TapPayload :: Log ( component_name. clone( ) , ev) ) . await {
118
+ debug!(
119
+ message = "Couldn't send log event." ,
120
+ error = ?err,
121
+ component_name = ?component_name) ;
122
+ }
123
+ }
111
124
}
112
125
}
113
126
}
114
127
115
128
debug ! ( message = "Stopped event handler." , component_name = ?component_name) ;
116
129
} ) ;
117
130
118
- Box :: new ( event_tx. sink_map_err ( |_| ( ) ) )
131
+ ( Box :: new ( event_tx. sink_map_err ( |_| ( ) ) ) , shutdown_tx )
119
132
}
120
133
121
134
/// Returns a tap handler that listens for topology changes, and connects sinks to observe
@@ -131,9 +144,6 @@ async fn tap_handler(
131
144
// Sinks register for the current tap. Will be updated as new components match.
132
145
let mut sinks = HashMap :: new ( ) ;
133
146
134
- // Keep a copy of the last topology snapshot, for later clean-up.
135
- let mut last_outputs = None ;
136
-
137
147
loop {
138
148
tokio:: select! {
139
149
_ = & mut shutdown_rx => break ,
@@ -164,16 +174,22 @@ async fn tap_handler(
164
174
// reconfigured with the same name as a previous, and we are not
165
175
// getting involved in config diffing at this point.
166
176
let id = Uuid :: new_v4( ) . to_string( ) ;
167
- let sink = make_router( tx. clone( ) , name) ;
177
+ let ( sink, shutdown_tx) = make_router(
178
+ tx. clone( ) ,
179
+ control_tx. clone( ) ,
180
+ id. to_string( ) ,
181
+ name. to_string( ) ,
182
+ ) ;
168
183
169
184
match control_tx. send( fanout:: ControlMessage :: Add ( id. to_string( ) , sink) ) {
170
185
Ok ( _) => {
171
186
// (Over)write the sink entry.
187
+ sinks. insert( name. to_string( ) , shutdown_tx) ;
188
+
172
189
debug!(
173
190
message = "Component connected." ,
174
191
component_name = ?name, id = ?id
175
192
) ;
176
- sinks. insert( name. to_string( ) , id) ;
177
193
}
178
194
Err ( err) => {
179
195
error!(
@@ -206,9 +222,6 @@ async fn tap_handler(
206
222
}
207
223
} ) ;
208
224
209
- // Keep the outputs for later clean-up when a shutdown is triggered.
210
- last_outputs = Some ( outputs) ;
211
-
212
225
// Send notifications to the client. The # of notifications will always be
213
226
// exactly equal to the number of patterns, so we can pre-allocate capacity.
214
227
let mut notifications = Vec :: with_capacity( patterns. len( ) ) ;
@@ -233,21 +246,6 @@ async fn tap_handler(
233
246
}
234
247
}
235
248
236
- // At this point, the tap handler is being shut down due to the client/subscription
237
- // going away. Clean up tap sinks by disconnecting them from the components being observed.
238
- if let Some ( outputs) = last_outputs {
239
- for ( name, id) in sinks {
240
- if let Some ( control_tx) = outputs. get ( & name) {
241
- if let Err ( err) = control_tx. send ( fanout:: ControlMessage :: Remove ( id) ) {
242
- error ! (
243
- message = "Couldn't disconnect tap sink." ,
244
- error = ?err,
245
- component_name = ?name) ;
246
- }
247
- }
248
- }
249
- }
250
-
251
249
debug ! ( message = "Stopped tap." , patterns = ?patterns) ;
252
250
}
253
251
0 commit comments