4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -7,6 +7,10 @@
 - Add new `relay-threading` crate with asynchronous thread pool. ([#4500](https://github.com/getsentry/relay/pull/4500))
 - Support span `category` inference from span attributes. ([#4509](https://github.com/getsentry/relay/pull/4509))
 
+**Internal**:
+
+- Track a utilization metric for internal services. ([#4501](https://github.com/getsentry/relay/pull/4501))
+
 ## 25.2.0
 
 - Allow log ingestion behind a flag, only for internal use currently. ([#4471](https://github.com/getsentry/relay/pull/4471))
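The new **Internal** entry above adds a utilization metric for internal services. As rough intuition for what such a metric measures — the fraction of wall-clock time a service spends doing work over a reporting window — here is a minimal, self-contained sketch; the `Utilization` type and its formula are illustrative assumptions, not the actual implementation from #4501:

```rust
use std::time::{Duration, Instant};

/// Hypothetical utilization tracker: busy time divided by elapsed wall-clock
/// time since the window started. Assumed formula, for illustration only.
struct Utilization {
    window_start: Instant,
    busy: Duration,
}

impl Utilization {
    fn new() -> Self {
        Self { window_start: Instant::now(), busy: Duration::ZERO }
    }

    /// Records one unit of work that took `spent`.
    fn record_busy(&mut self, spent: Duration) {
        self.busy += spent;
    }

    /// Returns utilization in `0.0..=1.0` for the current window, then resets.
    fn flush(&mut self) -> f64 {
        let elapsed = self.window_start.elapsed().as_secs_f64();
        let ratio = if elapsed > 0.0 {
            (self.busy.as_secs_f64() / elapsed).min(1.0)
        } else {
            0.0
        };
        self.window_start = Instant::now();
        self.busy = Duration::ZERO;
        ratio
    }
}

fn main() {
    let mut utilization = Utilization::new();
    std::thread::sleep(Duration::from_millis(20));
    utilization.record_busy(Duration::from_millis(10));
    // Roughly 0.5: the service was busy for about half the window.
    println!("utilization ≈ {:.2}", utilization.flush());
}
```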
1 change: 1 addition & 0 deletions relay-server/Cargo.toml
@@ -141,6 +141,7 @@ criterion = { workspace = true }
 http = { workspace = true }
 insta = { workspace = true }
 relay-protocol = { workspace = true, features = ["test"] }
+relay-system = { workspace = true, features = ["test"] }
 relay-test = { workspace = true }
 similar-asserts = { workspace = true }
 tempfile = { workspace = true }
13 changes: 8 additions & 5 deletions relay-server/src/lib.rs
@@ -278,7 +278,7 @@ mod testutils;
 use std::sync::Arc;
 
 use relay_config::Config;
-use relay_system::Controller;
+use relay_system::{Controller, ServiceSpawnExt as _};
 
 use crate::service::ServiceState;
 use crate::services::server::HttpServer;
@@ -294,18 +294,21 @@ pub fn run(config: Config) -> anyhow::Result<()> {

     // Creates the main runtime.
     let runtime = crate::service::create_runtime("main-rt", config.cpu_concurrency());
-    let runtime_metrics = runtime.metrics();
+    let handle = runtime.handle().clone();
 
     // Run the system and block until a shutdown signal is sent to this process. Inside, start a
     // web server and run all relevant services. See the `actors` module documentation for more
     // information on all services.
     runtime.block_on(async {
         Controller::start(config.shutdown_timeout());
-        let (state, mut runner) = ServiceState::start(runtime_metrics, config.clone()).await?;
-        runner.start(HttpServer::new(config, state.clone())?);
+
+        let mut services = handle.service_set();
+
+        let state = ServiceState::start(&handle, &services, config.clone()).await?;
+        services.start(HttpServer::new(config, state.clone())?);
 
         tokio::select! {
-            _ = runner.join() => {},
+            _ = services.join() => {},
             // NOTE: when every service implements a shutdown listener,
             // awaiting on `finished` becomes unnecessary: We can simply join() and guarantee
             // that every service finished its main task.
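The change above replaces the owned `ServiceRunner` with a spawn abstraction: the runtime `Handle` produces a service set, individual services are started against a borrowed spawner, and the set is joined to keep the process alive. A minimal sketch of that shape follows; every name and signature in it is a simplified assumption for illustration, not the real `relay-system` API (it assumes a `tokio` dependency with rt + macros):

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::Mutex;

type BoxFuture = Pin<Box<dyn Future<Output = ()> + Send + 'static>>;

/// A long-running task; a stand-in for Relay's `Service` trait.
trait Service: Send + 'static {
    fn run(self) -> BoxFuture;
}

/// Object-safe spawner, so call sites can accept `&dyn ServiceSpawn`.
trait ServiceSpawn {
    fn spawn_boxed(&self, fut: BoxFuture);
}

/// Convenience layer, mirroring the `ServiceSpawnExt as _` imports in the diff.
trait ServiceSpawnExt: ServiceSpawn {
    fn start<S: Service>(&self, service: S) {
        self.spawn_boxed(service.run());
    }
}

impl<T: ServiceSpawn + ?Sized> ServiceSpawnExt for T {}

/// Tracks its children so the caller can `join()` them, analogous to what
/// `handle.service_set()` returns above.
#[derive(Default)]
struct ServiceSet {
    tasks: Mutex<Vec<tokio::task::JoinHandle<()>>>,
}

impl ServiceSet {
    /// Waits until every spawned service has finished.
    async fn join(self) {
        for task in self.tasks.into_inner().unwrap() {
            let _ = task.await;
        }
    }
}

impl ServiceSpawn for ServiceSet {
    fn spawn_boxed(&self, fut: BoxFuture) {
        self.tasks.lock().unwrap().push(tokio::spawn(fut));
    }
}

struct HttpServer;

impl Service for HttpServer {
    fn run(self) -> BoxFuture {
        Box::pin(async move {
            // Serve requests until shutdown; elided in this sketch.
        })
    }
}

#[tokio::main]
async fn main() {
    let services = ServiceSet::default();
    services.start(HttpServer); // like `services.start(HttpServer::new(..)?)` above
    services.join().await; // like `services.join()` in the `tokio::select!`
}
```

The key design point visible in the diff: the caller in `run()` now owns the set of spawned services, while `ServiceState::start` merely borrows a spawner, so all service lifecycles end up in one place.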
52 changes: 25 additions & 27 deletions relay-server/src/service.rs
@@ -39,7 +39,7 @@ use relay_redis::redis::Script;
 use relay_redis::AsyncRedisClient;
 #[cfg(feature = "processing")]
 use relay_redis::{PooledClient, RedisError, RedisPool, RedisPools, RedisScripts};
-use relay_system::{channel, Addr, Service, ServiceRunner};
+use relay_system::{channel, Addr, Service, ServiceSpawn, ServiceSpawnExt as _};
 
 /// Indicates the type of failure of the server.
 #[derive(Debug, thiserror::Error)]
@@ -150,12 +150,12 @@ pub struct ServiceState {
 impl ServiceState {
     /// Starts all services and returns addresses to all of them.
     pub async fn start(
-        rt_metrics: relay_system::RuntimeMetrics,
+        handle: &relay_system::Handle,
+        services: &dyn ServiceSpawn,
         config: Arc<Config>,
-    ) -> Result<(Self, ServiceRunner)> {
-        let mut runner = ServiceRunner::new();
-        let upstream_relay = runner.start(UpstreamRelayService::new(config.clone()));
-        let test_store = runner.start(TestStoreService::new(config.clone()));
+    ) -> Result<Self> {
+        let upstream_relay = services.start(UpstreamRelayService::new(config.clone()));
+        let test_store = services.start(TestStoreService::new(config.clone()));
 
         #[cfg(feature = "processing")]
         let redis_pools = match config.redis().filter(|_| config.processing_enabled()) {
@@ -181,43 +181,44 @@
         // Create an address for the `EnvelopeProcessor`, which can be injected into the
         // other services.
         let (processor, processor_rx) = channel(EnvelopeProcessorService::name());
-        let outcome_producer = runner.start(OutcomeProducerService::create(
+        let outcome_producer = services.start(OutcomeProducerService::create(
             config.clone(),
             upstream_relay.clone(),
             processor.clone(),
         )?);
         let outcome_aggregator =
-            runner.start(OutcomeAggregator::new(&config, outcome_producer.clone()));
+            services.start(OutcomeAggregator::new(&config, outcome_producer.clone()));
 
-        let keda = runner.start(AutoscalingMetricService::new(memory_stat.clone()));
+        let keda = services.start(AutoscalingMetricService::new(memory_stat.clone()));
 
         let (global_config, global_config_rx) =
             GlobalConfigService::new(config.clone(), upstream_relay.clone());
         let global_config_handle = global_config.handle();
         // The global config service must start before dependent services are
         // started. Messages like subscription requests to the global config
         // service fail if the service is not running.
-        let global_config = runner.start(global_config);
+        let global_config = services.start(global_config);
 
         let project_source = ProjectSource::start_in(
-            &mut runner,
+            services,
            Arc::clone(&config),
            upstream_relay.clone(),
            #[cfg(feature = "processing")]
            redis_pools.clone(),
         )
         .await;
         let project_cache_handle =
-            ProjectCacheService::new(Arc::clone(&config), project_source).start_in(&mut runner);
+            ProjectCacheService::new(Arc::clone(&config), project_source).start_in(services);
 
         let aggregator = RouterService::new(
+            handle.clone(),
             config.default_aggregator_config().clone(),
             config.secondary_aggregator_configs().clone(),
             Some(processor.clone().recipient()),
             project_cache_handle.clone(),
         );
         let aggregator_handle = aggregator.handle();
-        let aggregator = runner.start(aggregator);
+        let aggregator = services.start(aggregator);
 
         let metric_stats = MetricStats::new(
             config.clone(),
@@ -238,14 +238,14 @@
                     outcome_aggregator.clone(),
                     metric_outcomes.clone(),
                 )
-                .map(|s| runner.start(s))
+                .map(|s| services.start(s))
             })
             .transpose()?;
 
         let cogs = CogsService::new(&config);
-        let cogs = Cogs::new(CogsServiceRecorder::new(&config, runner.start(cogs)));
+        let cogs = Cogs::new(CogsServiceRecorder::new(&config, services.start(cogs)));
 
-        runner.start_with(
+        services.start_with(
             EnvelopeProcessorService::new(
                 create_processor_pool(&config)?,
                 config.clone(),
@@ -276,26 +276,26 @@
             processor.clone(),
             outcome_aggregator.clone(),
             test_store.clone(),
-            &mut runner,
+            services,
         );
 
-        let health_check = runner.start(HealthCheckService::new(
+        let health_check = services.start(HealthCheckService::new(
             config.clone(),
             MemoryChecker::new(memory_stat.clone(), config.clone()),
             aggregator_handle,
             upstream_relay.clone(),
             envelope_buffer.clone(),
         ));
 
-        runner.start(RelayStats::new(
+        services.start(RelayStats::new(
             config.clone(),
-            rt_metrics,
+            handle.clone(),
             upstream_relay.clone(),
             #[cfg(feature = "processing")]
             redis_pools.clone(),
         ));
 
-        let relay_cache = runner.start(RelayCacheService::new(
+        let relay_cache = services.start(RelayCacheService::new(
             config.clone(),
             upstream_relay.clone(),
         ));
@@ -320,12 +321,9 @@
             registry,
         };
 
-        Ok((
-            ServiceState {
-                inner: Arc::new(state),
-            },
-            runner,
-        ))
+        Ok(ServiceState {
+            inner: Arc::new(state),
+        })
     }
 
     /// Returns a reference to the Relay configuration.
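Two design points are worth noting in this file. First, `ServiceState::start` now borrows its spawner (`&dyn ServiceSpawn`) instead of owning a `ServiceRunner`, so the return type simplifies from `Result<(Self, ServiceRunner)>` to `Result<Self>`: there is no runner left to hand back, and the caller in `run()` retains ownership of every service lifecycle. Second, `RouterService::new` additionally receives a `handle`, presumably so the router can spawn its per-namespace aggregator services itself (see the router diff below).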
22 changes: 12 additions & 10 deletions relay-server/src/services/buffer/mod.rs
@@ -11,9 +11,11 @@ use ahash::RandomState;
 use chrono::DateTime;
 use chrono::Utc;
 use relay_config::Config;
+use relay_system::Receiver;
+use relay_system::ServiceSpawn;
+use relay_system::ServiceSpawnExt as _;
 use relay_system::{Addr, FromMessage, Interface, NoResponse, Service};
 use relay_system::{Controller, Shutdown};
-use relay_system::{Receiver, ServiceRunner};
 use tokio::sync::watch;
 use tokio::time::{timeout, Instant};
@@ -99,7 +101,7 @@ impl PartitionedEnvelopeBuffer {
         envelope_processor: Addr<EnvelopeProcessor>,
         outcome_aggregator: Addr<TrackOutcome>,
         test_store: Addr<TestStore>,
-        runner: &mut ServiceRunner,
+        services: &dyn ServiceSpawn,
     ) -> Self {
         let mut envelope_buffers = Vec::with_capacity(partitions.get() as usize);
         for partition_id in 0..partitions.get() {
@@ -115,7 +117,7 @@
                     test_store: test_store.clone(),
                 },
             )
-            .start_in(runner);
+            .start_in(services);
 
             envelope_buffers.push(envelope_buffer);
         }
@@ -236,10 +238,11 @@ impl EnvelopeBufferService {
     }
 
     /// Returns both the [`Addr`] to this service, and a reference to the capacity flag.
-    pub fn start_in(self, runner: &mut ServiceRunner) -> ObservableEnvelopeBuffer {
+    pub fn start_in(self, services: &dyn ServiceSpawn) -> ObservableEnvelopeBuffer {
         let has_capacity = self.has_capacity.clone();
 
-        let addr = runner.start(self);
+        let addr = services.start(self);
+
         ObservableEnvelopeBuffer { addr, has_capacity }
     }

@@ -662,6 +665,7 @@ mod tests {
     use relay_base_schema::project::ProjectKey;
     use relay_dynamic_config::GlobalConfig;
     use relay_quotas::DataCategory;
+    use relay_system::TokioServiceSpawn;
     use std::time::Duration;
     use tokio::sync::mpsc;
     use uuid::Uuid;
@@ -726,8 +730,7 @@

         service.has_capacity.store(false, Ordering::Relaxed);
 
-        let ObservableEnvelopeBuffer { has_capacity, .. } =
-            service.start_in(&mut ServiceRunner::new());
+        let ObservableEnvelopeBuffer { has_capacity, .. } = service.start_in(&TokioServiceSpawn);
         assert!(!has_capacity.load(Ordering::Relaxed));
 
         tokio::time::advance(Duration::from_millis(100)).await;
@@ -894,7 +897,6 @@

     #[tokio::test(start_paused = true)]
     async fn test_partitioned_buffer() {
-        let mut runner = ServiceRunner::new();
         let (_global_tx, global_rx) = watch::channel(global_config::Status::Ready(Arc::new(
             GlobalConfig::default(),
         )));
@@ -930,8 +932,8 @@
         );
 
         // Start both services and create partitioned buffer
-        let observable1 = buffer1.start_in(&mut runner);
-        let observable2 = buffer2.start_in(&mut runner);
+        let observable1 = buffer1.start_in(&TokioServiceSpawn);
+        let observable2 = buffer2.start_in(&TokioServiceSpawn);
 
         let partitioned = PartitionedEnvelopeBuffer {
             buffers: Arc::new(vec![observable1, observable2]),
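In the tests, `&TokioServiceSpawn` replaces `&mut ServiceRunner::new()`. Judging by the name, `TokioServiceSpawn` spawns services detached onto the ambient Tokio runtime, which suits tests that never join their services; production code keeps join semantics by spawning into the service set created in `run()`.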
24 changes: 17 additions & 7 deletions relay-server/src/services/metrics/router.rs
@@ -4,7 +4,7 @@
 use relay_config::aggregator::Condition;
 use relay_config::{AggregatorServiceConfig, ScopedAggregatorConfig};
 use relay_metrics::MetricNamespace;
-use relay_system::{Addr, NoResponse, Recipient, Service, ServiceRunner};
+use relay_system::{Addr, NoResponse, Recipient, Service, ServiceSpawnExt as _};
 
 use crate::services::metrics::{
     Aggregator, AggregatorHandle, AggregatorService, FlushBuckets, MergeBuckets,
@@ -19,13 +19,15 @@ use crate::utils;
 /// Metrics are routed to the first aggregator which matches the configuration's [`Condition`].
 /// If no condition matches, the metric/bucket is routed to the `default_aggregator`.
 pub struct RouterService {
+    handle: relay_system::Handle,
     default: AggregatorService,
     secondary: Vec<(AggregatorService, Condition)>,
 }
 
 impl RouterService {
     /// Create a new router service.
     pub fn new(
+        handle: relay_system::Handle,
         default_config: AggregatorServiceConfig,
         secondary_configs: Vec<ScopedAggregatorConfig>,
         receiver: Option<Recipient<FlushBuckets, NoResponse>>,
@@ -40,7 +42,11 @@
         }
 
         let default = AggregatorService::new(default_config, receiver, project_cache);
-        Self { default, secondary }
+        Self {
+            handle,
+            default,
+            secondary,
+        }
     }
 
     pub fn handle(&self) -> RouterHandle {
@@ -57,7 +63,7 @@
     type Interface = Aggregator;
 
     async fn run(self, mut rx: relay_system::Receiver<Self::Interface>) {
-        let mut router = StartedRouter::start_in(self, &mut ServiceRunner::new());
+        let mut router = StartedRouter::start(self);
         relay_log::info!("metrics router started");
 
         // Note that currently this loop never exits and will run until the tokio runtime shuts
@@ -84,8 +90,12 @@ struct StartedRouter {
 }
 
 impl StartedRouter {
-    fn start_in(router: RouterService, runner: &mut ServiceRunner) -> Self {
-        let RouterService { default, secondary } = router;
+    fn start(router: RouterService) -> Self {
+        let RouterService {
+            default,
+            secondary,
+            handle,
+        } = router;
 
         let secondary = secondary
             .into_iter()
@@ -95,12 +105,12 @@
                     .filter(|&namespace| condition.matches(Some(namespace)))
                     .collect();
 
-                (runner.start(aggregator), namespaces)
+                (handle.start(aggregator), namespaces)
             })
             .collect();
 
         Self {
-            default: runner.start(default),
+            default: handle.start(default),
             secondary,
         }
     }
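The doc comment on `RouterService` (preserved in the hunk above) describes first-match routing: a metric goes to the first secondary aggregator whose `Condition` matches its namespace, and falls back to the default aggregator otherwise. A tiny sketch of that rule, with simplified stand-ins for Relay's `Condition` and `MetricNamespace` types:

```rust
/// Simplified stand-in for a metric namespace.
#[derive(Clone, Copy, PartialEq)]
enum Namespace {
    Transactions,
    Spans,
    Custom,
}

/// Simplified stand-in for a configured routing condition.
struct Condition(Namespace);

impl Condition {
    fn matches(&self, namespace: Namespace) -> bool {
        self.0 == namespace
    }
}

/// Index of the first secondary aggregator whose condition matches,
/// or `None` to route to the default aggregator.
fn route(secondary: &[Condition], namespace: Namespace) -> Option<usize> {
    secondary.iter().position(|condition| condition.matches(namespace))
}

fn main() {
    let secondary = [Condition(Namespace::Spans)];
    assert_eq!(route(&secondary, Namespace::Spans), Some(0));
    assert_eq!(route(&secondary, Namespace::Custom), None); // default aggregator
    println!("first-match routing ok");
}
```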
6 changes: 3 additions & 3 deletions relay-server/src/services/projects/cache/service.rs
@@ -5,7 +5,7 @@ use futures::StreamExt as _;
 use relay_base_schema::project::ProjectKey;
 use relay_config::Config;
 use relay_statsd::metric;
-use relay_system::{Service, ServiceRunner};
+use relay_system::{Service, ServiceSpawn, ServiceSpawnExt as _};
 use tokio::sync::broadcast;
 
 use crate::services::projects::cache::handle::ProjectCacheHandle;
@@ -91,7 +91,7 @@ impl ProjectCacheService {
     /// Consumes and starts a [`ProjectCacheService`].
     ///
     /// Returns a [`ProjectCacheHandle`] to access the cache concurrently.
-    pub fn start_in(self, runner: &mut ServiceRunner) -> ProjectCacheHandle {
+    pub fn start_in(self, services: &dyn ServiceSpawn) -> ProjectCacheHandle {
         let (addr, addr_rx) = relay_system::channel(Self::name());
 
         let handle = ProjectCacheHandle {
@@ -101,7 +101,7 @@
             project_changes: self.project_events_tx.clone(),
         };
 
-        runner.start_with(self, addr_rx);
+        services.start_with(self, addr_rx);
 
         handle
     }