use std::{
fmt,
sync::{Mutex, Weak},
};
use opentelemetry::otel_debug;
use crate::metrics::{MetricError, MetricResult, Temporality};
use super::{
data::ResourceMetrics,
pipeline::Pipeline,
reader::{MetricReader, SdkProducer},
};
/// A `MetricReader` implementation whose state lives behind a mutex.
///
/// Nothing in this type triggers collection on its own: metrics are produced
/// only when `MetricReader::collect` is called on it.
pub struct ManualReader {
/// Registration and shutdown state, locked so the `&self` trait methods can mutate it.
inner: Box<Mutex<ManualReaderInner>>,
/// Temporality handed to `InstrumentKind::temporality_preference` in `temporality`.
temporality: Temporality,
}
impl Default for ManualReader {
fn default() -> Self {
ManualReader::builder().build()
}
}
impl fmt::Debug for ManualReader {
    /// Prints only the type name; the locked inner state is not rendered.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "ManualReader")
    }
}
/// Lock-protected state of a `ManualReader`.
#[derive(Debug)]
struct ManualReaderInner {
/// Weak handle to the registered producer; `None` until a pipeline is
/// registered and again after `shutdown` clears it.
sdk_producer: Option<Weak<dyn SdkProducer>>,
/// Starts as `false`; set to `true` by `shutdown`.
is_shutdown: bool,
}
impl ManualReader {
    /// Returns a [`ManualReaderBuilder`] in its default configuration.
    pub fn builder() -> ManualReaderBuilder {
        Default::default()
    }

    /// Creates a reader with the given temporality.
    ///
    /// The reader starts with no producer registered and not shut down.
    pub(crate) fn new(temporality: Temporality) -> Self {
        let initial_state = ManualReaderInner {
            sdk_producer: None,
            is_shutdown: false,
        };

        Self {
            inner: Box::new(Mutex::new(initial_state)),
            temporality,
        }
    }
}
impl MetricReader for ManualReader {
    /// Registers the pipeline this reader pulls metrics from.
    ///
    /// Only the first registration on a reader that has not been shut down
    /// takes effect. Duplicate registrations and registrations after
    /// [`shutdown`](MetricReader::shutdown) are dropped with a debug log.
    fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
        // Best-effort: this method cannot report failure, so a poisoned lock is
        // ignored instead of being propagated.
        let _ = self.inner.lock().map(|mut inner| {
            if inner.is_shutdown {
                // `shutdown` clears `sdk_producer`, so without this guard a late
                // registration would pass the `is_none` check below and bring a
                // shut-down reader back to life.
                otel_debug!(
                    name: "ManualReader.RegistrationAfterShutdown",
                    message = "The Reader is already shut down. Registering a pipeline after shutdown is not allowed.");
            } else if inner.sdk_producer.is_none() {
                inner.sdk_producer = Some(pipeline);
            } else {
                otel_debug!(
                    name: "ManualReader.DuplicateRegistration",
                    message = "The pipeline is already registered to the Reader. Registering pipeline multiple times is not allowed.");
            }
        });
    }

    /// Asks the registered producer to fill `rm` with the current metrics.
    ///
    /// # Errors
    ///
    /// Returns an error if the lock is poisoned, if no producer is registered
    /// (never registered, or cleared by shutdown), if the producer has already
    /// been dropped, or if the producer itself fails.
    fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()> {
        let inner = self.inner.lock()?;

        // The producer is held weakly, so it may be gone even though it was registered.
        let producer = inner
            .sdk_producer
            .as_ref()
            .and_then(|weak| weak.upgrade())
            .ok_or_else(|| MetricError::Other("reader is shut down or not registered".into()))?;

        producer.produce(rm)?;
        Ok(())
    }

    /// No-op: this reader keeps no buffered data, so there is nothing to flush.
    fn force_flush(&self) -> MetricResult<()> {
        Ok(())
    }

    /// Drops the producer handle and marks the reader as shut down.
    ///
    /// After this call `collect` fails and `register_pipeline` is ignored.
    ///
    /// # Errors
    ///
    /// Returns an error if the lock is poisoned.
    fn shutdown(&self) -> MetricResult<()> {
        let mut inner = self.inner.lock()?;
        inner.sdk_producer = None;
        inner.is_shutdown = true;
        Ok(())
    }

    /// Returns the temporality for `kind`, obtained by passing this reader's
    /// configured temporality to `InstrumentKind::temporality_preference`.
    fn temporality(&self, kind: super::InstrumentKind) -> Temporality {
        kind.temporality_preference(self.temporality)
    }
}
/// Builder for `ManualReader`.
///
/// The derived `Default` starts from `Temporality::default()`.
#[derive(Default)]
pub struct ManualReaderBuilder {
/// Temporality passed to `ManualReader::new` by `build`.
temporality: Temporality,
}
impl fmt::Debug for ManualReaderBuilder {
    /// Prints only the type name; the configured options are not rendered.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "ManualReaderBuilder")
    }
}
impl ManualReaderBuilder {
    /// Creates a builder with every option at its default value.
    pub fn new() -> Self {
        Self::default()
    }

    /// Sets the temporality the built reader will be constructed with.
    ///
    /// Calling this more than once keeps only the last value.
    pub fn with_temporality(self, temporality: Temporality) -> Self {
        // `temporality` is the builder's only field, so replacing it rebuilds the whole value.
        Self { temporality }
    }

    /// Consumes the builder and constructs the [`ManualReader`].
    pub fn build(self) -> ManualReader {
        let Self { temporality } = self;
        ManualReader::new(temporality)
    }
}