mod aggregate;
mod exponential_histogram;
mod histogram;
mod last_value;
mod precomputed_sum;
mod sum;
use core::fmt;
use std::collections::{HashMap, HashSet};
use std::mem::take;
use std::ops::{Add, AddAssign, DerefMut, Sub};
use std::sync::atomic::{AtomicBool, AtomicI64, AtomicU64, AtomicUsize, Ordering};
use std::sync::{Arc, RwLock};
use aggregate::is_under_cardinality_limit;
pub(crate) use aggregate::{AggregateBuilder, ComputeAggregation, Measure};
pub(crate) use exponential_histogram::{EXPO_MAX_SCALE, EXPO_MIN_SCALE};
use once_cell::sync::Lazy;
use opentelemetry::{otel_warn, KeyValue};
use crate::metrics::AttributeSet;
/// Attribute set attached to the single synthetic "overflow" data point that
/// absorbs measurements once a stream's cardinality limit is exceeded.
pub(crate) static STREAM_OVERFLOW_ATTRIBUTES: Lazy<Vec<KeyValue>> =
Lazy::new(|| vec![KeyValue::new("otel.metric.overflow", "true")]);
/// Abstraction over per-attribute-set aggregation state (sum, last value,
/// histogram, ...), as stored inside a [`ValueMap`].
pub(crate) trait Aggregator {
/// Aggregator-specific construction input (e.g. histogram bucket bounds).
type InitConfig;
/// The value type folded in by `update`, after any caller-side precomputation.
type PreComputedValue;
/// Builds a fresh aggregator from `init`.
fn create(init: &Self::InitConfig) -> Self;
/// Folds one measurement into the aggregate. Takes `&self`, so
/// implementations must be internally synchronized.
fn update(&self, value: Self::PreComputedValue);
/// Returns a snapshot of the current state and resets this aggregator for
/// the next collection cycle (used for delta temporality).
fn clone_and_reset(&self, init: &Self::InitConfig) -> Self;
}
/// Concurrent map from attribute sets to aggregator trackers, plus a
/// dedicated tracker for measurements recorded with no attributes.
pub(crate) struct ValueMap<A>
where
A: Aggregator,
{
// A tracker may be stored under two keys: the caller-supplied attribute
// order and the sorted order (see `measure`). Both entries share one Arc.
trackers: RwLock<HashMap<Vec<KeyValue>, Arc<A>>>,
// Number of distinct trackers (not map entries); compared against the
// cardinality limit.
count: AtomicUsize,
// True once at least one no-attribute measurement has been recorded.
has_no_attribute_value: AtomicBool,
// Aggregation state for measurements with an empty attribute list.
no_attribute_tracker: A,
// Passed to `A::create` for every tracker this map builds.
config: A::InitConfig,
}
impl<A> ValueMap<A>
where
    A: Aggregator,
{
    /// Creates an empty map; `config` seeds every tracker created later.
    fn new(config: A::InitConfig) -> Self {
        ValueMap {
            trackers: RwLock::new(HashMap::new()),
            has_no_attribute_value: AtomicBool::new(false),
            no_attribute_tracker: A::create(&config),
            count: AtomicUsize::new(0),
            config,
        }
    }

    /// Records `value` against the tracker for `attributes`, creating the
    /// tracker on first use. Once the cardinality limit is reached, new
    /// attribute sets spill into the shared overflow data point.
    fn measure(&self, value: A::PreComputedValue, attributes: &[KeyValue]) {
        if attributes.is_empty() {
            self.no_attribute_tracker.update(value);
            self.has_no_attribute_value.store(true, Ordering::Release);
            return;
        }

        // Fast path: tracker already exists; a shared read lock suffices.
        let Ok(trackers) = self.trackers.read() else {
            return;
        };
        // Look up the incoming order first to skip the sorting cost.
        if let Some(tracker) = trackers.get(attributes) {
            tracker.update(value);
            return;
        }
        let sorted_attrs = AttributeSet::from(attributes).into_vec();
        if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
            tracker.update(value);
            return;
        }

        // Slow path: upgrade to a write lock and re-check, since another
        // thread may have inserted the tracker while we were unlocked.
        drop(trackers);
        let Ok(mut trackers) = self.trackers.write() else {
            return;
        };
        if let Some(tracker) = trackers.get(attributes) {
            tracker.update(value);
        } else if let Some(tracker) = trackers.get(sorted_attrs.as_slice()) {
            tracker.update(value);
        } else if is_under_cardinality_limit(self.count.load(Ordering::SeqCst)) {
            let new_tracker = Arc::new(A::create(&self.config));
            new_tracker.update(value);
            // Insert under both orderings so later lookups hit on either;
            // the two keys share one tracker and are counted once.
            trackers.insert(attributes.to_vec(), new_tracker.clone());
            trackers.insert(sorted_attrs, new_tracker);
            self.count.fetch_add(1, Ordering::SeqCst);
        } else if let Some(overflow_value) = trackers.get(STREAM_OVERFLOW_ATTRIBUTES.as_slice()) {
            overflow_value.update(value);
        } else {
            let new_tracker = A::create(&self.config);
            new_tracker.update(value);
            trackers.insert(STREAM_OVERFLOW_ATTRIBUTES.clone(), Arc::new(new_tracker));
            otel_warn!( name: "ValueMap.measure",
                message = "Maximum data points for metric stream exceeded. Entry added to overflow. Subsequent overflows to same metric until next collect will not be logged."
            );
        }
    }

    /// Copies every current data point into `dest` without resetting any
    /// state (cumulative temporality).
    pub(crate) fn collect_readonly<Res, MapFn>(&self, dest: &mut Vec<Res>, mut map_fn: MapFn)
    where
        MapFn: FnMut(Vec<KeyValue>, &A) -> Res,
    {
        prepare_data(dest, self.count.load(Ordering::SeqCst));
        if self.has_no_attribute_value.load(Ordering::Acquire) {
            dest.push(map_fn(vec![], &self.no_attribute_tracker));
        }
        let Ok(trackers) = self.trackers.read() else {
            return;
        };
        // A tracker can sit under two keys (original and sorted attribute
        // order); dedupe by Arc pointer so each is reported exactly once.
        let mut seen = HashSet::new();
        for (attrs, tracker) in trackers.iter() {
            if seen.insert(Arc::as_ptr(tracker)) {
                dest.push(map_fn(attrs.clone(), tracker));
            }
        }
    }

    /// Drains every current data point into `dest` and resets the map for
    /// the next interval (delta temporality).
    pub(crate) fn collect_and_reset<Res, MapFn>(&self, dest: &mut Vec<Res>, mut map_fn: MapFn)
    where
        MapFn: FnMut(Vec<KeyValue>, A) -> Res,
    {
        prepare_data(dest, self.count.load(Ordering::SeqCst));
        if self.has_no_attribute_value.swap(false, Ordering::AcqRel) {
            dest.push(map_fn(
                vec![],
                self.no_attribute_tracker.clone_and_reset(&self.config),
            ));
        }
        let trackers = match self.trackers.write() {
            Ok(mut trackers) => {
                self.count.store(0, Ordering::SeqCst);
                take(trackers.deref_mut())
            }
            // A poisoned lock means a writer panicked. Skip this collection
            // cycle instead of panicking (this was previously `todo!()`,
            // which would abort collection in production); the other
            // collect/measure paths bail out the same way.
            Err(_) => return,
        };
        let mut seen = HashSet::new();
        for (attrs, tracker) in trackers.into_iter() {
            if seen.insert(Arc::as_ptr(&tracker)) {
                dest.push(map_fn(attrs, tracker.clone_and_reset(&self.config)));
            }
        }
    }
}
/// Clears `data` and ensures it can hold `list_len` trackers plus the
/// no-attribute data point and the overflow data point without reallocating
/// during collection.
fn prepare_data<T>(data: &mut Vec<T>, list_len: usize) {
    data.clear();
    // +2 reserves room for the no-attribute point and the overflow point.
    let total_len = list_len + 2;
    if total_len > data.capacity() {
        // `reserve_exact(n)` guarantees capacity for `len + n` elements and
        // `len` is 0 after `clear`, so pass the full target. The previous
        // `total_len - data.capacity()` under-reserved whenever the vector
        // already had some capacity.
        data.reserve_exact(total_len);
    }
}
/// Thread-safe storage of a single numeric value.
pub(crate) trait AtomicTracker<T>: Sync + Send + 'static {
/// Overwrites the stored value.
fn store(&self, _value: T);
/// Adds `_value` to the stored value.
fn add(&self, _value: T);
/// Reads the current value without modifying it.
fn get_value(&self) -> T;
/// Reads the current value and resets the storage to zero.
fn get_and_reset_value(&self) -> T;
}
/// Associates a number type with its [`AtomicTracker`] implementation.
pub(crate) trait AtomicallyUpdate<T> {
/// Concrete tracker type used to hold values of type `T`.
type AtomicTracker: AtomicTracker<T>;
/// Creates a tracker initialized to `init`.
fn new_atomic_tracker(init: T) -> Self::AtomicTracker;
}
/// Marker trait for the numeric types measurements can carry (`i64`, `u64`,
/// `f64`), bundling the arithmetic, threading, and atomic-update bounds the
/// aggregators rely on.
pub(crate) trait Number:
Add<Output = Self>
+ AddAssign
+ Sub<Output = Self>
+ PartialOrd
+ fmt::Debug
+ Clone
+ Copy
+ PartialEq
+ Default
+ Send
+ Sync
+ 'static
+ AtomicallyUpdate<Self>
{
/// Smallest representable value of the type.
fn min() -> Self;
/// Largest representable value of the type.
fn max() -> Self;
/// Converts to `f64` for export (possibly lossy for large integers).
fn into_float(self) -> f64;
}
impl Number for i64 {
    fn min() -> Self {
        Self::MIN
    }

    fn max() -> Self {
        Self::MAX
    }

    fn into_float(self) -> f64 {
        // Magnitudes above 2^53 lose precision; acceptable for export.
        self as f64
    }
}
impl Number for u64 {
    fn min() -> Self {
        Self::MIN
    }

    fn max() -> Self {
        Self::MAX
    }

    fn into_float(self) -> f64 {
        // Magnitudes above 2^53 lose precision; acceptable for export.
        self as f64
    }
}
impl Number for f64 {
    fn min() -> Self {
        Self::MIN
    }

    fn max() -> Self {
        Self::MAX
    }

    fn into_float(self) -> f64 {
        // Already an f64; no conversion needed.
        self
    }
}
impl AtomicTracker<u64> for AtomicU64 {
    // All operations use `Relaxed` ordering: each tracker holds a single
    // independent counter, so no cross-value ordering is required.

    fn store(&self, value: u64) {
        // Fully qualified so the delegation to the inherent atomic op is
        // unmistakable (this is not a recursive trait call).
        AtomicU64::store(self, value, Ordering::Relaxed);
    }

    fn add(&self, value: u64) {
        AtomicU64::fetch_add(self, value, Ordering::Relaxed);
    }

    fn get_value(&self) -> u64 {
        AtomicU64::load(self, Ordering::Relaxed)
    }

    fn get_and_reset_value(&self) -> u64 {
        // Swap with zero atomically returns the old value and resets.
        AtomicU64::swap(self, 0, Ordering::Relaxed)
    }
}
/// `u64` values are tracked directly with a standard `AtomicU64`.
impl AtomicallyUpdate<u64> for u64 {
type AtomicTracker = AtomicU64;
fn new_atomic_tracker(init: u64) -> Self::AtomicTracker {
AtomicU64::new(init)
}
}
impl AtomicTracker<i64> for AtomicI64 {
    // All operations use `Relaxed` ordering: each tracker holds a single
    // independent counter, so no cross-value ordering is required.

    fn store(&self, value: i64) {
        // Fully qualified so the delegation to the inherent atomic op is
        // unmistakable (this is not a recursive trait call).
        AtomicI64::store(self, value, Ordering::Relaxed);
    }

    fn add(&self, value: i64) {
        AtomicI64::fetch_add(self, value, Ordering::Relaxed);
    }

    fn get_value(&self) -> i64 {
        AtomicI64::load(self, Ordering::Relaxed)
    }

    fn get_and_reset_value(&self) -> i64 {
        // Swap with zero atomically returns the old value and resets.
        AtomicI64::swap(self, 0, Ordering::Relaxed)
    }
}
/// `i64` values are tracked directly with a standard `AtomicI64`.
impl AtomicallyUpdate<i64> for i64 {
type AtomicTracker = AtomicI64;
fn new_atomic_tracker(init: i64) -> Self::AtomicTracker {
AtomicI64::new(init)
}
}
/// Atomic `f64` built on an `AtomicU64` holding the IEEE-754 bit pattern
/// (converted with `to_bits`/`from_bits`), since there is no native atomic
/// float type in std.
pub(crate) struct F64AtomicTracker {
inner: AtomicU64, }
impl F64AtomicTracker {
fn new(init: f64) -> Self {
let value_as_u64 = init.to_bits();
F64AtomicTracker {
inner: AtomicU64::new(value_as_u64),
}
}
}
impl AtomicTracker<f64> for F64AtomicTracker {
    // Values are stored as their IEEE-754 bit patterns inside an AtomicU64;
    // every access converts with `to_bits`/`from_bits`.

    fn store(&self, value: f64) {
        self.inner.store(value.to_bits(), Ordering::Relaxed);
    }

    fn add(&self, value: f64) {
        // Compare-and-swap retry loop expressed via `fetch_update`: reread
        // the current bits, add, and retry until no other writer races us.
        // The closure always returns `Some`, so the result is always `Ok`.
        let _ = self
            .inner
            .fetch_update(Ordering::Relaxed, Ordering::Relaxed, |current_bits| {
                Some((f64::from_bits(current_bits) + value).to_bits())
            });
    }

    fn get_value(&self) -> f64 {
        f64::from_bits(self.inner.load(Ordering::Relaxed))
    }

    fn get_and_reset_value(&self) -> f64 {
        // Atomically replace with the bit pattern of 0.0 and return the
        // previous value.
        f64::from_bits(self.inner.swap(0.0_f64.to_bits(), Ordering::Relaxed))
    }
}
/// `f64` values are tracked via [`F64AtomicTracker`], which emulates an
/// atomic float on top of `AtomicU64`.
impl AtomicallyUpdate<f64> for f64 {
type AtomicTracker = F64AtomicTracker;
fn new_atomic_tracker(init: f64) -> Self::AtomicTracker {
F64AtomicTracker::new(init)
}
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn can_store_u64_atomic_value() {
        let tracker = u64::new_atomic_tracker(0);
        // Exercise the trait-object path for `store`.
        let dyn_tracker: &dyn AtomicTracker<u64> = &tracker;
        assert_eq!(tracker.get_value(), 0);
        dyn_tracker.store(25);
        assert_eq!(tracker.get_value(), 25);
    }

    #[test]
    fn can_add_and_get_u64_atomic_value() {
        let tracker = u64::new_atomic_tracker(0);
        tracker.add(15);
        tracker.add(10);
        assert_eq!(tracker.get_value(), 25);
    }

    #[test]
    fn can_reset_u64_atomic_value() {
        let tracker = u64::new_atomic_tracker(0);
        tracker.add(15);
        let drained = tracker.get_and_reset_value();
        let after_reset = tracker.get_value();
        assert_eq!(drained, 15, "Incorrect first value");
        assert_eq!(after_reset, 0, "Incorrect second value");
    }

    #[test]
    fn can_store_i64_atomic_value() {
        let tracker = i64::new_atomic_tracker(0);
        // Exercise the trait-object path for `store`, including negatives.
        let dyn_tracker: &dyn AtomicTracker<i64> = &tracker;
        assert_eq!(tracker.get_value(), 0);
        dyn_tracker.store(-25);
        assert_eq!(tracker.get_value(), -25);
        dyn_tracker.store(25);
        assert_eq!(tracker.get_value(), 25);
    }

    #[test]
    fn can_add_and_get_i64_atomic_value() {
        let tracker = i64::new_atomic_tracker(0);
        tracker.add(15);
        tracker.add(-10);
        assert_eq!(tracker.get_value(), 5);
    }

    #[test]
    fn can_reset_i64_atomic_value() {
        let tracker = i64::new_atomic_tracker(0);
        tracker.add(15);
        let drained = tracker.get_and_reset_value();
        let after_reset = tracker.get_value();
        assert_eq!(drained, 15, "Incorrect first value");
        assert_eq!(after_reset, 0, "Incorrect second value");
    }

    #[test]
    fn can_store_f64_atomic_value() {
        let tracker = f64::new_atomic_tracker(0.0);
        let dyn_tracker: &dyn AtomicTracker<f64> = &tracker;
        assert_eq!(tracker.get_value(), 0.0);
        dyn_tracker.store(-15.5);
        assert!((tracker.get_value() - -15.5).abs() < 0.0001);
        dyn_tracker.store(25.7);
        assert!((tracker.get_value() - 25.7).abs() < 0.0001);
    }

    #[test]
    fn can_add_and_get_f64_atomic_value() {
        let tracker = f64::new_atomic_tracker(0.0);
        tracker.add(15.3);
        tracker.add(10.4);
        // Compare with a tolerance: the sum is not exactly representable.
        assert!((tracker.get_value() - 25.7).abs() < 0.0001);
    }

    #[test]
    fn can_reset_f64_atomic_value() {
        let tracker = f64::new_atomic_tracker(0.0);
        tracker.add(15.5);
        let drained = tracker.get_and_reset_value();
        let after_reset = tracker.get_value();
        assert!((drained - 15.5).abs() < 0.0001, "Incorrect first value");
        assert!((after_reset - 0.0).abs() < 0.0001, "Incorrect second value");
    }
}