1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208
use std::collections::VecDeque;
use std::sync::atomic::AtomicU64;
use std::sync::Arc;
use std::time::Duration;
use indicatif::style::ProgressStyle;
use indicatif::MultiProgress;
use indicatif::ProgressBar;
use indicatif::ProgressDrawTarget;
use indicatif::ProgressState;
use tracing_core::span;
use tracing_core::Subscriber;
use tracing_subscriber::layer;
use tracing_subscriber::registry::LookupSpan;
use crate::IndicatifSpanContext;
pub(crate) struct ProgressBarManager {
pub(crate) mp: MultiProgress,
active_progress_bars: u64,
max_progress_bars: u64,
// This is used in the footer progress bar and tracks the actual number of pending progress
// bars.
pending_progress_bars: Arc<AtomicU64>,
// The `.len()` of this may differ from `pending_progress_bars`. If a span closes before its
// progress bar is ever un-hidden, we decrement `pending_progress_bars` but won't clean the
// span entry up from this `VecDeque` for performance reasons. Instead, whenever we do un-hide
// a progress bar, we'll "garbage collect" closed spans from this then.
pending_spans: VecDeque<span::Id>,
// If this is `None`, a footer will never be shown.
footer_pb: Option<ProgressBar>,
}
impl ProgressBarManager {
pub(crate) fn new(
max_progress_bars: u64,
footer_progress_style: Option<ProgressStyle>,
) -> Self {
let pending_progress_bars = Arc::new(AtomicU64::new(0));
Self {
mp: {
let mp = MultiProgress::new();
mp.set_draw_target(ProgressDrawTarget::stderr_with_hz(20));
mp
},
active_progress_bars: 0,
max_progress_bars,
pending_progress_bars: pending_progress_bars.clone(),
pending_spans: VecDeque::new(),
footer_pb: footer_progress_style.map(|style| {
ProgressBar::hidden().with_style(style.with_key(
"pending_progress_bars",
move |_: &ProgressState, writer: &mut dyn std::fmt::Write| {
let _ = write!(
writer,
"{}",
pending_progress_bars.load(std::sync::atomic::Ordering::SeqCst)
);
},
))
}),
}
}
fn decrement_pending_pb(&mut self) {
let prev_val = self
.pending_progress_bars
.fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
// If this span was the last one pending, clear the footer (if it was active).
if prev_val == 1 {
debug_assert!(
self.footer_pb
.as_ref()
.map(|pb| !pb.is_hidden())
.unwrap_or(true),
"footer progress bar was hidden despite there being pending progress bars"
);
if let Some(footer_pb) = self.footer_pb.as_ref() {
self.mp.set_move_cursor(false);
footer_pb.finish_and_clear();
self.mp.remove(footer_pb);
footer_pb.disable_steady_tick();
}
}
}
fn add_pending_pb(&mut self, span_id: &span::Id) {
let prev_val = self
.pending_progress_bars
.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
self.pending_spans.push_back(span_id.clone());
if prev_val == 0 {
debug_assert!(
self.footer_pb
.as_ref()
.map(|pb| pb.is_hidden())
.unwrap_or(true),
"footer progress bar was not hidden despite there being no pending progress bars"
);
// Show the footer progress bar.
if let Some(footer_pb) = self.footer_pb.take() {
let pb = self.mp.add(footer_pb);
pb.enable_steady_tick(Duration::from_millis(100));
self.mp.set_move_cursor(true);
self.footer_pb = Some(pb);
}
}
}
pub(crate) fn show_progress_bar(
&mut self,
pb_span_ctx: &mut IndicatifSpanContext,
span_id: &span::Id,
) {
if self.active_progress_bars < self.max_progress_bars {
let pb = match pb_span_ctx.parent_progress_bar {
// TODO(emersonford): fix span ordering in progress bar, because we use
// `insert_after`, we end up showing the child progress bars in reverse order.
Some(ref parent_pb) => self
.mp
.insert_after(parent_pb, pb_span_ctx.progress_bar.take().unwrap()),
None => {
if self
.footer_pb
.as_ref()
.map(|footer_pb| !footer_pb.is_hidden())
.unwrap_or(false)
{
self.mp
.insert_from_back(1, pb_span_ctx.progress_bar.take().unwrap())
} else {
self.mp.add(pb_span_ctx.progress_bar.take().unwrap())
}
}
};
self.active_progress_bars += 1;
pb.enable_steady_tick(Duration::from_millis(100));
pb_span_ctx.progress_bar = Some(pb);
} else {
self.add_pending_pb(span_id);
}
}
pub(crate) fn finish_progress_bar<S>(
&mut self,
pb_span_ctx: &mut IndicatifSpanContext,
ctx: &layer::Context<'_, S>,
) where
S: Subscriber + for<'a> LookupSpan<'a>,
{
let Some(pb) = pb_span_ctx.progress_bar.take() else {
// Span was never entered.
return;
};
// The span closed before we had a chance to show its progress bar.
if pb.is_hidden() {
self.decrement_pending_pb();
return;
}
// This span had an active/shown progress bar.
pb.finish_and_clear();
self.mp.remove(&pb);
self.active_progress_bars -= 1;
loop {
let Some(span_id) = self.pending_spans.pop_front() else {
break;
};
match ctx.span(&span_id) {
Some(next_eligible_span) => {
let mut ext = next_eligible_span.extensions_mut();
let indicatif_span_ctx = ext
.get_mut::<IndicatifSpanContext>()
.expect("No IndicatifSpanContext found; this is a bug");
// It possible `on_close` has been called on a span but it has not yet been
// removed from `ctx.span` (e.g., tracing may still be iterating through each
// layer's `on_close` method and cannot remove the span from the registry until
// it has finished `on_close` for each layer). So we may successfully fetch the
// span, despite having closed out its progress bar.
if indicatif_span_ctx.progress_bar.is_none() {
continue;
}
self.decrement_pending_pb();
self.show_progress_bar(indicatif_span_ctx, &span_id);
break;
}
None => {
// Span was closed earlier, we "garbage collect" it from the queue here.
continue;
}
}
}
}
}