1
+ use std:: io:: Read ;
1
2
use std:: sync:: Arc ;
2
3
use std:: task;
3
4
@@ -123,10 +124,40 @@ pub struct RemoteWriteConfig {
123
124
skip_serializing_if = "crate::serde::skip_serializing_if_default"
124
125
) ]
125
126
pub acknowledgements : AcknowledgementsConfig ,
127
+
128
+ #[ configurable( derived) ]
129
+ #[ configurable( metadata( docs:: advanced) ) ]
130
+ #[ serde( default ) ]
131
+ pub compression : Compression ,
126
132
}
127
133
128
134
impl_generate_config_from_default ! ( RemoteWriteConfig ) ;
129
135
136
+ /// Supported compression types for Prometheus Remote Write.
137
+ #[ configurable_component]
138
+ #[ derive( Clone , Copy , Debug , Derivative ) ]
139
+ #[ derivative( Default ) ]
140
+ #[ serde( rename_all = "lowercase" ) ]
141
+ pub enum Compression {
142
+ /// Snappy.
143
+ #[ derivative( Default ) ]
144
+ Snappy ,
145
+
146
+ /// Gzip.
147
+ Gzip ,
148
+
149
+ /// Zstandard.
150
+ Zstd ,
151
+ }
152
+
153
+ const fn convert_compression_to_content_encoding ( compression : Compression ) -> & ' static str {
154
+ match compression {
155
+ Compression :: Snappy => "snappy" ,
156
+ Compression :: Gzip => "gzip" ,
157
+ Compression :: Zstd => "zstd" ,
158
+ }
159
+ }
160
+
130
161
#[ async_trait:: async_trait]
131
162
impl SinkConfig for RemoteWriteConfig {
132
163
async fn build (
@@ -181,6 +212,7 @@ impl SinkConfig for RemoteWriteConfig {
181
212
aws_region,
182
213
credentials_provider,
183
214
http_auth,
215
+ compression : self . compression ,
184
216
} ) ;
185
217
186
218
let healthcheck = healthcheck ( client. clone ( ) , Arc :: clone ( & http_request_builder) ) . boxed ( ) ;
@@ -190,6 +222,7 @@ impl SinkConfig for RemoteWriteConfig {
190
222
buckets,
191
223
quantiles,
192
224
http_request_builder,
225
+ compression : self . compression ,
193
226
} ;
194
227
195
228
let sink = {
@@ -277,6 +310,7 @@ struct RemoteWriteService {
277
310
buckets : Vec < f64 > ,
278
311
quantiles : Vec < f64 > ,
279
312
http_request_builder : Arc < HttpRequestBuilder > ,
313
+ compression : Compression ,
280
314
}
281
315
282
316
impl RemoteWriteService {
@@ -312,7 +346,7 @@ impl Service<PartitionInnerBuffer<Vec<Metric>, PartitionKey>> for RemoteWriteSer
312
346
fn call ( & mut self , buffer : PartitionInnerBuffer < Vec < Metric > , PartitionKey > ) -> Self :: Future {
313
347
let ( events, key) = buffer. into_parts ( ) ;
314
348
let body = self . encode_events ( events) ;
315
- let body = snap_block ( body) ;
349
+ let body = compress_block ( self . compression , body) ;
316
350
317
351
let client = self . client . clone ( ) ;
318
352
let request_builder = Arc :: clone ( & self . http_request_builder ) ;
@@ -344,6 +378,7 @@ pub struct HttpRequestBuilder {
344
378
pub aws_region : Option < Region > ,
345
379
pub http_auth : Option < Auth > ,
346
380
pub credentials_provider : Option < SharedCredentialsProvider > ,
381
+ pub compression : Compression ,
347
382
}
348
383
349
384
impl HttpRequestBuilder {
@@ -353,11 +388,13 @@ impl HttpRequestBuilder {
353
388
body : Vec < u8 > ,
354
389
tenant_id : Option < String > ,
355
390
) -> Result < Request < hyper:: Body > , crate :: Error > {
391
+ let content_encoding = convert_compression_to_content_encoding ( self . compression ) ;
392
+
356
393
let mut builder = http:: Request :: builder ( )
357
394
. method ( method)
358
395
. uri ( self . endpoint . clone ( ) )
359
396
. header ( "X-Prometheus-Remote-Write-Version" , "0.1.0" )
360
- . header ( "Content-Encoding" , "snappy" )
397
+ . header ( "Content-Encoding" , content_encoding )
361
398
. header ( "Content-Type" , "application/x-protobuf" ) ;
362
399
363
400
if let Some ( tenant_id) = & tenant_id {
@@ -380,10 +417,22 @@ impl HttpRequestBuilder {
380
417
}
381
418
}
382
419
383
- fn snap_block ( data : Bytes ) -> Vec < u8 > {
384
- snap:: raw:: Encoder :: new ( )
385
- . compress_vec ( & data)
386
- . expect ( "Out of memory" )
420
+ fn compress_block ( compression : Compression , data : Bytes ) -> Vec < u8 > {
421
+ match compression {
422
+ Compression :: Snappy => snap:: raw:: Encoder :: new ( )
423
+ . compress_vec ( & data)
424
+ . expect ( "snap compression failed, please report" ) ,
425
+ Compression :: Gzip => {
426
+ let mut buf = Vec :: new ( ) ;
427
+ flate2:: read:: GzEncoder :: new ( data. as_ref ( ) , flate2:: Compression :: default ( ) )
428
+ . read_to_end ( & mut buf)
429
+ . expect ( "gzip compression failed, please report" ) ;
430
+ buf
431
+ }
432
+ Compression :: Zstd => {
433
+ zstd:: encode_all ( data. as_ref ( ) , 0 ) . expect ( "zstd compression failed, please report" )
434
+ }
435
+ }
387
436
}
388
437
389
438
async fn sign_request (
0 commit comments