@@ -2,7 +2,11 @@ use std::sync::Arc;
2
2
3
3
use color_eyre:: { Section , SectionExt } ;
4
4
use eyre:: { eyre, Report , Result } ;
5
- use futures:: { future:: BoxFuture , AsyncBufReadExt , FutureExt , TryStreamExt } ;
5
+ use futures:: {
6
+ future:: { try_join_all, BoxFuture } ,
7
+ io:: AsyncBufRead ,
8
+ stream, AsyncBufReadExt , FutureExt , TryStreamExt ,
9
+ } ;
6
10
use k8s_openapi:: api:: core:: v1:: Pod ;
7
11
use kube:: { api:: LogParams , Api , ResourceExt } ;
8
12
use ratatui:: { layout:: Rect , text:: Line , widgets:: Paragraph , Frame } ;
@@ -12,7 +16,13 @@ use tokio::{
12
16
} ;
13
17
14
18
use super :: { tabs:: Tab , Widget , WIDGET_VIEWS } ;
15
- use crate :: events:: { Broadcast , Event , Keypress } ;
19
+ use crate :: {
20
+ events:: { Broadcast , Event , Keypress } ,
21
+ resources:: {
22
+ container:: { Container , ContainerExt } ,
23
+ pod:: PodExt ,
24
+ } ,
25
+ } ;
16
26
17
27
pub struct Log {
18
28
task : JoinHandle < Result < ( ) > > ,
@@ -185,11 +195,37 @@ fn log_stream<'a>(
185
195
params : LogParams ,
186
196
) -> BoxFuture < ' a , Result < ( ) > > {
187
197
async move {
188
- let mut stream = match Api :: < Pod > :: namespaced ( client. clone ( ) , & pod. namespace ( ) . unwrap ( ) )
189
- . log_stream ( & pod. name_any ( ) , & params)
190
- . await
191
- {
192
- Ok ( stream) => stream. lines ( ) ,
198
+ let client = Api :: < Pod > :: namespaced ( client. clone ( ) , & pod. namespace ( ) . unwrap ( ) ) ;
199
+
200
+ let containers = try_join_all ( pod. containers ( None ) . iter ( ) . map ( |c| {
201
+ let mut params = params. clone ( ) ;
202
+ params. container = Some ( c. name_any ( ) ) ;
203
+
204
+ container_stream ( & client, c, params)
205
+ } ) )
206
+ . await ?;
207
+
208
+ let mut all_logs = stream:: select_all ( containers. into_iter ( ) . map ( AsyncBufReadExt :: lines) ) ;
209
+
210
+ while let Some ( line) = all_logs. try_next ( ) . await ? {
211
+ tx. send ( line) ?;
212
+ }
213
+
214
+ tracing:: debug!( pod = pod. name_any( ) , "stream ended" ) ;
215
+
216
+ Ok ( ( ) )
217
+ }
218
+ . boxed ( )
219
+ }
220
+
221
+ fn container_stream < ' a > (
222
+ client : & ' a Api < Pod > ,
223
+ container : & ' a Container ,
224
+ params : LogParams ,
225
+ ) -> BoxFuture < ' a , Result < impl AsyncBufRead > > {
226
+ async move {
227
+ match client. log_stream ( & container. pod_name ( ) , & params) . await {
228
+ Ok ( stream) => Ok ( stream) ,
193
229
Err ( err) => {
194
230
let kube:: Error :: Api ( resp) = & err else {
195
231
return Err ( Report :: new ( err) ) ;
@@ -200,20 +236,12 @@ fn log_stream<'a>(
200
236
201
237
new_params. previous = false ;
202
238
203
- return log_stream ( client, pod , tx , new_params) . await ;
239
+ return container_stream ( client, container , new_params) . await ;
204
240
}
205
241
206
- return Err ( eyre ! ( err) ) ;
242
+ Err ( eyre ! ( err) )
207
243
}
208
- } ;
209
-
210
- while let Some ( line) = stream. try_next ( ) . await ? {
211
- tx. send ( line) ?;
212
244
}
213
-
214
- tracing:: debug!( pod = pod. name_any( ) , "stream ended" ) ;
215
-
216
- Ok ( ( ) )
217
245
}
218
246
. boxed ( )
219
247
}
0 commit comments