Skip to content

Commit 3d044a7

Browse files
committed
ref(server): Always enforce cached rate limits
1 parent cd67f90 commit 3d044a7

File tree

1 file changed

+127
-133
lines changed

1 file changed

+127
-133
lines changed

relay-server/src/services/processor.rs

Lines changed: 127 additions & 133 deletions
Original file line numberDiff line numberDiff line change
@@ -1245,7 +1245,6 @@ impl EnvelopeProcessorService {
12451245
})
12461246
}
12471247

1248-
#[cfg(feature = "processing")]
12491248
async fn enforce_quotas<Group>(
12501249
&self,
12511250
managed_envelope: &mut TypedEnvelope<Group>,
@@ -1273,28 +1272,30 @@ impl EnvelopeProcessorService {
12731272
)
12741273
.await?;
12751274

1276-
// Enforce all quotas consistently with Redis.
1277-
let consistent_result = RateLimiter::Consistent(rate_limiter)
1278-
.enforce(
1279-
managed_envelope,
1280-
cached_result.event,
1281-
extracted_metrics,
1282-
&global_config,
1283-
project_info,
1284-
rate_limits,
1285-
)
1286-
.await?;
1275+
if_processing!(self.inner.config, {
1276+
// Enforce all quotas consistently with Redis.
1277+
let consistent_result = RateLimiter::Consistent(rate_limiter)
1278+
.enforce(
1279+
managed_envelope,
1280+
cached_result.event,
1281+
extracted_metrics,
1282+
&global_config,
1283+
project_info,
1284+
rate_limits,
1285+
)
1286+
.await?;
12871287

1288-
// Update cached rate limits with the freshly computed ones.
1289-
if !consistent_result.rate_limits.is_empty() {
1290-
self.inner
1291-
.project_cache
1292-
.get(managed_envelope.scoping().project_key)
1293-
.rate_limits()
1294-
.merge(consistent_result.rate_limits);
1295-
}
1288+
// Update cached rate limits with the freshly computed ones.
1289+
if !consistent_result.rate_limits.is_empty() {
1290+
self.inner
1291+
.project_cache
1292+
.get(managed_envelope.scoping().project_key)
1293+
.rate_limits()
1294+
.merge(consistent_result.rate_limits);
1295+
}
12961296

1297-
Ok(consistent_result.event)
1297+
Ok(consistent_result.event)
1298+
} else { Ok(cached_result.event) })
12981299
}
12991300

13001301
/// Extract transaction metrics.
@@ -1629,17 +1630,15 @@ impl EnvelopeProcessorService {
16291630
);
16301631
}
16311632

1632-
if_processing!(self.inner.config, {
1633-
event = self
1634-
.enforce_quotas(
1635-
managed_envelope,
1636-
event,
1637-
&mut extracted_metrics,
1638-
project_info.clone(),
1639-
rate_limits,
1640-
)
1641-
.await?;
1642-
});
1633+
event = self
1634+
.enforce_quotas(
1635+
managed_envelope,
1636+
event,
1637+
&mut extracted_metrics,
1638+
project_info.clone(),
1639+
rate_limits,
1640+
)
1641+
.await?;
16431642

16441643
if event.value().is_some() {
16451644
event::scrub(&mut event, project_info.clone())?;
@@ -1822,17 +1821,15 @@ impl EnvelopeProcessorService {
18221821
// - An empty envelope.
18231822
// - An envelope containing only processed profiles.
18241823
// We need to make sure there are enough quotas for these profiles.
1825-
if_processing!(self.inner.config, {
1826-
event = self
1827-
.enforce_quotas(
1828-
managed_envelope,
1829-
Annotated::empty(),
1830-
&mut extracted_metrics,
1831-
project_info.clone(),
1832-
rate_limits,
1833-
)
1834-
.await?;
1835-
});
1824+
event = self
1825+
.enforce_quotas(
1826+
managed_envelope,
1827+
Annotated::empty(),
1828+
&mut extracted_metrics,
1829+
project_info.clone(),
1830+
rate_limits,
1831+
)
1832+
.await?;
18361833

18371834
return Ok(Some(extracted_metrics));
18381835
}
@@ -1882,17 +1879,19 @@ impl EnvelopeProcessorService {
18821879
spans_extracted,
18831880
);
18841881
}
1882+
});
18851883

1886-
event = self
1887-
.enforce_quotas(
1888-
managed_envelope,
1889-
event,
1890-
&mut extracted_metrics,
1891-
project_info.clone(),
1892-
rate_limits,
1893-
)
1894-
.await?;
1884+
event = self
1885+
.enforce_quotas(
1886+
managed_envelope,
1887+
event,
1888+
&mut extracted_metrics,
1889+
project_info.clone(),
1890+
rate_limits,
1891+
)
1892+
.await?;
18951893

1894+
if_processing!(self.inner.config, {
18961895
event = span::maybe_discard_transaction(managed_envelope, event, project_info);
18971896
});
18981897

@@ -1925,24 +1924,25 @@ impl EnvelopeProcessorService {
19251924
_rate_limits: Arc<RateLimits>,
19261925
) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
19271926
profile_chunk::filter(managed_envelope, project_info.clone());
1927+
19281928
if_processing!(self.inner.config, {
19291929
profile_chunk::process(
19301930
managed_envelope,
19311931
project_info.clone(),
19321932
&self.inner.global_config.current(),
19331933
&self.inner.config,
19341934
);
1935-
1936-
self.enforce_quotas(
1937-
managed_envelope,
1938-
Annotated::empty(),
1939-
&mut ProcessingExtractedMetrics::new(),
1940-
project_info,
1941-
_rate_limits,
1942-
)
1943-
.await?;
19441935
});
19451936

1937+
self.enforce_quotas(
1938+
managed_envelope,
1939+
Annotated::empty(),
1940+
&mut ProcessingExtractedMetrics::new(),
1941+
project_info,
1942+
_rate_limits,
1943+
)
1944+
.await?;
1945+
19461946
Ok(None)
19471947
}
19481948

@@ -1968,16 +1968,14 @@ impl EnvelopeProcessorService {
19681968
project_info.clone(),
19691969
);
19701970

1971-
if_processing!(self.inner.config, {
1972-
self.enforce_quotas(
1973-
managed_envelope,
1974-
Annotated::empty(),
1975-
&mut extracted_metrics,
1976-
project_info.clone(),
1977-
_rate_limits,
1978-
)
1979-
.await?;
1980-
});
1971+
self.enforce_quotas(
1972+
managed_envelope,
1973+
Annotated::empty(),
1974+
&mut extracted_metrics,
1975+
project_info.clone(),
1976+
_rate_limits,
1977+
)
1978+
.await?;
19811979

19821980
report::process_user_reports(managed_envelope);
19831981
attachment::scrub(managed_envelope, project_info);
@@ -2000,16 +1998,15 @@ impl EnvelopeProcessorService {
20001998
project_info.clone(),
20011999
&self.inner.config,
20022000
);
2003-
if_processing!(self.inner.config, {
2004-
self.enforce_quotas(
2005-
managed_envelope,
2006-
Annotated::empty(),
2007-
&mut extracted_metrics,
2008-
project_info,
2009-
rate_limits,
2010-
)
2011-
.await?;
2012-
});
2001+
2002+
self.enforce_quotas(
2003+
managed_envelope,
2004+
Annotated::empty(),
2005+
&mut extracted_metrics,
2006+
project_info,
2007+
rate_limits,
2008+
)
2009+
.await?;
20132010

20142011
Ok(Some(extracted_metrics))
20152012
}
@@ -2025,16 +2022,14 @@ impl EnvelopeProcessorService {
20252022
#[allow(unused_mut)]
20262023
let mut extracted_metrics = ProcessingExtractedMetrics::new();
20272024

2028-
if_processing!(self.inner.config, {
2029-
self.enforce_quotas(
2030-
managed_envelope,
2031-
Annotated::empty(),
2032-
&mut extracted_metrics,
2033-
project_info.clone(),
2034-
rate_limits,
2035-
)
2036-
.await?;
2037-
});
2025+
self.enforce_quotas(
2026+
managed_envelope,
2027+
Annotated::empty(),
2028+
&mut extracted_metrics,
2029+
project_info.clone(),
2030+
rate_limits,
2031+
)
2032+
.await?;
20382033

20392034
report::process_client_reports(
20402035
managed_envelope,
@@ -2065,44 +2060,40 @@ impl EnvelopeProcessorService {
20652060
self.inner.geoip_lookup.as_ref(),
20662061
)?;
20672062

2068-
if_processing!(self.inner.config, {
2069-
self.enforce_quotas(
2070-
managed_envelope,
2071-
Annotated::empty(),
2072-
&mut extracted_metrics,
2073-
project_info,
2074-
rate_limits,
2075-
)
2076-
.await?;
2077-
});
2063+
self.enforce_quotas(
2064+
managed_envelope,
2065+
Annotated::empty(),
2066+
&mut extracted_metrics,
2067+
project_info,
2068+
rate_limits,
2069+
)
2070+
.await?;
20782071

20792072
Ok(Some(extracted_metrics))
20802073
}
20812074

20822075
/// Processes cron check-ins.
20832076
async fn process_checkins(
20842077
&self,
2085-
#[allow(unused_variables)] managed_envelope: &mut TypedEnvelope<CheckInGroup>,
2086-
#[allow(unused_variables)] project_id: ProjectId,
2087-
#[allow(unused_variables)] project_info: Arc<ProjectInfo>,
2088-
#[allow(unused_variables)] rate_limits: Arc<RateLimits>,
2078+
managed_envelope: &mut TypedEnvelope<CheckInGroup>,
2079+
project_id: ProjectId,
2080+
project_info: Arc<ProjectInfo>,
2081+
rate_limits: Arc<RateLimits>,
20892082
) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
2090-
#[allow(unused_mut)]
2091-
let mut extracted_metrics = ProcessingExtractedMetrics::new();
2083+
self.enforce_quotas(
2084+
managed_envelope,
2085+
Annotated::empty(),
2086+
&mut ProcessingExtractedMetrics::new(),
2087+
project_info,
2088+
rate_limits,
2089+
)
2090+
.await?;
20922091

20932092
if_processing!(self.inner.config, {
2094-
self.enforce_quotas(
2095-
managed_envelope,
2096-
Annotated::empty(),
2097-
&mut extracted_metrics,
2098-
project_info,
2099-
rate_limits,
2100-
)
2101-
.await?;
21022093
self.normalize_checkins(managed_envelope, project_id);
21032094
});
21042095

2105-
Ok(Some(extracted_metrics))
2096+
Ok(None)
21062097
}
21072098

21082099
/// Process logs
@@ -2122,17 +2113,20 @@ impl EnvelopeProcessorService {
21222113
project_info.clone(),
21232114
&self.inner.global_config.current(),
21242115
);
2116+
2117+
self.enforce_quotas(
2118+
managed_envelope,
2119+
Annotated::empty(),
2120+
&mut extracted_metrics,
2121+
project_info.clone(),
2122+
rate_limits,
2123+
)
2124+
.await?;
2125+
21252126
if_processing!(self.inner.config, {
2126-
self.enforce_quotas(
2127-
managed_envelope,
2128-
Annotated::empty(),
2129-
&mut extracted_metrics,
2130-
project_info.clone(),
2131-
rate_limits,
2132-
)
2133-
.await?;
21342127
ourlog::process(managed_envelope, project_info.clone());
21352128
});
2129+
21362130
Ok(Some(extracted_metrics))
21372131
}
21382132

@@ -2176,17 +2170,17 @@ impl EnvelopeProcessorService {
21762170
&reservoir,
21772171
)
21782172
.await;
2179-
2180-
self.enforce_quotas(
2181-
managed_envelope,
2182-
Annotated::empty(),
2183-
&mut extracted_metrics,
2184-
project_info,
2185-
rate_limits,
2186-
)
2187-
.await?;
21882173
});
21892174

2175+
self.enforce_quotas(
2176+
managed_envelope,
2177+
Annotated::empty(),
2178+
&mut extracted_metrics,
2179+
project_info,
2180+
rate_limits,
2181+
)
2182+
.await?;
2183+
21902184
Ok(Some(extracted_metrics))
21912185
}
21922186

0 commit comments

Comments
 (0)