use std::mem::replace;
use std::ops::DerefMut;
use std::vec;
use std::{sync::Mutex, time::SystemTime};
use crate::metrics::data::{self, Aggregation, DataPoint};
use crate::metrics::Temporality;
use opentelemetry::KeyValue;
use super::{Aggregator, AtomicTracker, Number};
use super::{AtomicallyUpdate, ValueMap};
/// Per-attribute-set accumulator used by [`Sum`].
///
/// Holds the atomic tracker type that `T` designates through its
/// [`AtomicallyUpdate`] implementation.
struct Increment<T: AtomicallyUpdate<T>> {
    /// Tracker that measurements are added into.
    value: T::AtomicTracker,
}
impl<T> Aggregator for Increment<T>
where
    T: Number,
{
    /// No configuration is needed to build an `Increment`.
    type InitConfig = ();
    /// Measurements are passed to `update` unchanged.
    type PreComputedValue = T;

    /// Builds an accumulator whose tracker is seeded with `T::default()`.
    fn create(_init: &()) -> Self {
        let initial = T::default();
        Self {
            value: T::new_atomic_tracker(initial),
        }
    }

    /// Adds `value` to the tracker.
    fn update(&self, value: T) {
        self.value.add(value);
    }

    /// Calls `get_and_reset_value` on this accumulator's tracker and returns
    /// a new accumulator whose tracker is seeded with the value obtained.
    fn clone_and_reset(&self, _: &()) -> Self {
        let drained = self.value.get_and_reset_value();
        Self {
            value: T::new_atomic_tracker(drained),
        }
    }
}
/// Sum aggregator state: one [`Increment`] per attribute set, plus the
/// bookkeeping needed to emit `data::Sum` aggregations.
pub(crate) struct Sum<T>
where
    T: Number,
{
    /// Accumulators, managed by `ValueMap`.
    value_map: ValueMap<Increment<T>>,
    /// Copied into `is_monotonic` of every aggregation this sum produces.
    monotonic: bool,
    /// Start time reported on data points; set to the construction time by
    /// `new` and replaced with the collection time by `delta`.
    start: Mutex<SystemTime>,
}
impl<T: Number> Sum<T> {
    /// Creates an empty sum whose start time is the moment of construction.
    ///
    /// `monotonic` is recorded and later copied into every aggregation
    /// produced by [`Sum::delta`] and [`Sum::cumulative`].
    pub(crate) fn new(monotonic: bool) -> Self {
        Self {
            value_map: ValueMap::new(()),
            monotonic,
            start: Mutex::new(SystemTime::now()),
        }
    }

    /// Forwards `measurement` and its `attrs` to the underlying value map.
    pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) {
        self.value_map.measure(measurement, attrs);
    }

    /// Collects the current values as a delta-temporality sum.
    ///
    /// If `dest` downcasts to `data::Sum<T>` it is filled in place; otherwise
    /// a new `data::Sum<T>` is allocated and handed back to the caller.
    ///
    /// Returns the number of data points held by the target aggregation after
    /// collection, together with the newly allocated aggregation (`None` when
    /// `dest` was reused).
    pub(crate) fn delta(
        &self,
        dest: Option<&mut dyn Aggregation>,
    ) -> (usize, Option<Box<dyn Aggregation>>) {
        let now = SystemTime::now();

        // Reuse the caller's aggregation when it has the expected concrete type.
        let reused = dest.and_then(|agg| agg.as_mut().downcast_mut::<data::Sum<T>>());
        let mut fresh = reused.is_none().then(|| data::Sum {
            data_points: vec![],
            temporality: Temporality::Delta,
            is_monotonic: self.monotonic,
        });
        let sum_data = match reused {
            Some(existing) => existing,
            None => fresh.as_mut().expect("present if s_data is none"),
        };
        sum_data.temporality = Temporality::Delta;
        sum_data.is_monotonic = self.monotonic;

        // Swap the stored start time for `now`, so the next delta window
        // begins where this one ends. If the lock cannot be acquired, `now`
        // is used as the start time and the stored value is left untouched.
        let window_start = match self.start.lock() {
            Ok(mut guard) => replace(guard.deref_mut(), now),
            Err(_) => now,
        };

        self.value_map
            .collect_and_reset(&mut sum_data.data_points, |attributes, increment| {
                DataPoint {
                    attributes,
                    start_time: Some(window_start),
                    time: Some(now),
                    value: increment.value.get_value(),
                    exemplars: vec![],
                }
            });

        let point_count = sum_data.data_points.len();
        (point_count, fresh.map(|agg| Box::new(agg) as Box<_>))
    }

    /// Collects the current values as a cumulative-temporality sum.
    ///
    /// If `dest` downcasts to `data::Sum<T>` it is filled in place; otherwise
    /// a new `data::Sum<T>` is allocated and handed back to the caller. The
    /// stored start time is read but not modified.
    ///
    /// Returns the number of data points held by the target aggregation after
    /// collection, together with the newly allocated aggregation (`None` when
    /// `dest` was reused).
    pub(crate) fn cumulative(
        &self,
        dest: Option<&mut dyn Aggregation>,
    ) -> (usize, Option<Box<dyn Aggregation>>) {
        let now = SystemTime::now();

        // Reuse the caller's aggregation when it has the expected concrete type.
        let reused = dest.and_then(|agg| agg.as_mut().downcast_mut::<data::Sum<T>>());
        let mut fresh = reused.is_none().then(|| data::Sum {
            data_points: vec![],
            temporality: Temporality::Cumulative,
            is_monotonic: self.monotonic,
        });
        let sum_data = match reused {
            Some(existing) => existing,
            None => fresh.as_mut().expect("present if s_data is none"),
        };
        sum_data.temporality = Temporality::Cumulative;
        sum_data.is_monotonic = self.monotonic;

        // Read the stored start time; fall back to `now` if the lock cannot
        // be acquired.
        let window_start = match self.start.lock() {
            Ok(guard) => *guard,
            Err(_) => now,
        };

        self.value_map
            .collect_readonly(&mut sum_data.data_points, |attributes, increment| {
                DataPoint {
                    attributes,
                    start_time: Some(window_start),
                    time: Some(now),
                    value: increment.value.get_value(),
                    exemplars: vec![],
                }
            });

        let point_count = sum_data.data_points.len();
        (point_count, fresh.map(|agg| Box::new(agg) as Box<_>))
    }
}