-
Notifications
You must be signed in to change notification settings - Fork 1.7k
/
Copy pathconfig.rs
157 lines (127 loc) · 4.63 KB
/
config.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
use http::{Request, StatusCode, Uri};
use hyper::Body;
use super::{
service::{ClickhouseRetryLogic, ClickhouseService},
sink::ClickhouseSink,
};
use crate::{
http::{get_http_scheme_from_uri, Auth, HttpClient, MaybeAuth},
sinks::{
prelude::*,
util::{RealtimeSizeBasedDefaultBatchSettings, UriSerde},
},
};
/// Configuration for the `clickhouse` sink.
// Maintainer notes in this struct are deliberately written as plain `//` comments: the `///`
// doc comments and the attributes are inputs to the `configurable_component` macro (presumably
// surfaced in generated docs/schema — confirm against the macro), so they are left as written.
#[configurable_component(sink("clickhouse", "Deliver log data to a ClickHouse database."))]
#[derive(Clone, Debug, Default)]
// serde rejects any configuration key that does not map to a field below.
#[serde(deny_unknown_fields)]
pub struct ClickhouseConfig {
// `host` is accepted as an alternative key for this field (serde alias).
// `build` also reads `endpoint.auth`, i.e. credentials carried alongside the URI.
/// The endpoint of the ClickHouse server.
#[serde(alias = "host")]
#[configurable(metadata(docs::examples = "http://localhost:8123"))]
pub endpoint: UriSerde,
// Required: there is no serde default on this field.
/// The table that data is inserted into.
#[configurable(metadata(docs::examples = "mytable"))]
pub table: Template,
// Optional: when unset, `build` falls back to the template "default".
/// The database that contains the table that data is inserted into.
#[configurable(metadata(docs::examples = "mydatabase"))]
pub database: Option<Template>,
// Defaults to `false` when omitted; forwarded to `ClickhouseService::new` in `build`.
/// Sets `input_format_skip_unknown_fields`, allowing ClickHouse to discard fields not present in the table schema.
#[serde(default)]
pub skip_unknown_fields: bool,
// Defaults to `false` when omitted; forwarded to `ClickhouseService::new` in `build`.
/// Sets `date_time_input_format` to `best_effort`, allowing ClickHouse to properly parse RFC3339/ISO 8601.
#[serde(default)]
pub date_time_best_effort: bool,
// When the key is omitted, serde fills this from `Compression::gzip_default` rather than
// from `Compression`'s own `Default` impl.
#[configurable(derived)]
#[serde(default = "Compression::gzip_default")]
pub compression: Compression,
// Falls back to `Transformer::default()` when omitted. Presumably left out of serialized
// output while it still equals the default (inferred from the helper's name — confirm).
#[configurable(derived)]
#[serde(
default,
skip_serializing_if = "crate::serde::skip_serializing_if_default"
)]
pub encoding: Transformer,
// Batching options; `build` converts these with `into_batcher_settings()`, which can fail.
#[configurable(derived)]
#[serde(default)]
pub batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
// Optional explicit credentials. `build` reconciles this with `endpoint.auth` through
// `choose_one`, which is fallible (exact conflict rules live in that helper — confirm).
#[configurable(derived)]
pub auth: Option<Auth>,
// Request settings; `build` resolves them with `unwrap_with(&Default::default())`.
#[configurable(derived)]
#[serde(default)]
pub request: TowerRequestConfig,
// Optional TLS options handed to `TlsSettings::from_options` in `build`.
#[configurable(derived)]
pub tls: Option<TlsConfig>,
// Deserialized through `crate::serde::bool_or_struct` — presumably so either a bare boolean
// or the full struct form is accepted (inferred from the helper's name — confirm).
#[configurable(derived)]
#[serde(
default,
deserialize_with = "crate::serde::bool_or_struct",
skip_serializing_if = "crate::serde::skip_serializing_if_default"
)]
pub acknowledgements: AcknowledgementsConfig,
}
// Presumably provides the example-config generation for this sink from `ClickhouseConfig`'s
// derived `Default` (inferred from the macro name — confirm against its definition). The
// `generate_config` test at the bottom of this file exercises the result.
impl_generate_config_from_default!(ClickhouseConfig);
#[async_trait::async_trait]
#[typetag::serde(name = "clickhouse")]
impl SinkConfig for ClickhouseConfig {
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
let endpoint = self.endpoint.with_default_parts().uri;
let protocol = get_http_scheme_from_uri(&endpoint);
let auth = self.auth.choose_one(&self.endpoint.auth)?;
let tls_settings = TlsSettings::from_options(&self.tls)?;
let client = HttpClient::new(tls_settings, &cx.proxy)?;
let service = ClickhouseService::new(
client.clone(),
auth.clone(),
endpoint.clone(),
self.skip_unknown_fields,
self.date_time_best_effort,
);
let request_limits = self.request.unwrap_with(&Default::default());
let service = ServiceBuilder::new()
.settings(request_limits, ClickhouseRetryLogic::default())
.service(service);
let batch_settings = self.batch.into_batcher_settings()?;
let database = self.database.clone().unwrap_or_else(|| {
"default"
.try_into()
.expect("'default' should be a valid template")
});
let sink = ClickhouseSink::new(
batch_settings,
self.compression,
self.encoding.clone(),
service,
protocol,
database,
self.table.clone(),
);
let healthcheck = Box::pin(healthcheck(client, endpoint, auth));
Ok((VectorSink::from_event_streamsink(sink), healthcheck))
}
fn input(&self) -> Input {
Input::log()
}
fn acknowledgements(&self) -> &AcknowledgementsConfig {
&self.acknowledgements
}
}
/// Builds the URI requested by [`healthcheck`]: the endpoint followed by the
/// `?query=SELECT%201` query string.
///
/// The endpoint's string form usually already ends in `/` (`build` passes the
/// result of `with_default_parts()`), so a separator is added only when it is
/// missing. Unconditionally inserting `/` would produce `...//?query=SELECT%201`.
fn healthcheck_uri(endpoint: &Uri) -> String {
    let mut uri = endpoint.to_string();
    if !uri.ends_with('/') {
        uri.push('/');
    }
    uri.push_str("?query=SELECT%201");
    uri
}

/// Verifies that the ClickHouse endpoint answers a trivial `SELECT 1` query.
///
/// Issues a `GET` request with an empty body to the endpoint, applying `auth`
/// to the request when it is provided.
///
/// # Errors
///
/// Returns an error if the request cannot be built, if sending it fails, or
/// if the response status is anything other than `200 OK` (reported as
/// `HealthcheckError::UnexpectedStatus`).
async fn healthcheck(client: HttpClient, endpoint: Uri, auth: Option<Auth>) -> crate::Result<()> {
    // A malformed URI is reported as a healthcheck failure rather than a panic.
    let mut request = Request::get(healthcheck_uri(&endpoint)).body(Body::empty())?;

    if let Some(auth) = auth {
        auth.apply(&mut request);
    }

    let response = client.send(request).await?;

    match response.status() {
        StatusCode::OK => Ok(()),
        status => Err(HealthcheckError::UnexpectedStatus { status }.into()),
    }
}
#[cfg(test)]
mod tests {
    use super::ClickhouseConfig;
    use crate::test_util::test_generate_config;

    /// Runs the shared config-generation check against `ClickhouseConfig`.
    #[test]
    fn generate_config() {
        test_generate_config::<ClickhouseConfig>();
    }
}