Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
- Expose runtime utilization metric in autoscaler endpoint. ([#4606](https://github.com/getsentry/relay/pull/4606))
- Bump the revision of `sysinfo` to the revision at `15b3be3273ba286740122fed7bb7dccd2a79dc8f`. ([#4613](https://github.com/getsentry/relay/pull/4613))
- Switch the processor and store to `async`. ([#4552](https://github.com/getsentry/relay/pull/4552))
- Validate the spooling memory configuration on startup. ([#4617](https://github.com/getsentry/relay/pull/4617))

## 25.3.0

Expand Down
27 changes: 19 additions & 8 deletions relay-server/src/utils/memory.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ impl MemoryStat {
}))
}

/// Returns the current memory data without instantiating [`MemoryStat`].
pub fn current_memory() -> Memory {
let mut system = System::new();
Self::refresh_memory(&mut system)
}

/// Returns a copy of the most up-to-date memory data.
pub fn memory(&self) -> Memory {
self.try_update();
Expand All @@ -103,6 +109,18 @@ impl MemoryStat {
memory
}

/// Updates the memory readings unconditionally.
fn update(&self) {
let mut system = self
.0
.system
.lock()
.unwrap_or_else(|system| system.into_inner());

let updated_memory = Self::refresh_memory(&mut system);
self.0.memory.store(Arc::new(updated_memory));
}

/// Updates the memory readings if at least `refresh_frequency_ms` has passed.
fn try_update(&self) {
let last_update = self.0.last_update.load(Ordering::Relaxed);
Expand All @@ -126,14 +144,7 @@ impl MemoryStat {
return;
}

let mut system = self
.0
.system
.lock()
.unwrap_or_else(|system| system.into_inner());

let updated_memory = Self::refresh_memory(&mut system);
self.0.memory.store(Arc::new(updated_memory));
self.update();
}
}

Expand Down
28 changes: 28 additions & 0 deletions relay/src/setup.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,32 @@
use anyhow::Context;
use anyhow::Result;
use relay_config::{Config, RelayMode};
use relay_server::MemoryStat;

/// Validates that the `batch_size_bytes` of the configuration is correct and doesn't lead to
/// deadlocks in the buffer.
fn assert_batch_size_bytes(config: &Config) -> Result<()> {
// We create a temporary memory reading used just for the config check.
let memory = MemoryStat::current_memory();

// We expect the batch size for the spooler to be 10% of the memory threshold over which the
// buffer stops unspooling.
//
// The 10% threshold was arbitrarily chosen to give the system leeway when spooling.
let configured_batch_size_bytes = config.spool_envelopes_batch_size_bytes() as f32;
let maximum_batch_size_bytes =
memory.total as f32 * config.spool_max_backpressure_memory_percent() * 0.1;

if configured_batch_size_bytes > maximum_batch_size_bytes {
anyhow::bail!(
"the configured `spool.envelopes.batch_size_bytes` is {} bytes but it must be <= than {} bytes",
configured_batch_size_bytes,
maximum_batch_size_bytes
)
}

Ok(())
}

pub fn check_config(config: &Config) -> Result<()> {
if config.relay_mode() == RelayMode::Managed && config.credentials().is_none() {
Expand Down Expand Up @@ -31,6 +57,8 @@ pub fn check_config(config: &Config) -> Result<()> {
}
}

assert_batch_size_bytes(config)?;

Ok(())
}

Expand Down
32 changes: 32 additions & 0 deletions tests/integration/test_basic.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,38 @@ def get_project_config():
conn.close()


def test_batch_size_bytes_asserted(mini_sentry, relay):
from time import sleep

# Create a temporary directory for the sqlite db.
db_file_path = os.path.join(tempfile.mkdtemp(), "database.db")

get_project_config_original = mini_sentry.app.view_functions["get_project_config"]

@mini_sentry.app.endpoint("get_project_config")
def get_project_config():
sleep(1) # Causes the process to wait for one second before shutting down
return get_project_config_original()

project_id = 42
mini_sentry.add_basic_project_config(project_id)

mini_sentry.fail_on_relay_error = False

relay = relay(
mini_sentry,
{
"limits": {"shutdown_timeout": 2},
# Arbitrarily chosen high value to always fail.
"spool": {"envelopes": {"path": db_file_path, "batch_size_bytes": "10tb"}},
},
wait_health_check=False,
)

# Assert that the process exited with an error (non-zero exit code)
assert relay.wait_for_exit() != 0, "Expected Relay to not start, but it started"


@pytest.mark.skip("Flaky test")
def test_forced_shutdown(mini_sentry, relay):
from time import sleep
Expand Down