Skip to content

Commit

Permalink
feat: add unixtime formats
Browse files Browse the repository at this point in the history
* `unix_ms`: milliseconds
* `unix_us`: microseconds
* `unix_ns`: nanoseconds
* `unix_float`: seconds float

vectordotdev#17323
  • Loading branch information
Scott Strickland committed Oct 9, 2023
1 parent f0adce7 commit c273d86
Show file tree
Hide file tree
Showing 40 changed files with 144 additions and 50 deletions.
155 changes: 105 additions & 50 deletions src/codecs/encoding/transformer.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
#![deny(missing_docs)]

use chrono::{DateTime, Utc};
use core::fmt::Debug;
use std::collections::BTreeMap;

Expand Down Expand Up @@ -176,32 +177,49 @@ impl Transformer {
}
}

/// Rewrites every timestamp held by `log` using the supplied conversion.
///
/// `method` maps a `DateTime<Utc>` to some `T` that `Value` can be built
/// from; each `Value::Timestamp` found is replaced by `Value::from(method(ts))`.
///
/// * When the root value is an object, all event fields are scanned and each
///   timestamp field is re-inserted at its own path with the converted value.
/// * Otherwise, the root itself is replaced if (and only if) it is a timestamp.
///
/// # Panics
///
/// Panics if the field iterator is unavailable for an object root
/// (`"must be an object"`), or if `parse_path_and_insert` returns an error.
fn format_timestamp<F, T>(&self, log: &mut LogEvent, method: F)
where
    F: Fn(&DateTime<Utc>) -> T,
    Value: From<T>,
{
    if !log.value().is_object() {
        // root is not an object
        let converted = match log.value() {
            Value::Timestamp(ts) => Some(method(&ts)),
            _ => None,
        };
        if let Some(ts) = converted {
            log.insert(event_path!(), Value::from(ts));
        }
        return;
    }

    // Gather all conversions before inserting anything, so the log is not
    // modified while its fields are still being iterated.
    let replacements: Vec<_> = log
        .all_event_fields()
        .expect("must be an object")
        .filter_map(|(k, v)| match v {
            Value::Timestamp(ts) => Some((k.clone(), Value::from(method(&ts)))),
            _ => None,
        })
        .collect();

    for (path, value) in replacements {
        log.parse_path_and_insert(path, value).unwrap();
    }
}

fn apply_timestamp_format(&self, log: &mut LogEvent) {
if let Some(timestamp_format) = self.timestamp_format.as_ref() {
let seconds = |ts: &DateTime<Utc>| ts.timestamp();
let millis = |ts: &DateTime<Utc>| ts.timestamp_millis();
let micros = |ts: &DateTime<Utc>| ts.timestamp_micros();
let nanos = |ts: &DateTime<Utc>|
ts.timestamp_nanos_opt().expect("Timestamp out of range");
let float = |ts: &DateTime<Utc>| ts.timestamp_micros() as f64 / 1e6;

match timestamp_format {
TimestampFormat::Unix => {
if log.value().is_object() {
let mut unix_timestamps = Vec::new();
for (k, v) in log.all_event_fields().expect("must be an object") {
if let Value::Timestamp(ts) = v {
unix_timestamps.push((k.clone(), Value::Integer(ts.timestamp())));
}
}
for (k, v) in unix_timestamps {
log.parse_path_and_insert(k, v).unwrap();
}
} else {
// root is not an object
let timestamp = if let Value::Timestamp(ts) = log.value() {
Some(ts.timestamp())
} else {
None
};
if let Some(ts) = timestamp {
log.insert(event_path!(), Value::Integer(ts));
}
}
}
TimestampFormat::Unix => self.format_timestamp(log, seconds),
TimestampFormat::UnixMs => self.format_timestamp(log, millis),
TimestampFormat::UnixUs => self.format_timestamp(log, micros),
TimestampFormat::UnixNs => self.format_timestamp(log, nanos),
TimestampFormat::UnixFloat => self.format_timestamp(log, float),
// RFC3339 is the default serialization of a timestamp.
TimestampFormat::Rfc3339 => (),
}
Expand All @@ -225,14 +243,26 @@ impl Transformer {

#[configurable_component]
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
#[serde(rename_all = "lowercase")]
#[serde(rename_all = "snake_case")]
/// The format in which a timestamp should be represented.
pub enum TimestampFormat {
/// Represent the timestamp as a Unix timestamp.
Unix,

/// Represent the timestamp as a RFC 3339 timestamp.
Rfc3339,

/// Represent the timestamp as a Unix timestamp in milliseconds.
UnixMs,

/// Represent the timestamp as a Unix timestamp in microseconds.
UnixUs,

/// Represent the timestamp as a Unix timestamp in nanoseconds.
UnixNs,

/// Represent the timestamp as a Unix timestamp in seconds, as a floating point number.
UnixFloat,
}

#[cfg(test)]
Expand Down Expand Up @@ -344,9 +374,8 @@ mod tests {

#[test]
fn deserialize_and_transform_timestamp() {
let transformer: Transformer = toml::from_str(r#"timestamp_format = "unix""#).unwrap();
let mut event = Event::Log(LogEvent::from("Demo"));
let timestamp = event
let mut base = Event::Log(LogEvent::from("Demo"));
let timestamp = base
.as_mut_log()
.get((
lookup::PathPrefix::Event,
Expand All @@ -355,32 +384,58 @@ mod tests {
.unwrap()
.clone();
let timestamp = timestamp.as_timestamp().unwrap();
event
base
.as_mut_log()
.insert("another", Value::Timestamp(*timestamp));

transformer.transform(&mut event);

match event
.as_mut_log()
.get((
lookup::PathPrefix::Event,
log_schema().timestamp_key().unwrap(),
))
.unwrap()
{
Value::Integer(_) => {}
e => panic!(
"Timestamp was not transformed into a Unix timestamp. Was {:?}",
e
),
}
match event.as_mut_log().get("another").unwrap() {
Value::Integer(_) => {}
e => panic!(
"Timestamp was not transformed into a Unix timestamp. Was {:?}",
e
),
let cases = [
("unix", Some(timestamp.timestamp()), None),
("unix_ms", Some(timestamp.timestamp_millis()), None),
("unix_us", Some(timestamp.timestamp_micros()), None),
("unix_ns", Some(timestamp.timestamp_nanos_opt().expect("Timestamp out of range")), None),
("unix_float", None, Some(timestamp.timestamp_micros() as f64 / 1e6)),
];
for (fmt, expected_int, expected_float) in cases {
let config: String = format!(r#"timestamp_format = "{}""#, fmt);
let transformer: Transformer = toml::from_str(&config).unwrap();
let mut event = base.clone();
transformer.transform(&mut event);

match event
.as_mut_log()
.get((
lookup::PathPrefix::Event,
log_schema().timestamp_key().unwrap(),
))
.unwrap()
{
Value::Integer(actual) => {
assert!(expected_int.is_some());
assert_eq!(&expected_int.unwrap(), actual);
}
Value::Float(actual) => {
assert!(expected_float.is_some());
assert_eq!(expected_float.unwrap(), actual.into_inner());
}
e => panic!(
"Timestamp was not transformed into a Unix timestamp. Was {:?}",
e
),
}
match event.as_mut_log().get("another").unwrap() {
Value::Integer(actual) => {
assert!(expected_int.is_some());
assert_eq!(&expected_int.unwrap(), actual);
}
Value::Float(actual) => {
assert!(expected_float.is_some());
assert_eq!(expected_float.unwrap(), actual.into_inner());
}
e => panic!(
"Timestamp was not transformed into a Unix timestamp. Was {:?}",
e
),
}
}
}

Expand Down
1 change: 1 addition & 0 deletions website/cue/reference/components/sinks.cue
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@ components: sinks: [Name=string]: {
enum: {
rfc3339: "Formats as a RFC3339 string"
unix: "Formats as a unix timestamp"
unix_ms: "Formats as a unix timestamp in milliseconds"
}
}
}
Expand Down
1 change: 1 addition & 0 deletions website/cue/reference/components/sinks/base/amqp.cue
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,7 @@ base: components: sinks: amqp: configuration: {
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
}
}
}
Expand Down
1 change: 1 addition & 0 deletions website/cue/reference/components/sinks/base/appsignal.cue
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ base: components: sinks: appsignal: configuration: {
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,7 @@ base: components: sinks: aws_cloudwatch_logs: configuration: {
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ base: components: sinks: aws_kinesis_firehose: configuration: {
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -418,6 +418,7 @@ base: components: sinks: aws_kinesis_streams: configuration: {
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
}
}
}
Expand Down
1 change: 1 addition & 0 deletions website/cue/reference/components/sinks/base/aws_s3.cue
Original file line number Diff line number Diff line change
Expand Up @@ -527,6 +527,7 @@ base: components: sinks: aws_s3: configuration: {
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
}
}
}
Expand Down
1 change: 1 addition & 0 deletions website/cue/reference/components/sinks/base/aws_sns.cue
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ base: components: sinks: aws_sns: configuration: {
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
}
}
}
Expand Down
1 change: 1 addition & 0 deletions website/cue/reference/components/sinks/base/aws_sqs.cue
Original file line number Diff line number Diff line change
Expand Up @@ -354,6 +354,7 @@ base: components: sinks: aws_sqs: configuration: {
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
}
}
}
Expand Down
1 change: 1 addition & 0 deletions website/cue/reference/components/sinks/base/azure_blob.cue
Original file line number Diff line number Diff line change
Expand Up @@ -381,6 +381,7 @@ base: components: sinks: azure_blob: configuration: {
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ base: components: sinks: azure_monitor_logs: configuration: {
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
}
}
}
Expand Down
1 change: 1 addition & 0 deletions website/cue/reference/components/sinks/base/clickhouse.cue
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ base: components: sinks: clickhouse: configuration: {
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
}
}
}
Expand Down
1 change: 1 addition & 0 deletions website/cue/reference/components/sinks/base/console.cue
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ base: components: sinks: console: configuration: {
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
}
}
}
Expand Down
1 change: 1 addition & 0 deletions website/cue/reference/components/sinks/base/databend.cue
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ base: components: sinks: databend: configuration: {
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ base: components: sinks: datadog_logs: configuration: {
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -402,6 +402,7 @@ base: components: sinks: elasticsearch: configuration: {
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
}
}
}
Expand Down
1 change: 1 addition & 0 deletions website/cue/reference/components/sinks/base/file.cue
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,7 @@ base: components: sinks: file: configuration: {
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,7 @@ base: components: sinks: gcp_chronicle_unstructured: configuration: {
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,7 @@ base: components: sinks: gcp_cloud_storage: configuration: {
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
}
}
}
Expand Down
1 change: 1 addition & 0 deletions website/cue/reference/components/sinks/base/gcp_pubsub.cue
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,7 @@ base: components: sinks: gcp_pubsub: configuration: {
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ base: components: sinks: gcp_stackdriver_logs: configuration: {
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
}
}
}
Expand Down
1 change: 1 addition & 0 deletions website/cue/reference/components/sinks/base/honeycomb.cue
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ base: components: sinks: honeycomb: configuration: {
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
}
}
}
Expand Down
1 change: 1 addition & 0 deletions website/cue/reference/components/sinks/base/http.cue
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,7 @@ base: components: sinks: http: configuration: {
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
}
}
}
Expand Down
1 change: 1 addition & 0 deletions website/cue/reference/components/sinks/base/humio_logs.cue
Original file line number Diff line number Diff line change
Expand Up @@ -313,6 +313,7 @@ base: components: sinks: humio_logs: configuration: {
type: string: enum: {
rfc3339: "Represent the timestamp as a RFC 3339 timestamp."
unix: "Represent the timestamp as a Unix timestamp."
unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds."
}
}
}
Expand Down
Loading

0 comments on commit c273d86

Please sign in to comment.