1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -38,6 +38,7 @@
- Add the internal `_performance_issues_spans` field to control perf issue detection. ([#4652](https://github.com/getsentry/relay/pull/4652))
- Add `/v1/traces` (without a trailing slash) as a spec-compliant alternative for our OTLP traces endpoint. ([#4655](https://github.com/getsentry/relay/pull/4655))
- Improve handling of failed Redis connections. ([#4657](https://github.com/getsentry/relay/pull/4657))
- Expose new metrics from the async pool. ([#4658](https://github.com/getsentry/relay/pull/4658))

## 25.3.0

2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion relay-server/src/services/autoscaling.rs
@@ -68,7 +68,7 @@ impl Service for AutoscalingMetricService {
}
)
.collect();
let worker_pool_utilization = self.async_pool.metrics().utilization() as u8;
let worker_pool_utilization = self.async_pool.metrics().utilization();
let runtime_utilization = self.runtime_utilization();

sender.send(AutoscalingData {
6 changes: 5 additions & 1 deletion relay-server/src/services/stats.rs
@@ -192,7 +192,11 @@ impl RelayStats {
pool = async_pool.name()
);
metric!(
gauge(RelayGauges::AsyncPoolUtilization) = metrics.utilization() as f64,
gauge(RelayGauges::AsyncPoolUtilization) = metrics.utilization() as u64,
pool = async_pool.name()
);
metric!(
gauge(RelayGauges::AsyncPoolActivity) = metrics.activity() as u64,
pool = async_pool.name()
);
metric!(
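For reference, the two gauges added above reach the statsd sink as plain gauge samples tagged with the pool's name. Assuming a DogStatsD-style tag encoding and an illustrative pool name and values (both assumptions, not taken from this change), the emitted lines could look roughly like:

async_pool.utilization:35|g|#pool:my_pool
async_pool.activity:80|g|#pool:my_pool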
13 changes: 11 additions & 2 deletions relay-server/src/statsd.rs
@@ -13,12 +13,20 @@ pub enum RelayGauges {
AsyncPoolQueueSize,
/// Tracks the utilization of the async pool.
///
/// The utilization is a value between 0.0 and 100.0 which determines how busy is the pool
/// w.r.t. to its provisioned capacity.
/// The utilization is a value between 0 and 100 which indicates how busy the pool is with
/// CPU-bound work.
///
/// This metric is tagged with:
/// - `pool`: the name of the pool.
AsyncPoolUtilization,
/// Tracks the activity of the async pool.
///
/// The activity is a value between 0 and 100 which indicates how busy the pool is with
/// respect to its provisioned capacity.
///
/// This metric is tagged with:
/// - `pool`: the name of the pool.
AsyncPoolActivity,
/// The state of Relay with respect to the upstream connection.
/// Possible values are `0` for normal operations and `1` for a network outage.
NetworkOutage,
@@ -70,6 +78,7 @@ impl GaugeMetric for RelayGauges {
match self {
RelayGauges::AsyncPoolQueueSize => "async_pool.queue_size",
RelayGauges::AsyncPoolUtilization => "async_pool.utilization",
RelayGauges::AsyncPoolActivity => "async_pool.activity",
RelayGauges::NetworkOutage => "upstream.network_outage",
RelayGauges::BufferStackCount => "buffer.stack_count",
RelayGauges::BufferDiskUsed => "buffer.disk_used",
2 changes: 2 additions & 0 deletions relay-system/src/lib.rs
@@ -23,10 +23,12 @@
#![allow(clippy::derive_partial_eq_without_eq)]

mod controller;
mod monitor;
mod runtime;
mod service;
mod statsd;

pub use self::controller::*;
pub use self::monitor::*;
pub use self::runtime::*;
pub use self::service::*;
relay-system/src/monitor.rs (renamed from relay-system/src/service/monitor.rs)
@@ -9,30 +9,24 @@ use tokio::time::{Duration, Instant};
const UTILIZATION_UPDATE_THRESHOLD: Duration = Duration::from_secs(5);

pin_project_lite::pin_project! {
/// A service monitor tracks service metrics.
pub struct ServiceMonitor<F> {
/// A future that tracks metrics.
pub struct MonitoredFuture<F> {
#[pin]
inner: F,

metrics: Arc<RawMetrics>,

last_utilization_update: Instant,
last_utilization_duration_ns: u64,
total_duration_ns: u64
}
}

impl<F> ServiceMonitor<F> {
/// Wraps a service future with a monitor.
impl<F> MonitoredFuture<F> {
/// Wraps a future with the [`MonitoredFuture`].
pub fn wrap(inner: F) -> Self {
Self {
inner,
metrics: Arc::new(RawMetrics {
poll_count: AtomicU64::new(0),
total_duration_ns: AtomicU64::new(0),
utilization: AtomicU8::new(0),
}),
metrics: Arc::new(RawMetrics::default()),
last_utilization_update: Instant::now(),
last_utilization_duration_ns: 0,
total_duration_ns: 0,
}
}

@@ -42,7 +36,7 @@ impl<F> ServiceMonitor<F> {
}
}

impl<F> Future for ServiceMonitor<F>
impl<F> Future for MonitoredFuture<F>
where
F: Future,
{
@@ -60,35 +54,32 @@ where
let poll_duration = poll_end - poll_start;
let poll_duration_ns = poll_duration.as_nanos().try_into().unwrap_or(u64::MAX);

let previous_total_duration = this
.metrics
this.metrics
.total_duration_ns
.fetch_add(poll_duration_ns, Ordering::Relaxed);
let total_duration_ns = previous_total_duration + poll_duration_ns;
*this.total_duration_ns += poll_duration_ns;

let utilization_duration = poll_end - *this.last_utilization_update;
if utilization_duration >= UTILIZATION_UPDATE_THRESHOLD {
// Time spent the service was busy since the last utilization calculation.
let busy = total_duration_ns - *this.last_utilization_duration_ns;

// The maximum possible time spent busy is the total time between the last measurement
// and the current measurement. We can extract a percentage from this.
let percentage = (busy * 100).div_ceil(utilization_duration.as_nanos().max(1) as u64);
let percentage = (*this.total_duration_ns * 100)
.div_ceil(utilization_duration.as_nanos().max(1) as u64);
this.metrics
.utilization
.store(percentage.min(100) as u8, Ordering::Relaxed);

*this.last_utilization_duration_ns = total_duration_ns;
*this.total_duration_ns = 0;
*this.last_utilization_update = poll_end;
}

ret
}
}

/// The raw metrics extracted from a [`ServiceMonitor`].
/// The raw metrics extracted from a [`MonitoredFuture`].
///
/// All access outside the [`ServiceMonitor`] must be *read* only.
/// All access outside the [`MonitoredFuture`] must be *read* only.
#[derive(Debug)]
pub struct RawMetrics {
/// Amount of times the service was polled.
@@ -99,13 +90,23 @@ pub struct RawMetrics {
pub utilization: AtomicU8,
}

impl Default for RawMetrics {
fn default() -> Self {
Self {
poll_count: AtomicU64::new(0),
total_duration_ns: AtomicU64::new(0),
utilization: AtomicU8::new(0),
}
}
}

#[cfg(test)]
mod tests {
use super::*;

#[tokio::test(start_paused = true)]
async fn test_monitor() {
let mut monitor = ServiceMonitor::wrap(Box::pin(async {
let mut monitor = MonitoredFuture::wrap(Box::pin(async {
loop {
tokio::time::advance(Duration::from_millis(500)).await;
}
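The core of the renamed `MonitoredFuture` is the windowed utilization computation: poll time is accumulated into `total_duration_ns`, and once `UTILIZATION_UPDATE_THRESHOLD` (5 s) of wall-clock time has passed, the accumulated busy time is turned into a 0-100 percentage and the accumulator is reset. A standalone sketch of that bookkeeping, with hypothetical names and none of the atomics or pinning of the real type:

use std::time::{Duration, Instant};

const WINDOW: Duration = Duration::from_secs(5);

/// Simplified, hypothetical stand-in for the per-future utilization window.
struct UtilizationWindow {
    window_start: Instant,
    busy_ns: u64,
}

impl UtilizationWindow {
    fn new(now: Instant) -> Self {
        Self { window_start: now, busy_ns: 0 }
    }

    /// Records one poll; returns `Some(percentage)` whenever a window closes.
    fn record_poll(&mut self, poll_end: Instant, poll_duration: Duration) -> Option<u8> {
        let poll_ns: u64 = poll_duration.as_nanos().try_into().unwrap_or(u64::MAX);
        self.busy_ns += poll_ns;

        let elapsed = poll_end - self.window_start;
        if elapsed < WINDOW {
            return None;
        }

        // Busy time relative to the wall-clock length of the window, capped at 100%.
        let percentage = (self.busy_ns * 100).div_ceil(elapsed.as_nanos().max(1) as u64);

        // Reset for the next window, mirroring `*this.total_duration_ns = 0` above.
        self.busy_ns = 0;
        self.window_start = poll_end;

        Some(percentage.min(100) as u8)
    }
}

In the real type the resulting percentage is stored into `RawMetrics::utilization`, so the service registry and the async pool can read it without locking.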
1 change: 0 additions & 1 deletion relay-system/src/service/mod.rs
@@ -15,7 +15,6 @@ use tokio::time::MissedTickBehavior;
use crate::statsd::SystemGauges;
use crate::{spawn, TaskId};

mod monitor;
mod registry;
mod status;

10 changes: 5 additions & 5 deletions relay-system/src/service/registry.rs
@@ -3,10 +3,10 @@ use std::sync::atomic::Ordering;
use std::sync::{Arc, Mutex, PoisonError};
use std::time::Duration;

use crate::service::monitor::{RawMetrics, ServiceMonitor};
use crate::monitor::MonitoredFuture;

use crate::service::status::{ServiceJoinHandle, ServiceStatusJoinHandle};
use crate::{ServiceObj, TaskId};
use crate::{RawMetrics, ServiceObj, TaskId};

/// A point in time snapshot of all started services and their [`ServiceMetrics`].
pub struct ServicesMetrics(BTreeMap<ServiceId, ServiceMetrics>);
@@ -35,7 +35,7 @@ pub struct ServiceMetrics {
///
/// This value is a percentage in the range from `[0-100]` and recomputed periodically.
///
/// The measure is only updated when the when the service is polled. A service which
/// The measure is only updated when the service is polled. A service which
/// spends a long time idle may not have this measure updated for a long time.
pub utilization: u8,
}
@@ -108,7 +108,7 @@ impl Inner {
// lower priority tasks. We want to prioritize service backlogs over creating more work
// for these services.
let future = tokio::task::unconstrained(service.future);
let future = ServiceMonitor::wrap(future);
let future = MonitoredFuture::wrap(future);
let metrics = Arc::clone(future.metrics());

let task_handle = crate::runtime::spawn_in(handle, task_id, future);
@@ -189,7 +189,7 @@ impl ServiceGroup {
/// Collection of metadata the registry tracks per service instance.
#[derive(Debug)]
struct ServiceInstance {
/// The per service group unique id for this instance.
/// The per-service group unique id for this instance.
instance_id: u32,
/// A raw handle for all metrics tracked for this instance.
///
2 changes: 1 addition & 1 deletion relay-threading/Cargo.toml
@@ -15,7 +15,7 @@ futures = { workspace = true }
tokio = { workspace = true }
pin-project-lite = { workspace = true }

relay-statsd = { workspace = true }
relay-system = { workspace = true }

[dev-dependencies]
criterion = { workspace = true, features = ["async_tokio"] }
34 changes: 32 additions & 2 deletions relay-threading/src/metrics.rs
@@ -1,3 +1,4 @@
use relay_system::RawMetrics;
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;

@@ -13,6 +14,8 @@ pub(crate) struct ThreadMetrics {
///
/// This number will monotonically grow if not reset.
pub(crate) finished_tasks: AtomicU64,
/// The raw metrics collected by the timed future.
pub(crate) raw_metrics: Arc<RawMetrics>,
}

impl ThreadMetrics {
@@ -53,13 +56,40 @@ impl AsyncPoolMetrics<'_> {
}

/// Returns the utilization metric for the pool.
pub fn utilization(&self) -> f32 {
///
/// The utilization is measured as the share of time each thread spends doing busy work while
/// polling its futures.
///
/// A utilization of 100% indicates that the pool has been doing CPU-bound work for the duration
/// of the measurement.
/// A utilization of 0% indicates that the pool didn't do any CPU-bound work for the duration
/// of the measurement.
///
/// Note that this metric is collected and updated for each thread when the main future is polled,
/// thus if no work is being done, it will not be updated.
pub fn utilization(&self) -> u8 {
Review thread on `utilization()`:

Member: Maybe call that `cpu_utilization` and expose a `total_utilization` which does `max(activity, cpu_utilization)`, which can be used in the auto scaler?

Contributor (author): I am thinking of the edge case where we have 100% activity since we are doing lots of I/O and 0% max utilization (assuming no CPU-bound work is being done). Do we really want to scale in this case? Scaling might not help.

Member: Up to you; without thinking too much about it, I would have said yes, because scaling is probably the safer option.

self.threads_metrics
.iter()
.map(|m| m.raw_metrics.utilization.load(Ordering::Relaxed))
.max()
.unwrap_or(100)
}

/// Returns the activity metric for the pool.
///
/// The activity is measured as the number of active tasks in the pool versus the maximum number
/// of tasks that the pool can have active at the same time.
///
/// An activity of 100% indicates that the pool is driving the maximum number of tasks that it
/// can.
/// An activity of 0% indicates that the pool is not driving any tasks.
pub fn activity(&self) -> u8 {
let total_polled_futures: u64 = self
.threads_metrics
.iter()
.map(|m| m.active_tasks.load(Ordering::Relaxed))
.sum();

(total_polled_futures as f32 / self.max_tasks as f32).clamp(0.0, 1.0) * 100.0
((total_polled_futures as f32 / self.max_tasks as f32).clamp(0.0, 1.0) * 100.0) as u8
}
}
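Taken together, `AsyncPoolMetrics` now exposes two complementary signals: `utilization()` reports the maximum per-thread CPU-busy percentage collected by the wrapped `MonitoredFuture`s, while `activity()` reports how full the pool is relative to its task capacity. Below is a minimal sketch of that aggregation plus the `total_utilization` idea floated in the review thread above (`max(activity, cpu_utilization)`); the free functions and the `total_utilization` helper are hypothetical illustrations, not part of this change:

// Hypothetical stand-ins for the aggregation performed by `AsyncPoolMetrics` (values are 0-100).
fn pool_utilization(per_thread_utilization: &[u8]) -> u8 {
    // One saturated thread is enough to consider the pool saturated.
    per_thread_utilization.iter().copied().max().unwrap_or(100)
}

fn pool_activity(active_tasks: u64, max_tasks: u64) -> u8 {
    // Active tasks relative to capacity, clamped to 0-100.
    ((active_tasks as f32 / max_tasks as f32).clamp(0.0, 1.0) * 100.0) as u8
}

// Only discussed in review, not implemented: scale on whichever signal is higher.
fn total_utilization(activity: u8, cpu_utilization: u8) -> u8 {
    activity.max(cpu_utilization)
}

Consumers only see the aggregated values, e.g. `async_pool.metrics().utilization()` and `.activity()` as used in `stats.rs` and `autoscaling.rs` above.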
25 changes: 13 additions & 12 deletions relay-threading/src/pool.rs
@@ -3,13 +3,13 @@ use std::io;
use std::panic::AssertUnwindSafe;
use std::sync::Arc;

use futures::future::BoxFuture;
use futures::FutureExt;

use crate::builder::AsyncPoolBuilder;
use crate::metrics::AsyncPoolMetrics;
use crate::multiplexing::Multiplexed;
use crate::{PanicHandler, ThreadMetrics};
use futures::future::BoxFuture;
use futures::FutureExt;
use relay_system::MonitoredFuture;

/// Default name of the pool.
const DEFAULT_POOL_NAME: &str = "unnamed";
@@ -64,23 +64,24 @@

for thread_id in 0..builder.num_threads {
let rx = rx.clone();
let thread_name: Option<String> = builder.thread_name.as_mut().map(|f| f(thread_id));

let thread_name: Option<String> = builder.thread_name.as_mut().map(|f| f(thread_id));
let metrics = Arc::new(ThreadMetrics::default());
let task = MonitoredFuture::wrap(Multiplexed::new(
pool_name,
builder.max_concurrency,
rx.into_stream(),
builder.task_panic_handler.clone(),
metrics.clone(),
));

let thread = Thread {
id: thread_id,
max_concurrency: builder.max_concurrency,
name: thread_name.clone(),
runtime: builder.runtime.clone(),
panic_handler: builder.thread_panic_handler.clone(),
task: Multiplexed::new(
pool_name,
builder.max_concurrency,
rx.into_stream(),
builder.task_panic_handler.clone(),
metrics.clone(),
)
.boxed(),
task: task.boxed(),
};

threads_metrics.push(metrics);