@@ -4,8 +4,11 @@ use crate::{
4
4
event:: { Event , LogEvent } ,
5
5
topology:: { fanout, WatchRx } ,
6
6
} ;
7
- use futures:: { channel :: mpsc as futures_mpsc , future:: try_join_all, FutureExt , SinkExt , StreamExt } ;
7
+ use futures:: { future:: try_join_all, FutureExt , Sink } ;
8
8
use itertools:: Itertools ;
9
+ use std:: collections:: VecDeque ;
10
+ use std:: pin:: Pin ;
11
+ use std:: task:: { Context , Poll } ;
9
12
use std:: {
10
13
collections:: { HashMap , HashSet } ,
11
14
iter:: FromIterator ,
@@ -56,14 +59,92 @@ impl TapPayload {
56
59
}
57
60
}
58
61
62
+ /// A `TapSink` is used as an output channel for a topology component, and receives
63
+ /// `Event`s. If these are of type `Event::LogEvent`, they are relayed to the tap client.
64
+ pub struct TapSink {
65
+ tap_tx : TapSender ,
66
+ component_name : String ,
67
+ buffer : VecDeque < LogEvent > ,
68
+ }
69
+
70
+ impl TapSink {
71
+ pub fn new ( tap_tx : TapSender , component_name : String ) -> Self {
72
+ Self {
73
+ tap_tx,
74
+ component_name,
75
+ // Pre-allocate space of 100 events, which matches the default `limit` typically
76
+ // provided to a tap subscription. If there's a higher log volume, this will block
77
+ // until the upstream event handler has processed the event. Generally, there should
78
+ // be little upstream pressure in the processing pipeline.
79
+ buffer : VecDeque :: with_capacity ( 100 ) ,
80
+ }
81
+ }
82
+ }
83
+
84
+ impl Sink < Event > for TapSink {
85
+ type Error = ( ) ;
86
+
87
+ /// The sink is ready to accept events if buffer capacity hasn't been reached.
88
+ fn poll_ready ( self : Pin < & mut Self > , _cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , Self :: Error > > {
89
+ if self . buffer . len ( ) == self . buffer . capacity ( ) {
90
+ Poll :: Pending
91
+ } else {
92
+ Poll :: Ready ( Ok ( ( ) ) )
93
+ }
94
+ }
95
+
96
+ /// If the sink is ready, and the event is of type `LogEvent`, add to the buffer.
97
+ fn start_send ( mut self : Pin < & mut Self > , item : Event ) -> Result < ( ) , Self :: Error > {
98
+ // If we have an event, push it onto the buffer.
99
+ if let Event :: Log ( ev) = item {
100
+ self . buffer . push_back ( ev) ;
101
+ }
102
+
103
+ Ok ( ( ) )
104
+ }
105
+
106
+ /// Flushing means FIFO dequeuing. This is an O(1) operation on the `VecDeque` buffer.
107
+ fn poll_flush (
108
+ mut self : Pin < & mut Self > ,
109
+ _cx : & mut Context < ' _ > ,
110
+ ) -> Poll < Result < ( ) , Self :: Error > > {
111
+ let mut tap_tx = self . tap_tx . clone ( ) ;
112
+
113
+ // Loop over the buffer events, pulling from the front. This will terminate when
114
+ // the buffer is empty.
115
+ while let Some ( ev) = self . buffer . pop_front ( ) {
116
+ // Attempt to send upstream. If the channel is closed, log and break. If it's
117
+ // full, return pending to reattempt later.
118
+ match tap_tx. try_send ( TapPayload :: Log ( self . component_name . clone ( ) , ev) ) {
119
+ Err ( tokio_mpsc:: error:: TrySendError :: Closed ( payload) ) => {
120
+ debug ! (
121
+ message = "Couldn't send log event." ,
122
+ payload = ?payload,
123
+ component_name = ?self . component_name) ;
124
+
125
+ break ;
126
+ }
127
+ Err ( tokio_mpsc:: error:: TrySendError :: Full ( _) ) => return Poll :: Pending ,
128
+ _ => continue ,
129
+ }
130
+ }
131
+
132
+ Poll :: Ready ( Ok ( ( ) ) )
133
+ }
134
+
135
+ fn poll_close ( self : Pin < & mut Self > , cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , Self :: Error > > {
136
+ self . poll_flush ( cx)
137
+ }
138
+ }
139
+
59
140
/// A tap sink spawns a process for listening for topology changes. If topology changes,
60
141
/// sinks are rewired to accommodate matched/unmatched patterns.
61
142
#[ derive( Debug ) ]
62
- pub struct TapSink {
143
+ pub struct TapController {
63
144
_shutdown : ShutdownTx ,
64
145
}
65
146
66
- impl TapSink {
147
+ impl TapController {
67
148
/// Creates a new tap sink, and spawns a handler for watching for topology changes
68
149
/// and a separate inner handler for events. Uses a oneshot channel to trigger shutdown
69
150
/// of handlers when the `TapSink` drops out of scope.
@@ -81,6 +162,25 @@ impl TapSink {
81
162
}
82
163
}
83
164
165
+ /// Provides a `ShutdownTx` that disconnects a component sink when it drops out of scope.
166
+ fn shutdown_trigger ( control_tx : ControlChannel , sink_id : String ) -> ShutdownTx {
167
+ let ( shutdown_tx, shutdown_rx) = oneshot:: channel ( ) ;
168
+
169
+ tokio:: spawn ( async move {
170
+ let _ = shutdown_rx. await ;
171
+ if control_tx
172
+ . send ( fanout:: ControlMessage :: Remove ( sink_id. clone ( ) ) )
173
+ . is_err ( )
174
+ {
175
+ debug ! ( message = "Couldn't disconnect sink." , sink_id = ?sink_id) ;
176
+ } else {
177
+ debug ! ( message = "Disconnected sink." , sink_id = ?sink_id) ;
178
+ }
179
+ } ) ;
180
+
181
+ shutdown_tx
182
+ }
183
+
84
184
/// Sends a 'matched' tap payload.
85
185
async fn send_matched ( mut tx : TapSender , pattern : & str ) -> Result < ( ) , SendError < TapPayload > > {
86
186
debug ! ( message = "Sending matched notification." , pattern = ?pattern) ;
@@ -93,44 +193,6 @@ async fn send_not_matched(mut tx: TapSender, pattern: &str) -> Result<(), SendEr
93
193
tx. send ( TapPayload :: not_matched ( pattern) ) . await
94
194
}
95
195
96
- /// Makes a `RouterSink` that relays `Log` as `TapPayload::Log` to a client.
97
- fn make_router (
98
- mut tx : TapSender ,
99
- control_tx : ControlChannel ,
100
- sink_id : String ,
101
- component_name : String ,
102
- ) -> ( fanout:: RouterSink , ShutdownTx ) {
103
- let ( shutdown_tx, mut shutdown_rx) = oneshot:: channel ( ) ;
104
- let ( event_tx, mut event_rx) = futures_mpsc:: unbounded ( ) ;
105
-
106
- tokio:: spawn ( async move {
107
- debug ! ( message = "Spawned event handler." , component_name = ?component_name) ;
108
-
109
- loop {
110
- tokio:: select! {
111
- _ = & mut shutdown_rx => {
112
- let _ = control_tx. send( fanout:: ControlMessage :: Remove ( sink_id. to_string( ) ) ) ;
113
- break ;
114
- } ,
115
- Some ( ev) = event_rx. next( ) => {
116
- if let Event :: Log ( ev) = ev {
117
- if let Err ( err) = tx. send( TapPayload :: Log ( component_name. clone( ) , ev) ) . await {
118
- debug!(
119
- message = "Couldn't send log event." ,
120
- error = ?err,
121
- component_name = ?component_name) ;
122
- }
123
- }
124
- }
125
- }
126
- }
127
-
128
- debug ! ( message = "Stopped event handler." , component_name = ?component_name) ;
129
- } ) ;
130
-
131
- ( Box :: new ( event_tx. sink_map_err ( |_| ( ) ) ) , shutdown_tx)
132
- }
133
-
134
196
/// Returns a tap handler that listens for topology changes, and connects sinks to observe
135
197
/// `LogEvent`s` when a component matches one or more of the provided patterns.
136
198
async fn tap_handler (
@@ -141,7 +203,8 @@ async fn tap_handler(
141
203
) {
142
204
debug ! ( message = "Started tap." , patterns = ?patterns) ;
143
205
144
- // Sinks register for the current tap. Will be updated as new components match.
206
+ // Sinks register for the current tap. Contains the name of the matched component, and
207
+ // a shutdown trigger for sending a remove control message when matching sinks change.
145
208
let mut sinks = HashMap :: new ( ) ;
146
209
147
210
loop {
@@ -174,26 +237,28 @@ async fn tap_handler(
174
237
// reconfigured with the same name as a previous, and we are not
175
238
// getting involved in config diffing at this point.
176
239
let id = Uuid :: new_v4( ) . to_string( ) ;
177
- let ( sink, shutdown_tx) = make_router(
178
- tx. clone( ) ,
179
- control_tx. clone( ) ,
180
- id. to_string( ) ,
181
- name. to_string( ) ,
182
- ) ;
240
+ let sink = TapSink :: new( tx. clone( ) , name. to_string( ) ) ;
183
241
184
- match control_tx. send( fanout:: ControlMessage :: Add ( id. to_string( ) , sink) ) {
242
+ // Attempt to connect the sink.
243
+ match control_tx
244
+ . send( fanout:: ControlMessage :: Add ( id. clone( ) , Box :: new( sink) ) )
245
+ {
185
246
Ok ( _) => {
186
- // (Over)write the sink entry.
187
- sinks. insert( name. to_string( ) , shutdown_tx) ;
188
-
189
247
debug!(
190
- message = "Component connected." ,
191
- component_name = ?name, id = ?id
248
+ message = "Sink connected." ,
249
+ sink_id = ?id, component_name = ?name,
250
+ ) ;
251
+
252
+ // Create a sink shutdown trigger to remove the sink
253
+ // when matched components change.
254
+ sinks. insert(
255
+ name. to_string( ) ,
256
+ shutdown_trigger( control_tx. clone( ) , id) ,
192
257
) ;
193
258
}
194
259
Err ( err) => {
195
260
error!(
196
- message = "Couldn't connect component ." ,
261
+ message = "Couldn't connect sink ." ,
197
262
error = ?err,
198
263
component_name = ?name, id = ?id
199
264
) ;
@@ -287,7 +352,7 @@ mod tests {
287
352
let ( _watch_tx, watch_rx) = watch:: channel ( outputs) ;
288
353
let ( sink_tx, mut sink_rx) = tokio_mpsc:: channel ( 10 ) ;
289
354
290
- let _sink = TapSink :: new (
355
+ let _controller = TapController :: new (
291
356
watch_rx,
292
357
sink_tx,
293
358
& [ pattern_matched. to_string ( ) , pattern_not_matched. to_string ( ) ] ,
0 commit comments