Skip to content

Commit 90f67c6

Browse files
authored
feat(spooler): Add configuration check based on memory size (#4617)
1 parent 53bd9ef commit 90f67c6

File tree

4 files changed

+80
-8
lines changed

4 files changed

+80
-8
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
- Expose runtime utilization metric in autoscaler endpoint. ([#4606](https://github.com/getsentry/relay/pull/4606))
2525
- Bump the revision of `sysinfo` to the revision at `15b3be3273ba286740122fed7bb7dccd2a79dc8f`. ([#4613](https://github.com/getsentry/relay/pull/4613))
2626
- Switch the processor and store to `async`. ([#4552](https://github.com/getsentry/relay/pull/4552))
27+
- Validate the spooling memory configuration on startup. ([#4617](https://github.com/getsentry/relay/pull/4617))
2728

2829
## 25.3.0
2930

relay-server/src/utils/memory.rs

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,12 @@ impl MemoryStat {
7777
}))
7878
}
7979

80+
/// Returns the current memory data without instantiating [`MemoryStat`].
81+
pub fn current_memory() -> Memory {
82+
let mut system = System::new();
83+
Self::refresh_memory(&mut system)
84+
}
85+
8086
/// Returns a copy of the most up-to-date memory data.
8187
pub fn memory(&self) -> Memory {
8288
self.try_update();
@@ -103,6 +109,18 @@ impl MemoryStat {
103109
memory
104110
}
105111

112+
/// Updates the memory readings unconditionally.
113+
fn update(&self) {
114+
let mut system = self
115+
.0
116+
.system
117+
.lock()
118+
.unwrap_or_else(|system| system.into_inner());
119+
120+
let updated_memory = Self::refresh_memory(&mut system);
121+
self.0.memory.store(Arc::new(updated_memory));
122+
}
123+
106124
/// Updates the memory readings if at least `refresh_frequency_ms` has passed.
107125
fn try_update(&self) {
108126
let last_update = self.0.last_update.load(Ordering::Relaxed);
@@ -126,14 +144,7 @@ impl MemoryStat {
126144
return;
127145
}
128146

129-
let mut system = self
130-
.0
131-
.system
132-
.lock()
133-
.unwrap_or_else(|system| system.into_inner());
134-
135-
let updated_memory = Self::refresh_memory(&mut system);
136-
self.0.memory.store(Arc::new(updated_memory));
147+
self.update();
137148
}
138149
}
139150

relay/src/setup.rs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,32 @@
22
use anyhow::Context;
33
use anyhow::Result;
44
use relay_config::{Config, RelayMode};
5+
use relay_server::MemoryStat;
6+
7+
/// Validates that the `batch_size_bytes` of the configuration is correct and doesn't lead to
8+
/// deadlocks in the buffer.
9+
fn assert_batch_size_bytes(config: &Config) -> Result<()> {
10+
// We create a temporary memory reading used just for the config check.
11+
let memory = MemoryStat::current_memory();
12+
13+
// We expect the batch size for the spooler to be 10% of the memory threshold over which the
14+
// buffer stops unspooling.
15+
//
16+
// The 10% threshold was arbitrarily chosen to give the system leeway when spooling.
17+
let configured_batch_size_bytes = config.spool_envelopes_batch_size_bytes() as f32;
18+
let maximum_batch_size_bytes =
19+
memory.total as f32 * config.spool_max_backpressure_memory_percent() * 0.1;
20+
21+
if configured_batch_size_bytes > maximum_batch_size_bytes {
22+
anyhow::bail!(
23+
"the configured `spool.envelopes.batch_size_bytes` is {} bytes but it must be <= than {} bytes",
24+
configured_batch_size_bytes,
25+
maximum_batch_size_bytes
26+
)
27+
}
28+
29+
Ok(())
30+
}
531

632
pub fn check_config(config: &Config) -> Result<()> {
733
if config.relay_mode() == RelayMode::Managed && config.credentials().is_none() {
@@ -31,6 +57,8 @@ pub fn check_config(config: &Config) -> Result<()> {
3157
}
3258
}
3359

60+
assert_batch_size_bytes(config)?;
61+
3462
Ok(())
3563
}
3664

tests/integration/test_basic.py

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,38 @@ def get_project_config():
8585
conn.close()
8686

8787

88+
def test_batch_size_bytes_asserted(mini_sentry, relay):
    """Relay must exit with a non-zero code when the spool batch size is far too large."""
    from time import sleep

    # The sqlite spool database lives in a fresh temporary directory.
    spool_db_path = os.path.join(tempfile.mkdtemp(), "database.db")

    original_view = mini_sentry.app.view_functions["get_project_config"]

    @mini_sentry.app.endpoint("get_project_config")
    def get_project_config():
        # Delay every project config response by one second.
        sleep(1)
        return original_view()

    project_id = 42
    mini_sentry.add_basic_project_config(project_id)

    mini_sentry.fail_on_relay_error = False

    relay_options = {
        "limits": {"shutdown_timeout": 2},
        # Arbitrarily chosen high value to always fail.
        "spool": {"envelopes": {"path": spool_db_path, "batch_size_bytes": "10tb"}},
    }
    relay_process = relay(mini_sentry, relay_options, wait_health_check=False)

    # A non-zero exit code means Relay refused to start with this configuration.
    exit_code = relay_process.wait_for_exit()
    assert exit_code != 0, "Expected Relay to not start, but it started"
118+
119+
88120
@pytest.mark.skip("Flaky test")
89121
def test_forced_shutdown(mini_sentry, relay):
90122
from time import sleep

0 commit comments

Comments
 (0)