use std::{
env, fmt, mem,
sync::{Arc, Mutex, Weak},
time::Duration,
};
use futures_channel::{mpsc, oneshot};
use futures_util::{
future::{self, Either},
pin_mut,
stream::{self, FusedStream},
StreamExt,
};
use opentelemetry::{otel_debug, otel_error};
use crate::runtime::Runtime;
use crate::{
metrics::{exporter::PushMetricExporter, reader::SdkProducer, MetricError, MetricResult},
Resource,
};
use super::{data::ResourceMetrics, reader::MetricReader, InstrumentKind, Pipeline};
/// Export timeout used when `OTEL_METRIC_EXPORT_TIMEOUT` is unset or cannot be
/// parsed and the builder does not override it (30 seconds).
const DEFAULT_TIMEOUT: Duration = Duration::from_secs(30);
/// Export interval used when `OTEL_METRIC_EXPORT_INTERVAL` is unset or cannot be
/// parsed and the builder does not override it (60 seconds).
const DEFAULT_INTERVAL: Duration = Duration::from_secs(60);
/// Name of the environment variable read for the export interval; the builder
/// parses its value as a number of milliseconds.
const METRIC_EXPORT_INTERVAL_NAME: &str = "OTEL_METRIC_EXPORT_INTERVAL";
/// Name of the environment variable read for the export timeout; the builder
/// parses its value as a number of milliseconds.
const METRIC_EXPORT_TIMEOUT_NAME: &str = "OTEL_METRIC_EXPORT_TIMEOUT";
/// Configuration options for a [`PeriodicReader`].
///
/// Holds the exporter and runtime the reader will use, together with the
/// export interval and the per-export timeout.
#[derive(Debug)]
pub struct PeriodicReaderBuilder<E, RT> {
/// Time between two interval-triggered exports.
interval: Duration,
/// Maximum time a single export is allowed to take before it is reported
/// as timed out.
timeout: Duration,
/// Exporter that receives the collected metrics.
exporter: E,
/// Runtime used to spawn the background worker and to create its timers.
runtime: RT,
}
impl<E, RT> PeriodicReaderBuilder<E, RT>
where
    E: PushMetricExporter,
    RT: Runtime,
{
    /// Reads a duration, expressed in whole milliseconds, from the environment
    /// variable `name`.
    ///
    /// Returns `default` when the variable is unset, is not a valid unsigned
    /// integer, or is zero. Zero is rejected for the same reason
    /// [`with_interval`](Self::with_interval) and
    /// [`with_timeout`](Self::with_timeout) ignore it: a zero interval or
    /// timeout is not a usable configuration for the worker's timers.
    fn duration_from_env(name: &str, default: Duration) -> Duration {
        env::var(name)
            .ok()
            .and_then(|raw| raw.parse::<u64>().ok())
            .map(Duration::from_millis)
            .filter(|duration| !duration.is_zero())
            .unwrap_or(default)
    }

    /// Creates a builder for `exporter` running on `runtime`.
    ///
    /// The interval and timeout are seeded from `OTEL_METRIC_EXPORT_INTERVAL`
    /// and `OTEL_METRIC_EXPORT_TIMEOUT` (milliseconds), falling back to
    /// 60 seconds and 30 seconds respectively.
    fn new(exporter: E, runtime: RT) -> Self {
        PeriodicReaderBuilder {
            interval: Self::duration_from_env(METRIC_EXPORT_INTERVAL_NAME, DEFAULT_INTERVAL),
            timeout: Self::duration_from_env(METRIC_EXPORT_TIMEOUT_NAME, DEFAULT_TIMEOUT),
            exporter,
            runtime,
        }
    }

    /// Sets the time between interval-triggered exports.
    ///
    /// A zero `interval` is ignored and the previously configured value
    /// (environment variable or default) is kept.
    pub fn with_interval(mut self, interval: Duration) -> Self {
        if !interval.is_zero() {
            self.interval = interval;
        }
        self
    }

    /// Sets the maximum time a single export may take.
    ///
    /// A zero `timeout` is ignored and the previously configured value
    /// (environment variable or default) is kept.
    pub fn with_timeout(mut self, timeout: Duration) -> Self {
        if !timeout.is_zero() {
            self.timeout = timeout;
        }
        self
    }

    /// Builds the [`PeriodicReader`].
    ///
    /// No task is spawned here. The reader stores a closure that spawns the
    /// background worker on the configured runtime; the closure runs when the
    /// reader is registered with a pipeline.
    pub fn build(self) -> PeriodicReader {
        // Bounded channel carrying flush/shutdown requests to the worker.
        let (message_sender, message_receiver) = mpsc::channel(256);

        let worker = move |reader: &PeriodicReader| {
            let runtime = self.runtime.clone();
            let reader = reader.clone();
            self.runtime.spawn(Box::pin(async move {
                // The first tick is skipped — presumably because the runtime's
                // interval stream fires immediately; confirm against the
                // `Runtime::interval` implementation in use.
                let ticker = runtime
                    .interval(self.interval)
                    .skip(1)
                    .map(|_| Message::Export);
                // Merge explicit requests and periodic ticks into one stream.
                let messages = Box::pin(stream::select(message_receiver, ticker));
                PeriodicReaderWorker {
                    reader,
                    timeout: self.timeout,
                    runtime,
                    rm: ResourceMetrics {
                        resource: Resource::empty(),
                        scope_metrics: Vec::new(),
                    },
                }
                .run(messages)
                .await
            }));
        };

        otel_debug!(
            name: "PeriodicReader.BuildCompleted",
            message = "Periodic reader built.",
            interval_in_secs = self.interval.as_secs(),
        );

        PeriodicReader {
            exporter: Arc::new(self.exporter),
            inner: Arc::new(Mutex::new(PeriodicReaderInner {
                message_sender,
                is_shutdown: false,
                sdk_producer_or_worker: ProducerOrWorker::Worker(Box::new(worker)),
            })),
        }
    }
}
/// A metric reader that collects and exports metrics from a background worker,
/// either on a fixed interval or when a flush or shutdown is requested.
///
/// Clones share the same exporter and the same inner state, because both
/// fields are reference-counted.
#[derive(Clone)]
pub struct PeriodicReader {
/// Exporter that receives the collected metrics.
exporter: Arc<dyn PushMetricExporter>,
/// Mutex-protected state shared between all clones of the reader.
inner: Arc<Mutex<PeriodicReaderInner>>,
}
impl PeriodicReader {
/// Creates a [`PeriodicReaderBuilder`] for the given `exporter` and `runtime`.
///
/// The returned builder starts with the interval and timeout chosen by
/// `PeriodicReaderBuilder::new` (environment variables or defaults) and can
/// be customised before calling `build`.
pub fn builder<E, RT>(exporter: E, runtime: RT) -> PeriodicReaderBuilder<E, RT>
where
E: PushMetricExporter,
RT: Runtime,
{
PeriodicReaderBuilder::new(exporter, runtime)
}
}
impl fmt::Debug for PeriodicReader {
    /// Formats the reader as its bare type name; no fields are included in
    /// the output.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let mut repr = f.debug_struct("PeriodicReader");
        repr.finish()
    }
}
/// Mutable state of a [`PeriodicReader`], kept behind a mutex.
struct PeriodicReaderInner {
/// Sending half of the channel used to deliver flush and shutdown requests
/// to the background worker.
message_sender: mpsc::Sender<Message>,
/// Set to `true` once a shutdown request has been answered by the worker;
/// checked by `collect`, `force_flush` and `shutdown`.
is_shutdown: bool,
/// Either the not-yet-started worker closure or, after registration, the
/// producer that metrics are collected from.
sdk_producer_or_worker: ProducerOrWorker,
}
/// Requests processed by the background worker.
#[derive(Debug)]
enum Message {
/// Collect and export once; produced by the interval ticker.
Export,
/// Collect and export once, then send the result back on the channel.
Flush(oneshot::Sender<MetricResult<()>>),
/// Collect and export once, shut the exporter down, send the export
/// result back on the channel, and stop the worker.
Shutdown(oneshot::Sender<MetricResult<()>>),
}
/// Registration state of a [`PeriodicReader`].
enum ProducerOrWorker {
/// The reader has been registered; metrics are collected from this
/// producer for as long as the weak reference can be upgraded.
Producer(Weak<dyn SdkProducer>),
/// The reader has not been registered yet; holds the closure that spawns
/// the background worker when registration happens.
Worker(Box<dyn FnOnce(&PeriodicReader) + Send + Sync>),
}
/// State owned by the background task that performs collection and export.
struct PeriodicReaderWorker<RT: Runtime> {
/// Reader used to collect metrics and to reach the exporter.
reader: PeriodicReader,
/// Maximum time a single export may take.
timeout: Duration,
/// Runtime used to create the export timeout timer.
runtime: RT,
/// Buffer passed to every collection and export.
rm: ResourceMetrics,
}
impl<RT: Runtime> PeriodicReaderWorker<RT> {
    /// Collects metrics into the worker's buffer and exports them.
    ///
    /// Returns `Ok(())` without calling the exporter when the collection
    /// produced no scope metrics.
    ///
    /// # Errors
    ///
    /// Returns the collection error, the exporter's error, or
    /// `MetricError::Other("export timed out")` when the export does not
    /// finish before the configured timeout elapses.
    async fn collect_and_export(&mut self) -> MetricResult<()> {
        self.reader.collect(&mut self.rm)?;
        if self.rm.scope_metrics.is_empty() {
            // Nothing was collected, so there is nothing to hand to the exporter.
            return Ok(());
        }

        let export = self.reader.exporter.export(&mut self.rm);
        let timeout = self.runtime.delay(self.timeout);
        pin_mut!(export);
        pin_mut!(timeout);

        // Race the export against the timeout; whichever completes first wins.
        match future::select(export, timeout).await {
            Either::Left((res, _)) => res,
            Either::Right(_) => Err(MetricError::Other("export timed out".into())),
        }
    }

    /// Handles a single worker message.
    ///
    /// Returns `true` when the worker should keep running and `false` after a
    /// shutdown request has been handled.
    async fn process_message(&mut self, message: Message) -> bool {
        match message {
            Message::Export => {
                otel_debug!(
                    name: "PeriodicReader.ExportTriggered",
                    message = "Export message received.",
                );
                // Interval-triggered exports have no caller to report to, so
                // failures are only logged.
                if let Err(err) = self.collect_and_export().await {
                    otel_error!(
                        name: "PeriodicReader.ExportFailed",
                        message = "Failed to export metrics",
                        reason = format!("{}", err));
                }
            }
            Message::Flush(ch) => {
                otel_debug!(
                    name: "PeriodicReader.ForceFlushCalled",
                    message = "Flush message received.",
                );
                let res = self.collect_and_export().await;
                // The send fails only if the requester dropped its receiver.
                if let Err(send_error) = ch.send(res) {
                    otel_debug!(
                        name: "PeriodicReader.Flush.SendResultError",
                        message = "Failed to send flush result.",
                        reason = format!("{:?}", send_error),
                    );
                }
            }
            Message::Shutdown(ch) => {
                otel_debug!(
                    name: "PeriodicReader.ShutdownCalled",
                    message = "Shutdown message received",
                );
                // Export once more before the exporter is shut down.
                let res = self.collect_and_export().await;
                // The caller receives the result of the final export; an
                // exporter shutdown failure is logged instead of being
                // silently discarded.
                if let Err(shutdown_error) = self.reader.exporter.shutdown() {
                    otel_debug!(
                        name: "PeriodicReader.Shutdown.ExporterShutdownError",
                        message = "Exporter shutdown failed",
                        reason = format!("{:?}", shutdown_error),
                    );
                }
                if let Err(send_error) = ch.send(res) {
                    otel_debug!(
                        name: "PeriodicReader.Shutdown.SendResultError",
                        message = "Failed to send shutdown result",
                        reason = format!("{:?}", send_error),
                    );
                }
                return false;
            }
        }
        true
    }

    /// Processes messages until the stream ends or a shutdown request has
    /// been handled.
    async fn run(mut self, mut messages: impl FusedStream<Item = Message> + Unpin) {
        while let Some(message) = messages.next().await {
            if !self.process_message(message).await {
                break;
            }
        }
    }
}
impl MetricReader for PeriodicReader {
    /// Registers `pipeline` as the producer for this reader and starts the
    /// background worker.
    ///
    /// Only the first registration takes effect; later calls are logged and
    /// ignored. Nothing happens if the inner lock is poisoned.
    fn register_pipeline(&self, pipeline: Weak<Pipeline>) {
        if let Ok(mut state) = self.inner.lock() {
            if matches!(&state.sdk_producer_or_worker, ProducerOrWorker::Producer(_)) {
                otel_debug!(name: "PeriodicReader.DuplicateRegistration",
                    message = "duplicate registration found, did not register periodic reader.");
                return;
            }

            // Install the producer first, then start the worker that was
            // stored at build time (still under the lock).
            let previous = mem::replace(
                &mut state.sdk_producer_or_worker,
                ProducerOrWorker::Producer(pipeline),
            );
            if let ProducerOrWorker::Worker(start_worker) = previous {
                start_worker(self);
            }
        }
    }

    /// Collects metrics from the registered producer into `rm`.
    ///
    /// # Errors
    ///
    /// Fails when the reader is shut down, when no producer is registered (or
    /// the registered one has been dropped), or when the producer itself
    /// fails.
    fn collect(&self, rm: &mut ResourceMetrics) -> MetricResult<()> {
        let state = self.inner.lock()?;
        if state.is_shutdown {
            return Err(MetricError::Other("reader is shut down".into()));
        }

        let registered = match &state.sdk_producer_or_worker {
            ProducerOrWorker::Producer(sdk_producer) => sdk_producer.upgrade(),
            ProducerOrWorker::Worker(_) => None,
        };
        let producer = registered
            .ok_or_else(|| MetricError::Other("reader is not registered".into()))?;
        producer.produce(rm)?;
        Ok(())
    }

    /// Asks the worker to collect and export immediately and blocks until it
    /// reports the result.
    ///
    /// # Errors
    ///
    /// Fails when the reader is shut down, when the request cannot be queued,
    /// when the worker drops the request without answering, or when the
    /// export itself fails.
    fn force_flush(&self) -> MetricResult<()> {
        // The lock is scoped to this block so it is released before blocking.
        let reply = {
            let mut state = self.inner.lock()?;
            if state.is_shutdown {
                return Err(MetricError::Other("reader is shut down".into()));
            }
            let (reply_tx, reply_rx) = oneshot::channel();
            state
                .message_sender
                .try_send(Message::Flush(reply_tx))
                .map_err(|e| MetricError::Other(e.to_string()))?;
            reply_rx
        };

        match futures_executor::block_on(reply) {
            Ok(outcome) => outcome,
            Err(err) => Err(MetricError::Other(err.to_string())),
        }
    }

    /// Asks the worker to perform a final export and shut down, blocks until
    /// it answers, and then marks the reader as shut down.
    ///
    /// # Errors
    ///
    /// Fails when the reader is already shut down, when the request cannot be
    /// queued, when the worker drops the request without answering, or when
    /// the final export fails.
    fn shutdown(&self) -> MetricResult<()> {
        // The lock is scoped to this block so it is released before blocking;
        // the worker needs it to run its final collection.
        let reply = {
            let mut state = self.inner.lock()?;
            if state.is_shutdown {
                return Err(MetricError::Other("reader is already shut down".into()));
            }
            let (reply_tx, reply_rx) = oneshot::channel();
            state
                .message_sender
                .try_send(Message::Shutdown(reply_tx))
                .map_err(|e| MetricError::Other(e.to_string()))?;
            reply_rx
        };

        let outcome = futures_executor::block_on(reply)
            .map_err(|err| MetricError::Other(err.to_string()))?;

        // The flag is set only after the worker has answered.
        self.inner.lock()?.is_shutdown = true;
        outcome
    }

    /// Returns the temporality for `kind`, derived from the exporter's
    /// temporality.
    fn temporality(&self, kind: InstrumentKind) -> super::Temporality {
        let exporter_temporality = self.exporter.temporality();
        kind.temporality_preference(exporter_temporality)
    }
}
#[cfg(all(test, feature = "testing"))]
mod tests {
    use super::PeriodicReader;
    use crate::metrics::reader::MetricReader;
    use crate::metrics::MetricError;
    use crate::{
        metrics::data::ResourceMetrics, metrics::SdkMeterProvider, runtime,
        testing::metrics::InMemoryMetricExporter, Resource,
    };
    use opentelemetry::metrics::MeterProvider;
    use std::sync::mpsc;

    /// Builds a reader with a 1 ms interval on `runtime`, registers an
    /// observable counter whose callback signals a channel, and blocks until
    /// the callback has run at least once.
    fn collection_triggered_by_interval_helper<RT>(runtime: RT)
    where
        RT: crate::runtime::Runtime,
    {
        let tick = std::time::Duration::from_millis(1);
        let exporter = InMemoryMetricExporter::default();
        let builder = PeriodicReader::builder(exporter.clone(), runtime);
        let reader = builder.with_interval(tick).build();

        let (collected_tx, collected_rx) = mpsc::channel();
        let meter_provider = SdkMeterProvider::builder().with_reader(reader).build();
        let meter = meter_provider.meter("test");
        let _counter = meter
            .u64_observable_counter("testcounter")
            .with_callback(move |_| {
                collected_tx
                    .send(())
                    .expect("channel should still be open");
            })
            .build();

        // Blocks until the counter callback has signalled at least once.
        collected_rx
            .recv()
            .expect("message should be available in channel, indicating a collection occurred");
    }

    /// Plain synchronous test using the `TokioCurrentThread` runtime.
    #[test]
    fn collection_triggered_by_interval_tokio_current() {
        collection_triggered_by_interval_helper(runtime::TokioCurrentThread);
    }

    /// Multi-thread test runtime with one worker, reader on `runtime::Tokio`.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn collection_triggered_by_interval_from_tokio_multi_one_thread_on_runtime_tokio() {
        collection_triggered_by_interval_helper(runtime::Tokio);
    }

    /// Multi-thread test runtime with two workers, reader on `runtime::Tokio`.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn collection_triggered_by_interval_from_tokio_multi_two_thread_on_runtime_tokio() {
        collection_triggered_by_interval_helper(runtime::Tokio);
    }

    /// Multi-thread test runtime with one worker, reader on
    /// `runtime::TokioCurrentThread`.
    #[tokio::test(flavor = "multi_thread", worker_threads = 1)]
    async fn collection_triggered_by_interval_from_tokio_multi_one_thread_on_runtime_tokio_current()
    {
        collection_triggered_by_interval_helper(runtime::TokioCurrentThread);
    }

    /// Multi-thread test runtime with two workers, reader on
    /// `runtime::TokioCurrentThread`.
    #[tokio::test(flavor = "multi_thread", worker_threads = 2)]
    async fn collection_triggered_by_interval_from_tokio_multi_two_thread_on_runtime_tokio_current()
    {
        collection_triggered_by_interval_helper(runtime::TokioCurrentThread);
    }

    /// Current-thread test runtime with the reader on `runtime::Tokio`;
    /// ignored, see the linked issue.
    #[tokio::test(flavor = "current_thread")]
    #[ignore = "See issue https://github.com/open-telemetry/opentelemetry-rust/issues/2056"]
    async fn collection_triggered_by_interval_from_tokio_current_on_runtime_tokio() {
        collection_triggered_by_interval_helper(runtime::Tokio);
    }

    /// Current-thread test runtime with the reader on
    /// `runtime::TokioCurrentThread`.
    #[tokio::test(flavor = "current_thread")]
    async fn collection_triggered_by_interval_from_tokio_current_on_runtime_tokio_current() {
        collection_triggered_by_interval_helper(runtime::TokioCurrentThread);
    }

    /// Collecting from a reader that was never registered with a pipeline
    /// must fail with the "reader is not registered" error.
    #[test]
    fn unregistered_collect() {
        let exporter = InMemoryMetricExporter::default();
        let reader = PeriodicReader::builder(exporter.clone(), runtime::Tokio).build();
        let mut metrics = ResourceMetrics {
            resource: Resource::empty(),
            scope_metrics: Vec::new(),
        };

        let failure = reader.collect(&mut metrics).unwrap_err();
        assert!(
            matches!(failure, MetricError::Other(err) if err == "reader is not registered")
        );
    }
}