Skip to content

Commit

Permalink
chore(buffers): add event metadata to events in disk buffers
Browse files Browse the repository at this point in the history
  • Loading branch information
tobz committed Oct 11, 2023
1 parent 2b15c63 commit cc1cd1c
Show file tree
Hide file tree
Showing 21 changed files with 1,129 additions and 900 deletions.
2 changes: 1 addition & 1 deletion lib/codecs/src/decoding/format/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ impl Deserializer for NativeDeserializer {
if bytes.is_empty() {
Ok(smallvec![])
} else {
let event_array = EventArray::from(proto::EventArray::decode(bytes)?);
let event_array = EventArray::from(proto::types::EventArray::decode(bytes)?);
Ok(event_array.into_events().collect())
}
}
Expand Down
2 changes: 1 addition & 1 deletion lib/codecs/src/encoding/format/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ impl Encoder<Event> for NativeSerializer {

fn encode(&mut self, event: Event, buffer: &mut BytesMut) -> Result<(), Self::Error> {
let array = EventArray::from(event);
let proto = proto::EventArray::from(array);
let proto = proto::types::EventArray::from(array);
proto.encode(buffer)?;
Ok(())
}
Expand Down
24 changes: 24 additions & 0 deletions lib/codecs/tests/native.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,30 @@ fn reserialize_pre_v26_native_proto_fixtures() {
);
}

/// The event proto file was changed in v0.34. This test ensures we can still load the old version
/// binary and that when serialized and deserialized in the new format we still get the same event.
#[test]
fn reserialize_pre_v34_native_json_fixtures() {
roundtrip_fixtures(
"json",
"pre-v34",
&NativeJsonDeserializerConfig::default().build(),
&mut NativeJsonSerializerConfig.build(),
true,
);
}

#[test]
fn reserialize_pre_v34_native_proto_fixtures() {
roundtrip_fixtures(
"proto",
"pre-v34",
&NativeDeserializerConfig.build(),
&mut NativeSerializerConfig.build(),
true,
);
}

// TODO: the json & protobuf consistency has been broken for a while due to the lack of implementing
// serde deser and ser of EventMetadata. Thus the `native_json` codec is not passing through the
// `EventMetadata.value` field, whereas the `native` codec does.
Expand Down
19 changes: 17 additions & 2 deletions lib/vector-core/proto/event.proto
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,14 @@ message Log {
// Deprecated, use value instead
map<string, Value> fields = 1;
Value value = 2;
Value metadata = 3;
Value metadata = 3 [deprecated = true];
Metadata metadata_full = 4;
}

message Trace {
map<string, Value> fields = 1;
Value metadata = 2;
Value metadata = 2 [deprecated = true];
Metadata metadata_full = 3;
}

message ValueMap {
Expand Down Expand Up @@ -75,9 +77,22 @@ message DatadogOriginMetadata {
optional uint32 origin_service = 3;
}

message Secrets {
map<string, string> entries = 1;
}

message OutputId {
string component = 1;
optional string port = 2;
}

message Metadata {
Value value = 1;
DatadogOriginMetadata datadog_origin_metadata = 2;
optional string source_id = 3;
optional string source_type = 4;
OutputId upstream_id = 5;
Secrets secrets = 6;
}

message Metric {
Expand Down
9 changes: 9 additions & 0 deletions lib/vector-core/src/config/output_id.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,15 @@ impl From<(&ComponentKey, String)> for OutputId {
}
}

impl From<(String, Option<String>)> for OutputId {
fn from((component, port): (String, Option<String>)) -> Self {
Self {
component: component.into(),
port,
}
}
}

// This panicking implementation is convenient for testing, but should never be enabled for use
// outside of tests.
#[cfg(any(test, feature = "test"))]
Expand Down
12 changes: 6 additions & 6 deletions lib/vector-core/src/event/array.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,28 +142,28 @@ pub enum EventArray {
}

impl EventArray {
/// Sets the `OutputId` in the metadata for all the events in this array.
pub fn set_output_id(&mut self, output_id: &Arc<ComponentKey>) {
/// Sets the ID of the source component in the metadata for all events in this array.
pub fn set_source_id(&mut self, source_id: &Arc<ComponentKey>) {
match self {
EventArray::Logs(logs) => {
for log in logs {
log.metadata_mut().set_source_id(Arc::clone(output_id));
log.metadata_mut().set_source_id(Arc::clone(source_id));
}
}
EventArray::Metrics(metrics) => {
for metric in metrics {
metric.metadata_mut().set_source_id(Arc::clone(output_id));
metric.metadata_mut().set_source_id(Arc::clone(source_id));
}
}
EventArray::Traces(traces) => {
for trace in traces {
trace.metadata_mut().set_source_id(Arc::clone(output_id));
trace.metadata_mut().set_source_id(Arc::clone(source_id));
}
}
}
}

/// Sets the `source_type` in the metadata for all metric events in this array.
/// Sets the source component type in the metadata for all metric events in this array.
pub fn set_source_type(&mut self, source_type: &'static str) {
if let EventArray::Metrics(metrics) = self {
for metric in metrics {
Expand Down
Loading

0 comments on commit cc1cd1c

Please sign in to comment.