Skip to content

Commit aaf19e7

Browse files
committed
tests, docs, utilization value
1 parent 2c756ce commit aaf19e7

File tree

13 files changed

+457
-220
lines changed

13 files changed

+457
-220
lines changed

relay-server/src/service.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -290,7 +290,7 @@ impl ServiceState {
290290

291291
services.start(RelayStats::new(
292292
config.clone(),
293-
handle.metrics(),
293+
handle.clone(),
294294
upstream_relay.clone(),
295295
#[cfg(feature = "processing")]
296296
redis_pools.clone(),

relay-server/src/services/stats.rs

Lines changed: 41 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,16 @@ use relay_redis::AsyncRedisClient;
88
#[cfg(feature = "processing")]
99
use relay_redis::{RedisPool, RedisPools, Stats};
1010
use relay_statsd::metric;
11-
use relay_system::{Addr, RuntimeMetrics, Service};
11+
use relay_system::{Addr, Handle, RuntimeMetrics, Service};
1212
use tokio::time::interval;
1313

1414
/// Relay Stats Service.
1515
///
1616
/// Service which collects stats periodically and emits them via statsd.
1717
pub struct RelayStats {
1818
config: Arc<Config>,
19-
runtime: RuntimeMetrics,
19+
runtime: Handle,
20+
rt_metrics: RuntimeMetrics,
2021
upstream_relay: Addr<UpstreamRelay>,
2122
#[cfg(feature = "processing")]
2223
redis_pools: Option<RedisPools>,
@@ -25,88 +26,106 @@ pub struct RelayStats {
2526
impl RelayStats {
2627
pub fn new(
2728
config: Arc<Config>,
28-
runtime: RuntimeMetrics,
29+
runtime: Handle,
2930
upstream_relay: Addr<UpstreamRelay>,
3031
#[cfg(feature = "processing")] redis_pools: Option<RedisPools>,
3132
) -> Self {
3233
Self {
3334
config,
3435
upstream_relay,
36+
rt_metrics: runtime.metrics(),
3537
runtime,
3638
#[cfg(feature = "processing")]
3739
redis_pools,
3840
}
3941
}
4042

43+
async fn service_metrics(&self) {
44+
for (service, metrics) in self.runtime.current_services_metrics().iter() {
45+
metric!(
46+
gauge(RelayGauges::ServiceUtilization) = metrics.utilization as u64,
47+
service = service.name(),
48+
instance_id = &service.instance_id().to_string(),
49+
);
50+
}
51+
}
52+
4153
async fn tokio_metrics(&self) {
42-
metric!(gauge(RuntimeGauges::NumIdleThreads) = self.runtime.num_idle_threads() as u64);
43-
metric!(gauge(RuntimeGauges::NumAliveTasks) = self.runtime.num_alive_tasks() as u64);
54+
metric!(gauge(RuntimeGauges::NumIdleThreads) = self.rt_metrics.num_idle_threads() as u64);
55+
metric!(gauge(RuntimeGauges::NumAliveTasks) = self.rt_metrics.num_alive_tasks() as u64);
4456
metric!(
45-
gauge(RuntimeGauges::BlockingQueueDepth) = self.runtime.blocking_queue_depth() as u64
57+
gauge(RuntimeGauges::BlockingQueueDepth) =
58+
self.rt_metrics.blocking_queue_depth() as u64
4659
);
4760
metric!(
48-
gauge(RuntimeGauges::NumBlockingThreads) = self.runtime.num_blocking_threads() as u64
61+
gauge(RuntimeGauges::NumBlockingThreads) =
62+
self.rt_metrics.num_blocking_threads() as u64
4963
);
5064
metric!(
5165
gauge(RuntimeGauges::NumIdleBlockingThreads) =
52-
self.runtime.num_idle_blocking_threads() as u64
66+
self.rt_metrics.num_idle_blocking_threads() as u64
5367
);
5468

5569
metric!(
5670
counter(RuntimeCounters::BudgetForcedYieldCount) +=
57-
self.runtime.budget_forced_yield_count()
71+
self.rt_metrics.budget_forced_yield_count()
5872
);
5973

60-
metric!(gauge(RuntimeGauges::NumWorkers) = self.runtime.num_workers() as u64);
61-
for worker in 0..self.runtime.num_workers() {
74+
metric!(gauge(RuntimeGauges::NumWorkers) = self.rt_metrics.num_workers() as u64);
75+
for worker in 0..self.rt_metrics.num_workers() {
6276
let worker_name = worker.to_string();
6377

6478
metric!(
6579
gauge(RuntimeGauges::WorkerLocalQueueDepth) =
66-
self.runtime.worker_local_queue_depth(worker) as u64,
80+
self.rt_metrics.worker_local_queue_depth(worker) as u64,
6781
worker = &worker_name,
6882
);
6983
metric!(
7084
gauge(RuntimeGauges::WorkerMeanPollTime) =
71-
self.runtime.worker_mean_poll_time(worker).as_secs_f64(),
85+
self.rt_metrics.worker_mean_poll_time(worker).as_secs_f64(),
7286
worker = &worker_name,
7387
);
7488

7589
metric!(
7690
counter(RuntimeCounters::WorkerLocalScheduleCount) +=
77-
self.runtime.worker_local_schedule_count(worker),
91+
self.rt_metrics.worker_local_schedule_count(worker),
7892
worker = &worker_name,
7993
);
8094
metric!(
81-
counter(RuntimeCounters::WorkerNoopCount) += self.runtime.worker_noop_count(worker),
95+
counter(RuntimeCounters::WorkerNoopCount) +=
96+
self.rt_metrics.worker_noop_count(worker),
8297
worker = &worker_name,
8398
);
8499
metric!(
85100
counter(RuntimeCounters::WorkerOverflowCount) +=
86-
self.runtime.worker_overflow_count(worker),
101+
self.rt_metrics.worker_overflow_count(worker),
87102
worker = &worker_name,
88103
);
89104
metric!(
90-
counter(RuntimeCounters::WorkerParkCount) += self.runtime.worker_park_count(worker),
105+
counter(RuntimeCounters::WorkerParkCount) +=
106+
self.rt_metrics.worker_park_count(worker),
91107
worker = &worker_name,
92108
);
93109
metric!(
94-
counter(RuntimeCounters::WorkerPollCount) += self.runtime.worker_poll_count(worker),
110+
counter(RuntimeCounters::WorkerPollCount) +=
111+
self.rt_metrics.worker_poll_count(worker),
95112
worker = &worker_name,
96113
);
97114
metric!(
98115
counter(RuntimeCounters::WorkerStealCount) +=
99-
self.runtime.worker_steal_count(worker),
116+
self.rt_metrics.worker_steal_count(worker),
100117
worker = &worker_name,
101118
);
102119
metric!(
103120
counter(RuntimeCounters::WorkerStealOperations) +=
104-
self.runtime.worker_steal_operations(worker),
121+
self.rt_metrics.worker_steal_operations(worker),
105122
worker = &worker_name,
106123
);
107124
metric!(
108125
counter(RuntimeCounters::WorkerTotalBusyDuration) +=
109-
self.runtime.worker_total_busy_duration(worker).as_millis() as u64,
126+
self.rt_metrics
127+
.worker_total_busy_duration(worker)
128+
.as_millis() as u64,
110129
worker = &worker_name,
111130
);
112131
}
@@ -171,6 +190,7 @@ impl Service for RelayStats {
171190
loop {
172191
let _ = tokio::join!(
173192
self.upstream_status(),
193+
self.service_metrics(),
174194
self.tokio_metrics(),
175195
self.redis_pools(),
176196
);

relay-server/src/statsd.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,12 @@ pub enum RelayGauges {
4242
/// - `namespace`: the metric namespace.
4343
#[cfg(feature = "processing")]
4444
MetricDelayMax,
45+
/// Estimated percentage [0-100] of how busy Relay's internal services are.
46+
///
47+
/// This metric is tagged with:
48+
/// - `service`: the service name.
49+
/// - `instance_id`: a for the service name unique identifier for the running service
50+
ServiceUtilization,
4551
}
4652

4753
impl GaugeMetric for RelayGauges {
@@ -63,6 +69,7 @@ impl GaugeMetric for RelayGauges {
6369
RelayGauges::ServerActiveConnections => "server.http.connections",
6470
#[cfg(feature = "processing")]
6571
RelayGauges::MetricDelayMax => "metrics.delay.max",
72+
RelayGauges::ServiceUtilization => "service.utilization",
6673
}
6774
}
6875
}

relay-system/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,3 +28,4 @@ pin-project-lite = { workspace = true }
2828
insta = { workspace = true }
2929
relay-statsd = { workspace = true, features = ["test"] }
3030
tokio = { workspace = true, features = ["test-util"] }
31+
futures = { workspace = true, features = ["async-await"] }

relay-system/src/runtime/handle.rs

Lines changed: 101 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
use std::sync::Arc;
2+
3+
use futures::stream::FuturesUnordered;
4+
use futures::StreamExt as _;
5+
6+
use crate::runtime::metrics::TokioCallbackMetrics;
7+
use crate::{RuntimeMetrics, ServiceJoinHandle, ServiceRegistry, ServiceSpawn, ServicesMetrics};
8+
9+
#[derive(Debug)]
10+
struct HandleInner {
11+
name: &'static str,
12+
services: ServiceRegistry,
13+
tokio: tokio::runtime::Handle,
14+
tokio_cb_metrics: Arc<TokioCallbackMetrics>,
15+
}
16+
17+
/// Handle to the [`Runtime`].
18+
///
19+
/// The handle is internally reference-counted and can be freely cloned.
20+
/// A handle can be obtained using the [`Runtime::handle`] method.
21+
#[derive(Debug, Clone)]
22+
pub struct Handle {
23+
inner: Arc<HandleInner>,
24+
}
25+
26+
impl Handle {
27+
pub(crate) fn new(
28+
name: &'static str,
29+
tokio: tokio::runtime::Handle,
30+
tokio_cb_metrics: Arc<TokioCallbackMetrics>,
31+
) -> Self {
32+
Self {
33+
inner: Arc::new(HandleInner {
34+
name,
35+
services: ServiceRegistry::new(),
36+
tokio,
37+
tokio_cb_metrics,
38+
}),
39+
}
40+
}
41+
42+
/// Returns a new [`RuntimeMetrics`] handle for this runtime.
43+
pub fn metrics(&self) -> RuntimeMetrics {
44+
Arc::clone(&self.inner.tokio_cb_metrics)
45+
.into_metrics(self.inner.name, self.inner.tokio.metrics())
46+
}
47+
48+
/// Returns all service metrics of all currently running services.
49+
///
50+
/// Unlike [`Self::metrics`], this is not a handle to the metrics.
51+
pub fn current_services_metrics(&self) -> ServicesMetrics {
52+
self.inner.services.metrics()
53+
}
54+
55+
/// Returns a new unique [`ServiceSet`] to spawn services and await their termination.
56+
pub fn service_set(&self) -> ServiceSet {
57+
ServiceSet {
58+
handle: Arc::clone(&self.inner),
59+
services: Default::default(),
60+
}
61+
}
62+
}
63+
64+
impl ServiceSpawn for Handle {
65+
fn start_obj(&self, service: crate::ServiceObj) {
66+
self.inner.services.start_in(&self.inner.tokio, service);
67+
}
68+
}
69+
70+
/// Spawns and keeps track of running services.
71+
///
72+
/// A [`ServiceSet`] can be awaited for the completion of all started services
73+
/// on this [`ServiceSet`].
74+
///
75+
/// Every service started on this [`ServiceSet`] is attached to the [`Handle`]
76+
/// this set was created from, using [`Handle::service_set`].
77+
pub struct ServiceSet {
78+
handle: Arc<HandleInner>,
79+
services: FuturesUnordered<ServiceJoinHandle>,
80+
}
81+
82+
impl ServiceSet {
83+
/// Awaits until all services have finished.
84+
///
85+
/// Panics if one of the spawned services has panicked.
86+
pub async fn join(&mut self) {
87+
while let Some(res) = self.services.next().await {
88+
if let Some(panic) = res.err().and_then(|e| e.into_panic()) {
89+
// Re-trigger panic to terminate the process:
90+
std::panic::resume_unwind(panic);
91+
}
92+
}
93+
}
94+
}
95+
96+
impl ServiceSpawn for ServiceSet {
97+
fn start_obj(&self, service: crate::ServiceObj) {
98+
let handle = self.handle.services.start_in(&self.handle.tokio, service);
99+
self.services.push(handle);
100+
}
101+
}

relay-system/src/runtime/metrics.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,7 @@ mod tests {
242242
fn test_metric_diff() {
243243
let rt = crate::Runtime::builder("test").worker_threads(1).build();
244244

245-
let metrics = rt.metrics();
245+
let metrics = rt.handle().metrics();
246246

247247
rt.block_on(async move {
248248
let tokio_metrics = tokio::runtime::Handle::current().metrics();

relay-system/src/runtime/mod.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
mod handle;
12
mod metrics;
23
#[expect(
34
clippy::module_inception,
@@ -6,6 +7,7 @@ mod metrics;
67
mod runtime;
78
mod spawn;
89

10+
pub use self::handle::{Handle, ServiceSet};
911
pub use self::metrics::RuntimeMetrics;
10-
pub use self::runtime::{Builder, Handle, Runtime, ServiceSet};
12+
pub use self::runtime::{Builder, Runtime};
1113
pub use self::spawn::{spawn, spawn_in, TaskId};

0 commit comments

Comments
 (0)