Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use dedicated ShutdownResult for Metric SDK shutdown #2573

Merged
merged 9 commits into from
Jan 31, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 37 additions & 4 deletions examples/metrics-basic/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use opentelemetry::{global, KeyValue};
use opentelemetry_sdk::error::ShutdownError;
use opentelemetry_sdk::metrics::{PeriodicReader, SdkMeterProvider};
use opentelemetry_sdk::Resource;
use std::error::Error;
Expand All @@ -23,7 +24,7 @@ fn init_meter_provider() -> opentelemetry_sdk::metrics::SdkMeterProvider {
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
async fn main() -> Result<(), Box<dyn Error>> {
// Initialize the MeterProvider with the stdout Exporter.
let meter_provider = init_meter_provider();

Expand Down Expand Up @@ -137,9 +138,41 @@ async fn main() -> Result<(), Box<dyn Error + Send + Sync + 'static>> {
})
.build();

// Metrics are exported by default every 30 seconds when using stdout exporter,
// however shutting down the MeterProvider here instantly flushes
// the metrics, instead of waiting for the 30 sec interval.
// Metrics are exported by default every 30 seconds when using stdout
// exporter, however shutting down the MeterProvider here instantly flushes
// the metrics, instead of waiting for the 30 sec interval. Shutdown returns
// a result, which is bubbled up to the caller The commented code below
// demonstrates handling the shutdown result, instead of bubbling up the
// error.
meter_provider.shutdown()?;

// let shutdown_result = meter_provider.shutdown();

// Handle the shutdown result.
// match shutdown_result {
// Ok(_) => println!("MeterProvider shutdown successfully"),
// Err(e) => {
// match e {
// opentelemetry_sdk::error::ShutdownError::InternalFailure(message) => {
// // This indicates some internal failure during shutdown. The
// // error message is intended for logging purposes only and
// // should not be used to make programmatic decisions.
// println!("MeterProvider shutdown failed: {}", message)
// }
// opentelemetry_sdk::error::ShutdownError::AlreadyShutdown => {
// // This indicates some user code tried to shutdown
// // elsewhere. user need to review their code to ensure
// // shutdown is called only once.
// println!("MeterProvider already shutdown")
// }
// opentelemetry_sdk::error::ShutdownError::Timeout(e) => {
// // This indicates the shutdown timed out, and a good hint to
// // user to increase the timeout. (Shutdown method does not
// // allow custom timeout today, but that is temporary)
// println!("MeterProvider shutdown timed out after {:?}", e)
// }
// }
// }
// }
Ok(())
}
8 changes: 6 additions & 2 deletions opentelemetry-otlp/src/exporter/http/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
use async_trait::async_trait;
use http::{header::CONTENT_TYPE, Method};
use opentelemetry::otel_debug;
use opentelemetry_sdk::error::{ShutdownError, ShutdownResult};
use opentelemetry_sdk::metrics::data::ResourceMetrics;
use opentelemetry_sdk::metrics::{MetricError, MetricResult};

Expand Down Expand Up @@ -43,8 +44,11 @@
Ok(())
}

fn shutdown(&self) -> MetricResult<()> {
let _ = self.client.lock()?.take();
fn shutdown(&self) -> ShutdownResult {
self.client
.lock()
.map_err(|e| ShutdownError::InternalFailure(format!("Failed to acquire lock: {}", e)))?
.take();

Check warning on line 51 in opentelemetry-otlp/src/exporter/http/metrics.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/http/metrics.rs#L47-L51

Added lines #L47 - L51 were not covered by tests

Ok(())
}
Expand Down
8 changes: 6 additions & 2 deletions opentelemetry-otlp/src/exporter/tonic/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
use opentelemetry_proto::tonic::collector::metrics::v1::{
metrics_service_client::MetricsServiceClient, ExportMetricsServiceRequest,
};
use opentelemetry_sdk::error::{ShutdownError, ShutdownResult};
use opentelemetry_sdk::metrics::data::ResourceMetrics;
use opentelemetry_sdk::metrics::{MetricError, MetricResult};
use tonic::{codegen::CompressionEncoding, service::Interceptor, transport::Channel, Request};
Expand Down Expand Up @@ -89,8 +90,11 @@
Ok(())
}

fn shutdown(&self) -> MetricResult<()> {
let _ = self.inner.lock()?.take();
fn shutdown(&self) -> ShutdownResult {
self.inner
.lock()
.map_err(|e| ShutdownError::InternalFailure(format!("Failed to acquire lock: {}", e)))?
.take();

Check warning on line 97 in opentelemetry-otlp/src/exporter/tonic/metrics.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/exporter/tonic/metrics.rs#L93-L97

Added lines #L93 - L97 were not covered by tests

Ok(())
}
Expand Down
5 changes: 3 additions & 2 deletions opentelemetry-otlp/src/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

use async_trait::async_trait;
use core::fmt;
use opentelemetry_sdk::error::ShutdownResult;
use opentelemetry_sdk::metrics::MetricResult;

use opentelemetry_sdk::metrics::{
Expand Down Expand Up @@ -123,7 +124,7 @@
#[async_trait]
pub(crate) trait MetricsClient: fmt::Debug + Send + Sync + 'static {
async fn export(&self, metrics: &mut ResourceMetrics) -> MetricResult<()>;
fn shutdown(&self) -> MetricResult<()>;
fn shutdown(&self) -> ShutdownResult;
}

/// Export metrics in OTEL format.
Expand All @@ -149,7 +150,7 @@
Ok(())
}

fn shutdown(&self) -> MetricResult<()> {
fn shutdown(&self) -> ShutdownResult {

Check warning on line 153 in opentelemetry-otlp/src/metric.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-otlp/src/metric.rs#L153

Added line #L153 was not covered by tests
self.client.shutdown()
}

Expand Down
3 changes: 2 additions & 1 deletion opentelemetry-sdk/benches/metric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use opentelemetry::{
Key, KeyValue,
};
use opentelemetry_sdk::{
error::ShutdownResult,
metrics::{
data::ResourceMetrics, new_view, reader::MetricReader, Aggregation, Instrument,
InstrumentKind, ManualReader, MetricResult, Pipeline, SdkMeterProvider, Stream,
Expand All @@ -31,7 +32,7 @@ impl MetricReader for SharedReader {
self.0.force_flush()
}

fn shutdown(&self) -> MetricResult<()> {
fn shutdown(&self) -> ShutdownResult {
self.0.shutdown()
}

Expand Down
38 changes: 38 additions & 0 deletions opentelemetry-sdk/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,45 @@
//! Wrapper for error from trace, logs and metrics part of open telemetry.

use std::{result::Result, time::Duration};

use thiserror::Error;

/// Trait for errors returned by exporters
pub trait ExportError: std::error::Error + Send + Sync + 'static {
/// The name of exporter that returned this error
fn exporter_name(&self) -> &'static str;
}

#[derive(Error, Debug)]
/// Errors that can occur during shutdown.
pub enum ShutdownError {
/// Shutdown has already been invoked.
///
/// While shutdown is idempotent and calling it multiple times has no
/// impact, this error suggests that another part of the application is
/// invoking `shutdown` earlier than intended. Users should review their
/// code to identify unintended or duplicate shutdown calls and ensure it is
/// only triggered once at the correct place.
#[error("Shutdown already invoked")]
AlreadyShutdown,

/// Shutdown timed out before completing.
///
/// This does not necessarily indicate a failure—shutdown may still be
/// complete. If this occurs frequently, consider increasing the timeout
/// duration to allow more time for completion.
#[error("Shutdown timed out after {0:?}")]
Timeout(Duration),

/// Shutdown failed due to an internal error.
///
/// The error message is intended for logging purposes only and should not
/// be used to make programmatic decisions. It is implementation-specific
/// and subject to change without notice. Consumers of this error should not
/// rely on its content beyond logging.
#[error("Shutdown failed: {0}")]
InternalFailure(String),
}

/// A specialized `Result` type for Shutdown operations.
pub type ShutdownResult = Result<(), ShutdownError>;
3 changes: 2 additions & 1 deletion opentelemetry-sdk/src/metrics/exporter.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
//! Interfaces for exporting metrics
use async_trait::async_trait;

use crate::error::ShutdownResult;
use crate::metrics::MetricResult;

use crate::metrics::data::ResourceMetrics;
Expand All @@ -27,7 +28,7 @@ pub trait PushMetricExporter: Send + Sync + 'static {
///
/// After Shutdown is called, calls to Export will perform no operation and
/// instead will return an error indicating the shutdown state.
fn shutdown(&self) -> MetricResult<()>;
fn shutdown(&self) -> ShutdownResult;

/// Access the [Temporality] of the MetricExporter.
fn temporality(&self) -> Temporality;
Expand Down
3 changes: 2 additions & 1 deletion opentelemetry-sdk/src/metrics/in_memory_exporter.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::error::ShutdownResult;
use crate::metrics::data::{self, Gauge, Sum};
use crate::metrics::data::{Histogram, Metric, ResourceMetrics, ScopeMetrics};
use crate::metrics::exporter::PushMetricExporter;
Expand Down Expand Up @@ -277,7 +278,7 @@ impl PushMetricExporter for InMemoryMetricExporter {
Ok(()) // In this implementation, flush does nothing
}

fn shutdown(&self) -> MetricResult<()> {
fn shutdown(&self) -> ShutdownResult {
Ok(())
}

Expand Down
11 changes: 8 additions & 3 deletions opentelemetry-sdk/src/metrics/manual_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,10 @@

use opentelemetry::otel_debug;

use crate::metrics::{MetricError, MetricResult, Temporality};
use crate::{
error::{ShutdownError, ShutdownResult},
metrics::{MetricError, MetricResult, Temporality},
};

use super::{
data::ResourceMetrics,
Expand Down Expand Up @@ -107,8 +110,10 @@
}

/// Closes any connections and frees any resources used by the reader.
fn shutdown(&self) -> MetricResult<()> {
let mut inner = self.inner.lock()?;
fn shutdown(&self) -> ShutdownResult {
let mut inner = self.inner.lock().map_err(|e| {
ShutdownError::InternalFailure(format!("Failed to acquire lock: {}", e))
})?;

Check warning on line 116 in opentelemetry-sdk/src/metrics/manual_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/manual_reader.rs#L113-L116

Added lines #L113 - L116 were not covered by tests

// Any future call to collect will now return an error.
inner.sdk_producer = None;
Expand Down
13 changes: 7 additions & 6 deletions opentelemetry-sdk/src/metrics/meter_provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@ use opentelemetry::{
otel_debug, otel_error, otel_info, InstrumentationScope,
};

use crate::metrics::{MetricError, MetricResult};
use crate::Resource;
use crate::{
error::ShutdownResult,
metrics::{MetricError, MetricResult},
};

use super::{
meter::SdkMeter, noop::NoopMeter, pipeline::Pipelines, reader::MetricReader, view::View,
Expand Down Expand Up @@ -108,7 +111,7 @@ impl SdkMeterProvider {
///
/// There is no guaranteed that all telemetry be flushed or all resources have
/// been released on error.
pub fn shutdown(&self) -> MetricResult<()> {
pub fn shutdown(&self) -> ShutdownResult {
otel_info!(
name: "MeterProvider.Shutdown",
message = "User initiated shutdown of MeterProvider."
Expand All @@ -131,15 +134,13 @@ impl SdkMeterProviderInner {
}
}

fn shutdown(&self) -> MetricResult<()> {
fn shutdown(&self) -> ShutdownResult {
if self
.shutdown_invoked
.swap(true, std::sync::atomic::Ordering::SeqCst)
{
// If the previous value was true, shutdown was already invoked.
Err(MetricError::Other(
"MeterProvider shutdown already invoked.".into(),
))
Err(crate::error::ShutdownError::AlreadyShutdown)
} else {
self.pipes.shutdown()
}
Expand Down
27 changes: 14 additions & 13 deletions opentelemetry-sdk/src/metrics/periodic_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
use opentelemetry::{otel_debug, otel_error, otel_info, otel_warn};

use crate::{
error::{ShutdownError, ShutdownResult},
metrics::{exporter::PushMetricExporter, reader::SdkProducer, MetricError, MetricResult},
Resource,
};
Expand Down Expand Up @@ -402,27 +403,27 @@
}
}

fn shutdown(&self) -> MetricResult<()> {
fn shutdown(&self) -> ShutdownResult {
// TODO: See if this is better to be created upfront.
let (response_tx, response_rx) = mpsc::channel();
self.message_sender
.send(Message::Shutdown(response_tx))
.map_err(|e| MetricError::Other(e.to_string()))?;
.map_err(|e| ShutdownError::InternalFailure(e.to_string()))?;

// TODO: Make this timeout configurable.
match response_rx.recv_timeout(Duration::from_secs(5)) {
Ok(response) => {
if response {
Ok(())
} else {
Err(MetricError::Other("Failed to shutdown".into()))
Err(ShutdownError::InternalFailure("Failed to shutdown".into()))

Check warning on line 419 in opentelemetry-sdk/src/metrics/periodic_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader.rs#L419

Added line #L419 was not covered by tests
}
}
Err(mpsc::RecvTimeoutError::Timeout) => Err(MetricError::Other(
"Failed to shutdown due to Timeout".into(),
)),
Err(mpsc::RecvTimeoutError::Timeout) => {
Err(ShutdownError::Timeout(Duration::from_secs(5)))

Check warning on line 423 in opentelemetry-sdk/src/metrics/periodic_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader.rs#L423

Added line #L423 was not covered by tests
}
Err(mpsc::RecvTimeoutError::Disconnected) => {
Err(MetricError::Other("Failed to shutdown".into()))
Err(ShutdownError::InternalFailure("Failed to shutdown".into()))

Check warning on line 426 in opentelemetry-sdk/src/metrics/periodic_reader.rs

View check run for this annotation

Codecov / codecov/patch

opentelemetry-sdk/src/metrics/periodic_reader.rs#L426

Added line #L426 was not covered by tests
}
}
}
Expand Down Expand Up @@ -451,7 +452,7 @@
// completion, and avoid blocking the thread. The default shutdown on drop
// can still use blocking call. If user already explicitly called shutdown,
// drop won't call shutdown again.
fn shutdown(&self) -> MetricResult<()> {
fn shutdown(&self) -> ShutdownResult {
self.inner.shutdown()
}

Expand All @@ -471,10 +472,10 @@
mod tests {
use super::PeriodicReader;
use crate::{
metrics::InMemoryMetricExporter,
error::ShutdownResult,
metrics::{
data::ResourceMetrics, exporter::PushMetricExporter, reader::MetricReader, MetricError,
MetricResult, SdkMeterProvider, Temporality,
data::ResourceMetrics, exporter::PushMetricExporter, reader::MetricReader,
InMemoryMetricExporter, MetricError, MetricResult, SdkMeterProvider, Temporality,
},
Resource,
};
Expand Down Expand Up @@ -524,7 +525,7 @@
Ok(())
}

fn shutdown(&self) -> MetricResult<()> {
fn shutdown(&self) -> ShutdownResult {
Ok(())
}

Expand All @@ -548,7 +549,7 @@
Ok(())
}

fn shutdown(&self) -> MetricResult<()> {
fn shutdown(&self) -> ShutdownResult {
self.is_shutdown.store(true, Ordering::Relaxed);
Ok(())
}
Expand Down
Loading