1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
use std::{mem::replace, ops::DerefMut, sync::Mutex, time::SystemTime};

use crate::metrics::data::DataPoint;
use opentelemetry::KeyValue;

use super::{Aggregator, AtomicTracker, AtomicallyUpdate, Number, ValueMap};

/// this is reused by PrecomputedSum
pub(crate) struct Assign<T>
where
    T: AtomicallyUpdate<T>,
{
    pub(crate) value: T::AtomicTracker,
}

impl<T> Aggregator for Assign<T>
where
    T: Number,
{
    type InitConfig = ();
    type PreComputedValue = T;

    fn create(_init: &()) -> Self {
        Self {
            value: T::new_atomic_tracker(T::default()),
        }
    }

    fn update(&self, value: T) {
        self.value.store(value)
    }

    fn clone_and_reset(&self, _: &()) -> Self {
        Self {
            value: T::new_atomic_tracker(self.value.get_and_reset_value()),
        }
    }
}

/// Summarizes a set of measurements as the last one made.
pub(crate) struct LastValue<T: Number> {
    value_map: ValueMap<Assign<T>>,
    start: Mutex<SystemTime>,
}

impl<T: Number> LastValue<T> {
    pub(crate) fn new() -> Self {
        LastValue {
            value_map: ValueMap::new(()),
            start: Mutex::new(SystemTime::now()),
        }
    }

    pub(crate) fn measure(&self, measurement: T, attrs: &[KeyValue]) {
        // The argument index is not applicable to LastValue.
        self.value_map.measure(measurement, attrs);
    }

    pub(crate) fn compute_aggregation_delta(&self, dest: &mut Vec<DataPoint<T>>) {
        let t = SystemTime::now();
        let prev_start = self
            .start
            .lock()
            .map(|mut start| replace(start.deref_mut(), t))
            .unwrap_or(t);
        self.value_map
            .collect_and_reset(dest, |attributes, aggr| DataPoint {
                attributes,
                start_time: Some(prev_start),
                time: Some(t),
                value: aggr.value.get_value(),
                exemplars: vec![],
            });
    }

    pub(crate) fn compute_aggregation_cumulative(&self, dest: &mut Vec<DataPoint<T>>) {
        let t = SystemTime::now();
        let prev_start = self.start.lock().map(|start| *start).unwrap_or(t);
        self.value_map
            .collect_readonly(dest, |attributes, aggr| DataPoint {
                attributes,
                start_time: Some(prev_start),
                time: Some(t),
                value: aggr.value.get_value(),
                exemplars: vec![],
            });
    }
}