Skip to content

Commit 3dcf59d

Browse files
feat: track runtime schema definitions for log events (#17692)
closes #16732 In order for sinks to use semantic meaning, they need a mapping of meanings to fields. This is included in the schema definition of events, but the exact definition that needs to be used depends on the path the event took to get to the sink. The schema definition of an event is tracked at runtime so this can be determined. A `parent_id` was added to event metadata to track the previous component that an event came from, which lets the topology select the correct schema definition to attach to events. For sources, there is only one definition that can be attached (for each port). This is automatically attached in the topology layer (after an event is emitted by a source), so there is no additional work in each source to support this. For transforms, it's slightly more complicated. The schema definition depends on both the output port _and_ the component the event came from. A map is generated at Vector startup, and the correct definition is obtained from that at runtime. This also happens in the topology layer so transforms don't need to worry about this. Previously the `remap` transform had custom code to support runtime schema definitions (for the VRL meaning functions). This was removed since it's now handled automatically. The `reduce` and `lua` transforms are special cases since there is no clear "path" that an event takes through the topology, since multiple events can be merged (from different inputs) in `reduce`. For `lua`, output events may not be related to input events at all. In these cases the schema definition map will have the same value for all inputs (they are all merged). The topology will then arbitrarily pick one (since they are all the same). --------- Signed-off-by: Stephen Wakely <fungus.humungus@gmail.com> Co-authored-by: Stephen Wakely <fungus.humungus@gmail.com>
1 parent 062224b commit 3dcf59d

File tree

25 files changed

+847
-428
lines changed

25 files changed

+847
-428
lines changed

lib/vector-core/Cargo.toml

+1-1
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ rand = "0.8.5"
9494
rand_distr = "0.4.3"
9595
tracing-subscriber = { version = "0.3.17", default-features = false, features = ["env-filter", "fmt", "ansi", "registry"] }
9696
vector-common = { path = "../vector-common", default-features = false, features = ["test"] }
97-
vrl = { version = "0.4.0", default-features = false, features = ["value", "arbitrary", "lua"] }
97+
vrl = { version = "0.4.0", default-features = false, features = ["value", "arbitrary", "lua", "test"] }
9898

9999
[features]
100100
api = ["dep:async-graphql"]

lib/vector-core/src/config/mod.rs

+13-15
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use std::sync::Arc;
12
use std::{collections::HashMap, fmt, num::NonZeroUsize};
23

34
use bitmask_enum::bitmask;
@@ -111,7 +112,7 @@ pub struct SourceOutput {
111112
// NOTE: schema definitions are only implemented/supported for log-type events. There is no
112113
// inherent blocker to support other types as well, but it'll require additional work to add
113114
// the relevant schemas, and store them separately in this type.
114-
pub schema_definition: Option<schema::Definition>,
115+
pub schema_definition: Option<Arc<schema::Definition>>,
115116
}
116117

117118
impl SourceOutput {
@@ -129,7 +130,7 @@ impl SourceOutput {
129130
Self {
130131
port: None,
131132
ty,
132-
schema_definition: Some(schema_definition),
133+
schema_definition: Some(Arc::new(schema_definition)),
133134
}
134135
}
135136

@@ -168,17 +169,15 @@ impl SourceOutput {
168169
/// Schema enabled is set in the users configuration.
169170
#[must_use]
170171
pub fn schema_definition(&self, schema_enabled: bool) -> Option<schema::Definition> {
172+
use std::ops::Deref;
173+
171174
self.schema_definition.as_ref().map(|definition| {
172175
if schema_enabled {
173-
definition.clone()
176+
definition.deref().clone()
174177
} else {
175178
let mut new_definition =
176179
schema::Definition::default_for_namespace(definition.log_namespaces());
177-
178-
if definition.log_namespaces().contains(&LogNamespace::Vector) {
179-
new_definition.add_meanings(definition.meanings());
180-
}
181-
180+
new_definition.add_meanings(definition.meanings());
182181
new_definition
183182
}
184183
})
@@ -203,7 +202,7 @@ pub struct TransformOutput {
203202
/// enabled, at least one definition should be output. If the transform
204203
/// has multiple connected sources, it is possible to have multiple output
205204
/// definitions - one for each input.
206-
log_schema_definitions: HashMap<OutputId, schema::Definition>,
205+
pub log_schema_definitions: HashMap<OutputId, schema::Definition>,
207206
}
208207

209208
impl TransformOutput {
@@ -245,11 +244,7 @@ impl TransformOutput {
245244
.map(|(output, definition)| {
246245
let mut new_definition =
247246
schema::Definition::default_for_namespace(definition.log_namespaces());
248-
249-
if definition.log_namespaces().contains(&LogNamespace::Vector) {
250-
new_definition.add_meanings(definition.meanings());
251-
}
252-
247+
new_definition.add_meanings(definition.meanings());
253248
(output.clone(), new_definition)
254249
})
255250
.collect()
@@ -606,7 +601,10 @@ mod test {
606601

607602
// There should be the default legacy definition without schemas enabled.
608603
assert_eq!(
609-
Some(schema::Definition::default_legacy_namespace()),
604+
Some(
605+
schema::Definition::default_legacy_namespace()
606+
.with_meaning(OwnedTargetPath::event(owned_value_path!("zork")), "zork")
607+
),
610608
output.schema_definition(false)
611609
);
612610
}

lib/vector-core/src/event/metadata.rs

+25-2
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,10 @@ use vector_common::{config::ComponentKey, EventDataEq};
77
use vrl::value::{Kind, Secrets, Value};
88

99
use super::{BatchNotifier, EventFinalizer, EventFinalizers, EventStatus};
10-
use crate::{config::LogNamespace, schema, ByteSizeOf};
10+
use crate::{
11+
config::{LogNamespace, OutputId},
12+
schema, ByteSizeOf,
13+
};
1114

1215
const DATADOG_API_KEY: &str = "datadog_api_key";
1316
const SPLUNK_HEC_TOKEN: &str = "splunk_hec_token";
@@ -30,8 +33,15 @@ pub struct EventMetadata {
3033
/// The id of the source
3134
source_id: Option<Arc<ComponentKey>>,
3235

36+
/// The id of the component this event originated from. This is used to
37+
/// determine which schema definition to attach to an event in transforms.
38+
/// This should always have a value set for events in transforms. It will always be `None`
39+
/// in a source, and there is currently no use-case for reading the value in a sink.
40+
upstream_id: Option<Arc<OutputId>>,
41+
3342
/// An identifier for a globally registered schema definition which provides information about
3443
/// the event shape (type information, and semantic meaning of fields).
44+
/// This definition is only currently valid for logs, and shouldn't be used for other event types.
3545
///
3646
/// TODO(Jean): must not skip serialization to track schemas across restarts.
3747
#[serde(default = "default_schema_definition", skip)]
@@ -71,17 +81,29 @@ impl EventMetadata {
7181
&mut self.secrets
7282
}
7383

74-
/// Returns a reference to the metadata source.
84+
/// Returns a reference to the metadata source id.
7585
#[must_use]
7686
pub fn source_id(&self) -> Option<&Arc<ComponentKey>> {
7787
self.source_id.as_ref()
7888
}
7989

90+
/// Returns a reference to the metadata parent id. This is the `OutputId`
91+
/// of the previous component the event was sent through (if any).
92+
#[must_use]
93+
pub fn upstream_id(&self) -> Option<&OutputId> {
94+
self.upstream_id.as_deref()
95+
}
96+
8097
/// Sets the `source_id` in the metadata to the provided value.
8198
pub fn set_source_id(&mut self, source_id: Arc<ComponentKey>) {
8299
self.source_id = Some(source_id);
83100
}
84101

102+
/// Sets the `upstream_id` in the metadata to the provided value.
103+
pub fn set_upstream_id(&mut self, upstream_id: Arc<OutputId>) {
104+
self.upstream_id = Some(upstream_id);
105+
}
106+
85107
/// Return the datadog API key, if it exists
86108
pub fn datadog_api_key(&self) -> Option<Arc<str>> {
87109
self.secrets.get(DATADOG_API_KEY).cloned()
@@ -111,6 +133,7 @@ impl Default for EventMetadata {
111133
finalizers: Default::default(),
112134
schema_definition: default_schema_definition(),
113135
source_id: None,
136+
upstream_id: None,
114137
}
115138
}
116139
}

lib/vector-core/src/event/mod.rs

+13-1
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use std::{
55
sync::Arc,
66
};
77

8-
use crate::ByteSizeOf;
8+
use crate::{config::OutputId, ByteSizeOf};
99
pub use array::{into_event_stream, EventArray, EventContainer, LogArray, MetricArray, TraceArray};
1010
pub use estimated_json_encoded_size_of::EstimatedJsonEncodedSizeOf;
1111
pub use finalization::{
@@ -309,12 +309,24 @@ impl Event {
309309
self.metadata_mut().set_source_id(source_id);
310310
}
311311

312+
/// Sets the `upstream_id` in the event metadata to the provided value.
313+
pub fn set_upstream_id(&mut self, upstream_id: Arc<OutputId>) {
314+
self.metadata_mut().set_upstream_id(upstream_id);
315+
}
316+
312317
/// Sets the `source_id` in the event metadata to the provided value.
313318
#[must_use]
314319
pub fn with_source_id(mut self, source_id: Arc<ComponentKey>) -> Self {
315320
self.metadata_mut().set_source_id(source_id);
316321
self
317322
}
323+
324+
/// Sets the `upstream_id` in the event metadata to the provided value.
325+
#[must_use]
326+
pub fn with_upstream_id(mut self, upstream_id: Arc<OutputId>) -> Self {
327+
self.metadata_mut().set_upstream_id(upstream_id);
328+
self
329+
}
318330
}
319331

320332
impl EventDataEq for Event {

0 commit comments

Comments
 (0)