Skip to content

Commit 9b45650

Browse files
committed
monitor tests, more docs
1 parent 168ecb2 commit 9b45650

File tree

5 files changed

+67
-14
lines changed

5 files changed

+67
-14
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,11 @@
11
# Changelog
22

3+
## Unreleased
4+
5+
**Internal**:
6+
7+
- Track a utilization metric for internal services. ([#4501](https://github.com/getsentry/relay/pull/4501))
8+
39
## 25.2.0
410

511
- Allow log ingestion behind a flag, only for internal use currently. ([#4471](https://github.com/getsentry/relay/pull/4471))

relay-system/src/runtime/handle.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,10 +14,10 @@ struct HandleInner {
1414
tokio_cb_metrics: Arc<TokioCallbackMetrics>,
1515
}
1616

17-
/// Handle to the [`Runtime`].
17+
/// Handle to the [`Runtime`](crate::Runtime).
1818
///
1919
/// The handle is internally reference-counted and can be freely cloned.
20-
/// A handle can be obtained using the [`Runtime::handle`] method.
20+
/// A handle can be obtained using the [`Runtime::handle`](crate::Runtime::handle) method.
2121
#[derive(Debug, Clone)]
2222
pub struct Handle {
2323
inner: Arc<HandleInner>,

relay-system/src/runtime/spawn.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ where
3838

3939
/// Spawns a new asynchronous task in a specific runtime, returning a [`JoinHandle`] for it.
4040
///
41-
/// This is in instrumented spawn variant of Tokio's [`Handle::spawn`].
41+
/// This is an instrumented spawn variant of Tokio's [`Handle::spawn`](crate::Handle::spawn).
4242
#[allow(clippy::disallowed_methods)]
4343
pub fn spawn_in<F>(
4444
handle: &tokio::runtime::Handle,
@@ -166,8 +166,8 @@ mod tests {
166166
#[cfg(windows)]
167167
assert_debug_snapshot!(captures, @r###"
168168
[
169-
"runtime.task.spawn.created:1|c|#id:relay-system\\src\\runtime\\spawn.rs:124,file:relay-system\\src\\runtime\\spawn.rs,line:124",
170-
"runtime.task.spawn.terminated:1|c|#id:relay-system\\src\\runtime\\spawn.rs:124,file:relay-system\\src\\runtime\\spawn.rs,line:124",
169+
"runtime.task.spawn.created:1|c|#id:relay-system\\src\\runtime\\spawn.rs:124,file:relay-system\\src\\runtime\\spawn.rs,line:155",
170+
"runtime.task.spawn.terminated:1|c|#id:relay-system\\src\\runtime\\spawn.rs:124,file:relay-system\\src\\runtime\\spawn.rs,line:155",
171171
]
172172
"###);
173173
}

relay-system/src/service/monitor.rs

Lines changed: 51 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,16 +3,14 @@ use std::pin::Pin;
33
use std::sync::atomic::{AtomicU64, AtomicU8, Ordering};
44
use std::sync::Arc;
55
use std::task::{Context, Poll};
6-
use std::time::{Duration, Instant};
7-
8-
use crate::service::registry::ServiceId;
6+
use tokio::time::{Duration, Instant};
97

108
/// Minimum interval when utilization is recalculated.
119
const UTILIZATION_UPDATE_THRESHOLD: Duration = Duration::from_secs(5);
1210

1311
pin_project_lite::pin_project! {
12+
/// A service monitor tracks service metrics.
1413
pub struct ServiceMonitor<F> {
15-
id: ServiceId,
1614
#[pin]
1715
inner: F,
1816

@@ -24,9 +22,9 @@ pin_project_lite::pin_project! {
2422
}
2523

2624
impl<F> ServiceMonitor<F> {
27-
pub fn wrap(id: ServiceId, inner: F) -> Self {
25+
/// Wraps a service future with a monitor.
26+
pub fn wrap(inner: F) -> Self {
2827
Self {
29-
id,
3028
inner,
3129
metrics: Arc::new(RawMetrics {
3230
poll_count: AtomicU64::new(0),
@@ -38,6 +36,7 @@ impl<F> ServiceMonitor<F> {
3836
}
3937
}
4038

39+
/// Provides access to the raw metrics tracked in this monitor.
4140
pub fn metrics(&self) -> &Arc<RawMetrics> {
4241
&self.metrics
4342
}
@@ -87,9 +86,55 @@ where
8786
}
8887
}
8988

89+
/// The raw metrics extracted from a [`ServiceMonitor`].
90+
///
91+
/// All access outside the [`ServiceMonitor`] must be *read* only.
9092
#[derive(Debug)]
9193
pub struct RawMetrics {
94+
/// Number of times the service was polled.
9295
pub poll_count: AtomicU64,
96+
/// The total time the service spent in its poll function.
9397
pub total_duration_ns: AtomicU64,
98+
/// Estimated utilization percentage `[0-100]`.
9499
pub utilization: AtomicU8,
95100
}
101+
102+
#[cfg(test)]
103+
mod tests {
104+
use super::*;
105+
106+
#[tokio::test(start_paused = true)]
107+
async fn test_monitor() {
108+
let mut monitor = ServiceMonitor::wrap(Box::pin(async {
109+
loop {
110+
tokio::time::advance(Duration::from_millis(500)).await;
111+
}
112+
}));
113+
let metrics = Arc::clone(monitor.metrics());
114+
115+
assert_eq!(metrics.poll_count.load(Ordering::Relaxed), 0);
116+
let _ = futures::poll!(&mut monitor);
117+
assert_eq!(metrics.poll_count.load(Ordering::Relaxed), 1);
118+
let _ = futures::poll!(&mut monitor);
119+
assert_eq!(metrics.poll_count.load(Ordering::Relaxed), 2);
120+
let _ = futures::poll!(&mut monitor);
121+
assert_eq!(metrics.poll_count.load(Ordering::Relaxed), 3);
122+
123+
assert_eq!(metrics.utilization.load(Ordering::Relaxed), 0);
124+
assert_eq!(
125+
metrics.total_duration_ns.load(Ordering::Relaxed),
126+
1500000000
127+
);
128+
129+
// Advance time just enough to perfectly hit the update threshold.
130+
tokio::time::advance(UTILIZATION_UPDATE_THRESHOLD - Duration::from_secs(2)).await;
131+
132+
let _ = futures::poll!(&mut monitor);
133+
assert_eq!(metrics.poll_count.load(Ordering::Relaxed), 4);
134+
assert_eq!(metrics.utilization.load(Ordering::Relaxed), 40);
135+
assert_eq!(
136+
metrics.total_duration_ns.load(Ordering::Relaxed),
137+
2000000000
138+
);
139+
}
140+
}

relay-system/src/service/registry.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use crate::{ServiceObj, TaskId};
1212
pub struct ServicesMetrics(BTreeMap<ServiceId, ServiceMetrics>);
1313

1414
impl ServicesMetrics {
15-
/// Returns an iterator of all service identifiers and their [`Metrics`].
15+
/// Returns an iterator of all service identifiers and their [`ServiceMetrics`].
1616
pub fn iter(&self) -> impl Iterator<Item = (ServiceId, ServiceMetrics)> + '_ {
1717
self.0.iter().map(|(id, metrics)| (*id, *metrics))
1818
}
@@ -104,13 +104,16 @@ impl Inner {
104104
let task_id = TaskId::from(&service);
105105
let group = self.services.entry(task_id).or_default();
106106

107+
// Cleanup group, evicting all terminated services, while we're at it.
108+
group.instances.retain(|s| !s.handle.is_finished());
109+
107110
let id = ServiceId {
108111
task: task_id,
109112
instance_id: group.next_instance_id,
110113
};
111114
group.next_instance_id += 1;
112115

113-
let future = ServiceMonitor::wrap(id, service.future);
116+
let future = ServiceMonitor::wrap(service.future);
114117
let metrics = Arc::clone(future.metrics());
115118

116119
let jh = crate::runtime::spawn_in(handle, task_id, future);
@@ -158,6 +161,5 @@ struct ServiceGroup {
158161
struct Service {
159162
instance_id: u32,
160163
metrics: Arc<RawMetrics>,
161-
#[expect(unused, reason = "not yet implemented")]
162164
handle: ServiceStatusJoinHandle,
163165
}

0 commit comments

Comments
 (0)