Skip to content

Commit e0e55e7

Browse files
authored
feat(system): Instrument service utilization (#4501)
The goal: Generically track a utilization metric (and more) for each service spawned in Relay. To make this possible `relay-system`'s `Runtime` and `Handle` have gained the ability to spawn services. Now, instead of instantiating a `ServiceRunner`, services are spawned through the runtime. The runtime now has a registry of all spawned services, which allows enumerating all currently running services as well as querying their tracked metrics. To keep testing manageable, a `ServiceSpawn` trait was introduced; tests can spawn services on the Tokio runtime using `TokioServiceSpawn`. This is also a great foundation for finally moving graceful shutdown handling properly into the service framework instead of relying on a global.
1 parent fddc641 commit e0e55e7

File tree

21 files changed

+942
-133
lines changed

21 files changed

+942
-133
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@
77
- Add new `relay-threading` crate with asynchronous thread pool. ([#4500](https://github.com/getsentry/relay/pull/4500))
88
- Support span `category` inference from span attributes. ([#4509](https://github.com/getsentry/relay/pull/4509))
99

10+
**Internal**:
11+
12+
- Track a utilization metric for internal services. ([#4501](https://github.com/getsentry/relay/pull/4501))
13+
1014
## 25.2.0
1115

1216
- Allow log ingestion behind a flag, only for internal use currently. ([#4471](https://github.com/getsentry/relay/pull/4471))

relay-server/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,7 @@ criterion = { workspace = true }
141141
http = { workspace = true }
142142
insta = { workspace = true }
143143
relay-protocol = { workspace = true, features = ["test"] }
144+
relay-system = { workspace = true, features = ["test"] }
144145
relay-test = { workspace = true }
145146
similar-asserts = { workspace = true }
146147
tempfile = { workspace = true }

relay-server/src/lib.rs

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -278,7 +278,7 @@ mod testutils;
278278
use std::sync::Arc;
279279

280280
use relay_config::Config;
281-
use relay_system::Controller;
281+
use relay_system::{Controller, ServiceSpawnExt as _};
282282

283283
use crate::service::ServiceState;
284284
use crate::services::server::HttpServer;
@@ -294,18 +294,21 @@ pub fn run(config: Config) -> anyhow::Result<()> {
294294

295295
// Creates the main runtime.
296296
let runtime = crate::service::create_runtime("main-rt", config.cpu_concurrency());
297-
let runtime_metrics = runtime.metrics();
297+
let handle = runtime.handle().clone();
298298

299299
// Run the system and block until a shutdown signal is sent to this process. Inside, start a
300300
// web server and run all relevant services. See the `actors` module documentation for more
301301
// information on all services.
302302
runtime.block_on(async {
303303
Controller::start(config.shutdown_timeout());
304-
let (state, mut runner) = ServiceState::start(runtime_metrics, config.clone()).await?;
305-
runner.start(HttpServer::new(config, state.clone())?);
304+
305+
let mut services = handle.service_set();
306+
307+
let state = ServiceState::start(&handle, &services, config.clone()).await?;
308+
services.start(HttpServer::new(config, state.clone())?);
306309

307310
tokio::select! {
308-
_ = runner.join() => {},
311+
_ = services.join() => {},
309312
// NOTE: when every service implements a shutdown listener,
310313
// awaiting on `finished` becomes unnecessary: We can simply join() and guarantee
311314
// that every service finished its main task.

relay-server/src/service.rs

Lines changed: 25 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ use relay_redis::redis::Script;
3939
use relay_redis::AsyncRedisClient;
4040
#[cfg(feature = "processing")]
4141
use relay_redis::{PooledClient, RedisError, RedisPool, RedisPools, RedisScripts};
42-
use relay_system::{channel, Addr, Service, ServiceRunner};
42+
use relay_system::{channel, Addr, Service, ServiceSpawn, ServiceSpawnExt as _};
4343

4444
/// Indicates the type of failure of the server.
4545
#[derive(Debug, thiserror::Error)]
@@ -150,12 +150,12 @@ pub struct ServiceState {
150150
impl ServiceState {
151151
/// Starts all services and returns addresses to all of them.
152152
pub async fn start(
153-
rt_metrics: relay_system::RuntimeMetrics,
153+
handle: &relay_system::Handle,
154+
services: &dyn ServiceSpawn,
154155
config: Arc<Config>,
155-
) -> Result<(Self, ServiceRunner)> {
156-
let mut runner = ServiceRunner::new();
157-
let upstream_relay = runner.start(UpstreamRelayService::new(config.clone()));
158-
let test_store = runner.start(TestStoreService::new(config.clone()));
156+
) -> Result<Self> {
157+
let upstream_relay = services.start(UpstreamRelayService::new(config.clone()));
158+
let test_store = services.start(TestStoreService::new(config.clone()));
159159

160160
#[cfg(feature = "processing")]
161161
let redis_pools = match config.redis().filter(|_| config.processing_enabled()) {
@@ -181,43 +181,44 @@ impl ServiceState {
181181
// Create an address for the `EnvelopeProcessor`, which can be injected into the
182182
// other services.
183183
let (processor, processor_rx) = channel(EnvelopeProcessorService::name());
184-
let outcome_producer = runner.start(OutcomeProducerService::create(
184+
let outcome_producer = services.start(OutcomeProducerService::create(
185185
config.clone(),
186186
upstream_relay.clone(),
187187
processor.clone(),
188188
)?);
189189
let outcome_aggregator =
190-
runner.start(OutcomeAggregator::new(&config, outcome_producer.clone()));
190+
services.start(OutcomeAggregator::new(&config, outcome_producer.clone()));
191191

192-
let keda = runner.start(AutoscalingMetricService::new(memory_stat.clone()));
192+
let keda = services.start(AutoscalingMetricService::new(memory_stat.clone()));
193193

194194
let (global_config, global_config_rx) =
195195
GlobalConfigService::new(config.clone(), upstream_relay.clone());
196196
let global_config_handle = global_config.handle();
197197
// The global config service must start before dependant services are
198198
// started. Messages like subscription requests to the global config
199199
// service fail if the service is not running.
200-
let global_config = runner.start(global_config);
200+
let global_config = services.start(global_config);
201201

202202
let project_source = ProjectSource::start_in(
203-
&mut runner,
203+
services,
204204
Arc::clone(&config),
205205
upstream_relay.clone(),
206206
#[cfg(feature = "processing")]
207207
redis_pools.clone(),
208208
)
209209
.await;
210210
let project_cache_handle =
211-
ProjectCacheService::new(Arc::clone(&config), project_source).start_in(&mut runner);
211+
ProjectCacheService::new(Arc::clone(&config), project_source).start_in(services);
212212

213213
let aggregator = RouterService::new(
214+
handle.clone(),
214215
config.default_aggregator_config().clone(),
215216
config.secondary_aggregator_configs().clone(),
216217
Some(processor.clone().recipient()),
217218
project_cache_handle.clone(),
218219
);
219220
let aggregator_handle = aggregator.handle();
220-
let aggregator = runner.start(aggregator);
221+
let aggregator = services.start(aggregator);
221222

222223
let metric_stats = MetricStats::new(
223224
config.clone(),
@@ -238,14 +239,14 @@ impl ServiceState {
238239
outcome_aggregator.clone(),
239240
metric_outcomes.clone(),
240241
)
241-
.map(|s| runner.start(s))
242+
.map(|s| services.start(s))
242243
})
243244
.transpose()?;
244245

245246
let cogs = CogsService::new(&config);
246-
let cogs = Cogs::new(CogsServiceRecorder::new(&config, runner.start(cogs)));
247+
let cogs = Cogs::new(CogsServiceRecorder::new(&config, services.start(cogs)));
247248

248-
runner.start_with(
249+
services.start_with(
249250
EnvelopeProcessorService::new(
250251
create_processor_pool(&config)?,
251252
config.clone(),
@@ -276,26 +277,26 @@ impl ServiceState {
276277
processor.clone(),
277278
outcome_aggregator.clone(),
278279
test_store.clone(),
279-
&mut runner,
280+
services,
280281
);
281282

282-
let health_check = runner.start(HealthCheckService::new(
283+
let health_check = services.start(HealthCheckService::new(
283284
config.clone(),
284285
MemoryChecker::new(memory_stat.clone(), config.clone()),
285286
aggregator_handle,
286287
upstream_relay.clone(),
287288
envelope_buffer.clone(),
288289
));
289290

290-
runner.start(RelayStats::new(
291+
services.start(RelayStats::new(
291292
config.clone(),
292-
rt_metrics,
293+
handle.clone(),
293294
upstream_relay.clone(),
294295
#[cfg(feature = "processing")]
295296
redis_pools.clone(),
296297
));
297298

298-
let relay_cache = runner.start(RelayCacheService::new(
299+
let relay_cache = services.start(RelayCacheService::new(
299300
config.clone(),
300301
upstream_relay.clone(),
301302
));
@@ -320,12 +321,9 @@ impl ServiceState {
320321
registry,
321322
};
322323

323-
Ok((
324-
ServiceState {
325-
inner: Arc::new(state),
326-
},
327-
runner,
328-
))
324+
Ok(ServiceState {
325+
inner: Arc::new(state),
326+
})
329327
}
330328

331329
/// Returns a reference to the Relay configuration.

relay-server/src/services/buffer/mod.rs

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,11 @@ use ahash::RandomState;
1111
use chrono::DateTime;
1212
use chrono::Utc;
1313
use relay_config::Config;
14+
use relay_system::Receiver;
15+
use relay_system::ServiceSpawn;
16+
use relay_system::ServiceSpawnExt as _;
1417
use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
1518
use relay_system::{Controller, Shutdown};
16-
use relay_system::{Receiver, ServiceRunner};
1719
use tokio::sync::watch;
1820
use tokio::time::{timeout, Instant};
1921

@@ -99,7 +101,7 @@ impl PartitionedEnvelopeBuffer {
99101
envelope_processor: Addr<EnvelopeProcessor>,
100102
outcome_aggregator: Addr<TrackOutcome>,
101103
test_store: Addr<TestStore>,
102-
runner: &mut ServiceRunner,
104+
services: &dyn ServiceSpawn,
103105
) -> Self {
104106
let mut envelope_buffers = Vec::with_capacity(partitions.get() as usize);
105107
for partition_id in 0..partitions.get() {
@@ -115,7 +117,7 @@ impl PartitionedEnvelopeBuffer {
115117
test_store: test_store.clone(),
116118
},
117119
)
118-
.start_in(runner);
120+
.start_in(services);
119121

120122
envelope_buffers.push(envelope_buffer);
121123
}
@@ -236,10 +238,11 @@ impl EnvelopeBufferService {
236238
}
237239

238240
/// Returns both the [`Addr`] to this service, and a reference to the capacity flag.
239-
pub fn start_in(self, runner: &mut ServiceRunner) -> ObservableEnvelopeBuffer {
241+
pub fn start_in(self, services: &dyn ServiceSpawn) -> ObservableEnvelopeBuffer {
240242
let has_capacity = self.has_capacity.clone();
241243

242-
let addr = runner.start(self);
244+
let addr = services.start(self);
245+
243246
ObservableEnvelopeBuffer { addr, has_capacity }
244247
}
245248

@@ -662,6 +665,7 @@ mod tests {
662665
use relay_base_schema::project::ProjectKey;
663666
use relay_dynamic_config::GlobalConfig;
664667
use relay_quotas::DataCategory;
668+
use relay_system::TokioServiceSpawn;
665669
use std::time::Duration;
666670
use tokio::sync::mpsc;
667671
use uuid::Uuid;
@@ -726,8 +730,7 @@ mod tests {
726730

727731
service.has_capacity.store(false, Ordering::Relaxed);
728732

729-
let ObservableEnvelopeBuffer { has_capacity, .. } =
730-
service.start_in(&mut ServiceRunner::new());
733+
let ObservableEnvelopeBuffer { has_capacity, .. } = service.start_in(&TokioServiceSpawn);
731734
assert!(!has_capacity.load(Ordering::Relaxed));
732735

733736
tokio::time::advance(Duration::from_millis(100)).await;
@@ -894,7 +897,6 @@ mod tests {
894897

895898
#[tokio::test(start_paused = true)]
896899
async fn test_partitioned_buffer() {
897-
let mut runner = ServiceRunner::new();
898900
let (_global_tx, global_rx) = watch::channel(global_config::Status::Ready(Arc::new(
899901
GlobalConfig::default(),
900902
)));
@@ -930,8 +932,8 @@ mod tests {
930932
);
931933

932934
// Start both services and create partitioned buffer
933-
let observable1 = buffer1.start_in(&mut runner);
934-
let observable2 = buffer2.start_in(&mut runner);
935+
let observable1 = buffer1.start_in(&TokioServiceSpawn);
936+
let observable2 = buffer2.start_in(&TokioServiceSpawn);
935937

936938
let partitioned = PartitionedEnvelopeBuffer {
937939
buffers: Arc::new(vec![observable1, observable2]),

relay-server/src/services/metrics/router.rs

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
use relay_config::aggregator::Condition;
55
use relay_config::{AggregatorServiceConfig, ScopedAggregatorConfig};
66
use relay_metrics::MetricNamespace;
7-
use relay_system::{Addr, NoResponse, Recipient, Service, ServiceRunner};
7+
use relay_system::{Addr, NoResponse, Recipient, Service, ServiceSpawnExt as _};
88

99
use crate::services::metrics::{
1010
Aggregator, AggregatorHandle, AggregatorService, FlushBuckets, MergeBuckets,
@@ -19,13 +19,15 @@ use crate::utils;
1919
/// Metrics are routed to the first aggregator which matches the configuration's [`Condition`].
2020
/// If no condition matches, the metric/bucket is routed to the `default_aggregator`.
2121
pub struct RouterService {
22+
handle: relay_system::Handle,
2223
default: AggregatorService,
2324
secondary: Vec<(AggregatorService, Condition)>,
2425
}
2526

2627
impl RouterService {
2728
/// Create a new router service.
2829
pub fn new(
30+
handle: relay_system::Handle,
2931
default_config: AggregatorServiceConfig,
3032
secondary_configs: Vec<ScopedAggregatorConfig>,
3133
receiver: Option<Recipient<FlushBuckets, NoResponse>>,
@@ -40,7 +42,11 @@ impl RouterService {
4042
}
4143

4244
let default = AggregatorService::new(default_config, receiver, project_cache);
43-
Self { default, secondary }
45+
Self {
46+
handle,
47+
default,
48+
secondary,
49+
}
4450
}
4551

4652
pub fn handle(&self) -> RouterHandle {
@@ -57,7 +63,7 @@ impl Service for RouterService {
5763
type Interface = Aggregator;
5864

5965
async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
60-
let mut router = StartedRouter::start_in(self, &mut ServiceRunner::new());
66+
let mut router = StartedRouter::start(self);
6167
relay_log::info!("metrics router started");
6268

6369
// Note that currently this loop never exits and will run until the tokio runtime shuts
@@ -84,8 +90,12 @@ struct StartedRouter {
8490
}
8591

8692
impl StartedRouter {
87-
fn start_in(router: RouterService, runner: &mut ServiceRunner) -> Self {
88-
let RouterService { default, secondary } = router;
93+
fn start(router: RouterService) -> Self {
94+
let RouterService {
95+
default,
96+
secondary,
97+
handle,
98+
} = router;
8999

90100
let secondary = secondary
91101
.into_iter()
@@ -95,12 +105,12 @@ impl StartedRouter {
95105
.filter(|&namespace| condition.matches(Some(namespace)))
96106
.collect();
97107

98-
(runner.start(aggregator), namespaces)
108+
(handle.start(aggregator), namespaces)
99109
})
100110
.collect();
101111

102112
Self {
103-
default: runner.start(default),
113+
default: handle.start(default),
104114
secondary,
105115
}
106116
}

relay-server/src/services/projects/cache/service.rs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use futures::StreamExt as _;
55
use relay_base_schema::project::ProjectKey;
66
use relay_config::Config;
77
use relay_statsd::metric;
8-
use relay_system::{Service, ServiceRunner};
8+
use relay_system::{Service, ServiceSpawn, ServiceSpawnExt as _};
99
use tokio::sync::broadcast;
1010

1111
use crate::services::projects::cache::handle::ProjectCacheHandle;
@@ -91,7 +91,7 @@ impl ProjectCacheService {
9191
/// Consumes and starts a [`ProjectCacheService`].
9292
///
9393
/// Returns a [`ProjectCacheHandle`] to access the cache concurrently.
94-
pub fn start_in(self, runner: &mut ServiceRunner) -> ProjectCacheHandle {
94+
pub fn start_in(self, services: &dyn ServiceSpawn) -> ProjectCacheHandle {
9595
let (addr, addr_rx) = relay_system::channel(Self::name());
9696

9797
let handle = ProjectCacheHandle {
@@ -101,7 +101,7 @@ impl ProjectCacheService {
101101
project_changes: self.project_events_tx.clone(),
102102
};
103103

104-
runner.start_with(self, addr_rx);
104+
services.start_with(self, addr_rx);
105105

106106
handle
107107
}

0 commit comments

Comments
 (0)