Skip to content

Commit 9709632

Browse files
chore: add sink prelude (#17595)
Generally our stream based sinks will share a lot of imports. This adds a `crate::sinks::prelude` module that can be imported to bring in the commonly used imports. The advantage is not only reducing the size of our imports and making it easier to maintain (often changes to the framework result in having to change the imports of all the sinks modules which is tedious) but it also encourages and guides using the shared components available. This also updates the following sinks to use the prelude: - amqp - kafka - aws_kinesis - loki - pulsar --------- Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com>
1 parent d0c650e commit 9709632

29 files changed

+97
-239
lines changed

docs/tutorials/sinks/1_basic_sink.md

+1-10
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,7 @@ Provide some module level comments to explain what the sink does.
2222
Let's setup all the imports we will need for the tutorial:
2323

2424
```rust
25-
use super::Healthcheck;
26-
use crate::config::{GenerateConfig, SinkConfig, SinkContext};
27-
use futures::{stream::BoxStream, StreamExt};
28-
use vector_common::finalization::{EventStatus, Finalizable};
29-
use vector_config::configurable_component;
30-
use vector_core::{
31-
config::{AcknowledgementsConfig, Input},
32-
event::Event,
33-
sink::{StreamSink, VectorSink},
34-
};
25+
use crate::prelude::*;
3526
```
3627

3728
# Configuration

docs/tutorials/sinks/2_http_sink.md

+1-22
Original file line numberDiff line numberDiff line change
@@ -12,32 +12,11 @@ To start, update our imports to the following:
1212
use std::task::Poll;
1313

1414
use crate::{
15-
config::{GenerateConfig, SinkConfig, SinkContext},
15+
sinks::prelude::*,
1616
http::HttpClient,
1717
internal_events::SinkRequestBuildError,
18-
sinks::util::{
19-
encoding::{write_all, Encoder},
20-
metadata::RequestMetadataBuilder,
21-
request_builder::EncodeResult,
22-
Compression, RequestBuilder, SinkBuilderExt,
23-
},
24-
sinks::Healthcheck,
2518
};
2619
use bytes::Bytes;
27-
use futures::{future::BoxFuture, stream::BoxStream, StreamExt};
28-
use vector_common::{
29-
finalization::{EventFinalizers, EventStatus, Finalizable},
30-
internal_event::CountByteSize,
31-
request_metadata::{MetaDescriptive, RequestMetadata},
32-
};
33-
use vector_config::configurable_component;
34-
use vector_core::{
35-
config::{AcknowledgementsConfig, Input},
36-
event::Event,
37-
sink::{StreamSink, VectorSink},
38-
stream::DriverResponse,
39-
tls::TlsSettings,
40-
};
4120
```
4221

4322
# Configuration

src/internal_events/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,7 @@ pub(crate) use self::unix::*;
259259
pub(crate) use self::websocket::*;
260260
#[cfg(windows)]
261261
pub(crate) use self::windows::*;
262-
pub(crate) use self::{
262+
pub use self::{
263263
adaptive_concurrency::*, batch::*, common::*, conditions::*, encoding_transcode::*,
264264
heartbeat::*, open::*, process::*, socket::*, tcp::*, template::*, udp::*,
265265
};

src/sinks/amqp/config.rs

+1-10
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,8 @@
11
//! Configuration functionality for the `AMQP` sink.
2-
use crate::{
3-
amqp::AmqpConfig,
4-
codecs::EncodingConfig,
5-
config::{DataType, GenerateConfig, Input, SinkConfig, SinkContext},
6-
sinks::{Healthcheck, VectorSink},
7-
template::Template,
8-
};
2+
use crate::{amqp::AmqpConfig, sinks::prelude::*};
93
use codecs::TextSerializerConfig;
10-
use futures::FutureExt;
114
use lapin::{types::ShortString, BasicProperties};
125
use std::sync::Arc;
13-
use vector_config::configurable_component;
14-
use vector_core::config::AcknowledgementsConfig;
156

167
use super::sink::AmqpSink;
178

src/sinks/amqp/encoder.rs

+2-5
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
//! Encoding for the `AMQP` sink.
2-
use crate::{
3-
event::Event,
4-
sinks::util::encoding::{write_all, Encoder},
5-
};
2+
use crate::sinks::prelude::*;
63
use bytes::BytesMut;
74
use std::io;
85
use tokio_util::codec::Encoder as _;
@@ -13,7 +10,7 @@ pub(super) struct AmqpEncoder {
1310
pub(super) transformer: crate::codecs::Transformer,
1411
}
1512

16-
impl Encoder<Event> for AmqpEncoder {
13+
impl encoding::Encoder<Event> for AmqpEncoder {
1714
fn encode_input(&self, mut input: Event, writer: &mut dyn io::Write) -> io::Result<usize> {
1815
let mut body = BytesMut::new();
1916
self.transformer.transform(&mut input);

src/sinks/amqp/request_builder.rs

+1-13
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,10 @@
11
//! Request builder for the `AMQP` sink.
22
//! Responsible for taking the event (which includes rendered template values) and turning
33
//! it into the raw bytes and other data needed to send the request to `AMQP`.
4-
use crate::{
5-
event::Event,
6-
sinks::util::{
7-
metadata::RequestMetadataBuilder, request_builder::EncodeResult, Compression,
8-
RequestBuilder,
9-
},
10-
};
4+
use crate::sinks::prelude::*;
115
use bytes::Bytes;
126
use lapin::BasicProperties;
137
use std::io;
14-
use vector_common::{
15-
finalization::{EventFinalizers, Finalizable},
16-
json_size::JsonSize,
17-
request_metadata::RequestMetadata,
18-
};
19-
use vector_core::EstimatedJsonEncodedSizeOf;
208

219
use super::{encoder::AmqpEncoder, service::AmqpRequest, sink::AmqpEvent};
2210

src/sinks/amqp/service.rs

+4-9
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,9 @@
11
//! The main tower service that takes the request created by the request builder
22
//! and sends it to `AMQP`.
3-
use crate::internal_events::sink::{AmqpAcknowledgementError, AmqpDeliveryError};
3+
use crate::{
4+
internal_events::sink::{AmqpAcknowledgementError, AmqpDeliveryError},
5+
sinks::prelude::*,
6+
};
47
use bytes::Bytes;
58
use futures::future::BoxFuture;
69
use lapin::{options::BasicPublishOptions, BasicProperties};
@@ -9,14 +12,6 @@ use std::{
912
sync::Arc,
1013
task::{Context, Poll},
1114
};
12-
use tower::Service;
13-
use vector_common::{
14-
finalization::{EventFinalizers, EventStatus, Finalizable},
15-
internal_event::CountByteSize,
16-
json_size::JsonSize,
17-
request_metadata::{MetaDescriptive, RequestMetadata},
18-
};
19-
use vector_core::stream::DriverResponse;
2015

2116
/// The request contains the data to send to `AMQP` together
2217
/// with the information need to route the message.

src/sinks/amqp/sink.rs

+1-11
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,9 @@
11
//! The sink for the `AMQP` sink that wires together the main stream that takes the
22
//! event and sends it to `AMQP`.
3-
use crate::{
4-
codecs::Transformer, event::Event, internal_events::TemplateRenderingError,
5-
sinks::util::builder::SinkBuilderExt, template::Template,
6-
};
7-
use async_trait::async_trait;
8-
use futures::StreamExt;
9-
use futures_util::stream::BoxStream;
3+
use crate::sinks::prelude::*;
104
use lapin::{options::ConfirmSelectOptions, BasicProperties};
115
use serde::Serialize;
126
use std::sync::Arc;
13-
use tower::ServiceBuilder;
14-
use vector_buffers::EventCount;
15-
use vector_common::json_size::JsonSize;
16-
use vector_core::{sink::StreamSink, ByteSizeOf, EstimatedJsonEncodedSizeOf};
177

188
use super::{
199
config::{AmqpPropertiesConfig, AmqpSinkConfig},

src/sinks/aws_kinesis/config.rs

+6-12
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,13 @@
11
use std::marker::PhantomData;
22

3-
use tower::ServiceBuilder;
4-
use vector_config::configurable_component;
5-
use vector_core::{
6-
config::{DataType, Input},
7-
sink::VectorSink,
8-
stream::BatcherSettings,
9-
};
3+
use vector_core::stream::BatcherSettings;
104

115
use crate::{
126
aws::{AwsAuthentication, RegionOrEndpoint},
13-
codecs::{Encoder, EncodingConfig},
14-
config::AcknowledgementsConfig,
15-
sinks::util::{retries::RetryLogic, Compression, ServiceBuilderExt, TowerRequestConfig},
16-
tls::TlsConfig,
7+
sinks::{
8+
prelude::*,
9+
util::{retries::RetryLogic, TowerRequestConfig},
10+
},
1711
};
1812

1913
use super::{
@@ -78,7 +72,7 @@ impl KinesisSinkBaseConfig {
7872
}
7973

8074
/// Builds an aws_kinesis sink.
81-
pub async fn build_sink<C, R, RR, E, RT>(
75+
pub fn build_sink<C, R, RR, E, RT>(
8276
config: &KinesisSinkBaseConfig,
8377
partition_key_field: Option<String>,
8478
batch_settings: BatcherSettings,

src/sinks/aws_kinesis/firehose/config.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -141,8 +141,7 @@ impl SinkConfig for KinesisFirehoseSinkConfig {
141141
None,
142142
batch_settings,
143143
KinesisFirehoseClient { client },
144-
)
145-
.await?;
144+
)?;
146145

147146
Ok((sink, healthcheck))
148147
}

src/sinks/aws_kinesis/service.rs

+2-5
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,13 @@ use std::{
55

66
use aws_smithy_client::SdkError;
77
use aws_types::region::Region;
8-
use futures::future::BoxFuture;
9-
use tower::Service;
10-
use vector_common::{json_size::JsonSize, request_metadata::MetaDescriptive};
11-
use vector_core::{internal_event::CountByteSize, stream::DriverResponse};
8+
use vector_core::internal_event::CountByteSize;
129

1310
use super::{
1411
record::{Record, SendRecord},
1512
sink::BatchKinesisRequest,
1613
};
17-
use crate::event::EventStatus;
14+
use crate::{event::EventStatus, sinks::prelude::*};
1815

1916
pub struct KinesisService<C, T, E> {
2017
pub client: C,

src/sinks/aws_kinesis/sink.rs

+4-13
Original file line numberDiff line numberDiff line change
@@ -1,22 +1,13 @@
11
use std::{borrow::Cow, fmt::Debug, marker::PhantomData, num::NonZeroUsize};
22

3-
use async_trait::async_trait;
4-
use futures::{future, stream::BoxStream, StreamExt};
53
use rand::random;
6-
use tower::Service;
7-
use vector_common::{
8-
finalization::{EventFinalizers, Finalizable},
9-
request_metadata::{MetaDescriptive, RequestMetadata},
10-
};
11-
use vector_core::{
12-
partition::Partitioner,
13-
stream::{BatcherSettings, DriverResponse},
14-
};
154

165
use crate::{
17-
event::{Event, LogEvent},
186
internal_events::{AwsKinesisStreamNoPartitionKeyError, SinkRequestBuildError},
19-
sinks::util::{processed_event::ProcessedEvent, SinkBuilderExt, StreamSink},
7+
sinks::{
8+
prelude::*,
9+
util::{processed_event::ProcessedEvent, StreamSink},
10+
},
2011
};
2112

2213
use super::{

src/sinks/aws_kinesis/streams/config.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -148,8 +148,7 @@ impl SinkConfig for KinesisStreamsSinkConfig {
148148
self.partition_key_field.clone(),
149149
batch_settings,
150150
KinesisStreamClient { client },
151-
)
152-
.await?;
151+
)?;
153152

154153
Ok((sink, healthcheck))
155154
}

src/sinks/kafka/config.rs

+1-6
Original file line numberDiff line numberDiff line change
@@ -5,20 +5,15 @@ use futures::FutureExt;
55
use rdkafka::ClientConfig;
66
use serde_with::serde_as;
77
use vector_config::configurable_component;
8-
use vector_core::schema::Requirement;
98
use vrl::value::Kind;
109

1110
use crate::{
12-
codecs::EncodingConfig,
13-
config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
1411
kafka::{KafkaAuthConfig, KafkaCompression},
1512
serde::json::to_string,
1613
sinks::{
1714
kafka::sink::{healthcheck, KafkaSink},
18-
util::{BatchConfig, NoDefaultsBatchSettings},
19-
Healthcheck, VectorSink,
15+
prelude::*,
2016
},
21-
template::Template,
2217
};
2318

2419
pub(crate) const QUEUED_MIN_MESSAGES: u64 = 100000;

src/sinks/kafka/service.rs

+3-15
Original file line numberDiff line numberDiff line change
@@ -1,29 +1,17 @@
11
use std::task::{Context, Poll};
22

33
use bytes::Bytes;
4-
use futures::future::BoxFuture;
54
use rdkafka::{
65
error::KafkaError,
76
message::OwnedHeaders,
87
producer::{FutureProducer, FutureRecord},
98
util::Timeout,
109
};
11-
use tower::Service;
12-
use vector_common::{
13-
json_size::JsonSize,
14-
request_metadata::{MetaDescriptive, RequestMetadata},
15-
};
16-
use vector_core::{
17-
internal_event::{
18-
ByteSize, BytesSent, CountByteSize, InternalEventHandle as _, Protocol, Registered,
19-
},
20-
stream::DriverResponse,
10+
use vector_core::internal_event::{
11+
ByteSize, BytesSent, CountByteSize, InternalEventHandle as _, Protocol, Registered,
2112
};
2213

23-
use crate::{
24-
event::{EventFinalizers, EventStatus, Finalizable},
25-
kafka::KafkaStatisticsContext,
26-
};
14+
use crate::{kafka::KafkaStatisticsContext, sinks::prelude::*};
2715

2816
pub struct KafkaRequest {
2917
pub body: Bytes,

src/sinks/kafka/sink.rs

+4-11
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
1-
use async_trait::async_trait;
2-
use futures::{future, stream::BoxStream, StreamExt};
1+
use futures::future;
32
use rdkafka::{
43
consumer::{BaseConsumer, Consumer},
54
error::KafkaError,
@@ -12,17 +11,11 @@ use tower::limit::ConcurrencyLimit;
1211

1312
use super::config::{KafkaRole, KafkaSinkConfig};
1413
use crate::{
15-
codecs::{Encoder, Transformer},
16-
event::{Event, LogEvent},
1714
kafka::KafkaStatisticsContext,
18-
sinks::{
19-
kafka::{
20-
config::QUEUED_MIN_MESSAGES, request_builder::KafkaRequestBuilder,
21-
service::KafkaService,
22-
},
23-
util::{builder::SinkBuilderExt, StreamSink},
15+
sinks::kafka::{
16+
config::QUEUED_MIN_MESSAGES, request_builder::KafkaRequestBuilder, service::KafkaService,
2417
},
25-
template::{Template, TemplateParseError},
18+
sinks::prelude::*,
2619
};
2720

2821
#[derive(Debug, Snafu)]

src/sinks/kafka/tests.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,8 @@ mod integration_test {
2929
sink::KafkaSink,
3030
*,
3131
},
32-
util::{BatchConfig, NoDefaultsBatchSettings},
33-
VectorSink,
32+
prelude::*,
3433
},
35-
template::Template,
3634
test_util::{
3735
components::{assert_sink_compliance, SINK_TAGS},
3836
random_lines_with_stream, random_string, wait_for,

src/sinks/loki/config.rs

+1-10
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,12 @@
11
use std::collections::HashMap;
22

3-
use futures::future::FutureExt;
4-
use vector_config::configurable_component;
53
use vrl::value::Kind;
64

75
use super::{healthcheck::healthcheck, sink::LokiSink};
86
use crate::{
9-
codecs::EncodingConfig,
10-
config::{AcknowledgementsConfig, DataType, GenerateConfig, Input, SinkConfig, SinkContext},
117
http::{Auth, HttpClient, MaybeAuth},
128
schema,
13-
sinks::{
14-
util::{BatchConfig, Compression, SinkBatchSettings, TowerRequestConfig, UriSerde},
15-
VectorSink,
16-
},
17-
template::Template,
18-
tls::{TlsConfig, TlsSettings},
9+
sinks::{prelude::*, util::UriSerde},
1910
};
2011

2112
/// Loki-specific compression.

0 commit comments

Comments
 (0)