Skip to content

Commit cd67f90

Browse files
authored
fix(profiling): Classify profile chunks for rate limits and outcomes (#4595)
This separates profile chunks into two separate categories. The category is determined by the platform contained within the item payload. This does have several downsides: - No outcomes are emitted until the platform can be determined. - Rate limits happen only in processing (this is the only code which currently extracts the platform). The plan is to hoist the platform into an item header, provided by the SDK directly. Once this happened we can reject all profile chunks without that necessary platform item and we can start rate limiting in the fast path. Additionally we'll change Relay to make the executive decision whether a profile chunk is frontend/backend, this means we no longer need to make that decision in multiple places (they cannot diverge). We will also need to consider how often the platforms change, we can extend Relay in a way where unknown platforms are just forwarded to processing but for the sake of rate limiting considered as the default category. Or just make the assignment configurable via global config. The test `test_profile_chunk_outcomes_rate_limited_via_profile_duration_rate_limit` was never correct, the duration quota never worked it always just worked because it also contained the chunk category, but that is equivalent to the test above. Fixes: getsentry/team-ingest#679
1 parent 31935f8 commit cd67f90

File tree

12 files changed

+267
-177
lines changed

12 files changed

+267
-177
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,10 @@
77
- Add experimental playstation endpoint. ([#4555](https://github.com/getsentry/relay/pull/4555))
88
- Add naver.me / Yeti spider on the crawler filter list. ([#4602](https://github.com/getsentry/relay/pull/4602))
99

10+
**Bug Fixes**:
11+
12+
- Separates profiles into backend and ui profiles. ([#4595](https://github.com/getsentry/relay/pull/4595))
13+
1014
**Internal**:
1115

1216
- Add ui chunk profiling data category. ([#4593](https://github.com/getsentry/relay/pull/4593))

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

relay-profiling/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ workspace = true
1414

1515
[dependencies]
1616
android_trace_log = { workspace = true, features = ["serde"] }
17+
bytes = { workspace = true }
1718
chrono = { workspace = true }
1819
data-encoding = { workspace = true }
1920
itertools = { workspace = true }

relay-profiling/src/lib.rs

Lines changed: 69 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,8 @@
4242
use std::error::Error;
4343
use std::net::IpAddr;
4444
use std::time::Duration;
45+
46+
use bytes::Bytes;
4547
use url::Url;
4648

4749
use relay_base_schema::project::ProjectId;
@@ -80,6 +82,12 @@ const MAX_PROFILE_CHUNK_DURATION: Duration = Duration::from_secs(66);
8082
/// Same format as event IDs.
8183
pub type ProfileId = EventId;
8284

85+
#[derive(Debug, Clone, Copy)]
86+
pub enum ProfileType {
87+
Backend,
88+
Ui,
89+
}
90+
8391
#[derive(Debug, Deserialize)]
8492
struct MinimalProfile {
8593
#[serde(alias = "profile_id", alias = "chunk_id")]
@@ -275,41 +283,72 @@ pub fn expand_profile(
275283
}
276284
}
277285

278-
pub fn expand_profile_chunk(
279-
payload: &[u8],
280-
client_ip: Option<IpAddr>,
281-
filter_settings: &ProjectFiltersConfig,
282-
global_config: &GlobalConfig,
283-
) -> Result<Vec<u8>, ProfileError> {
284-
let profile = match minimal_profile_from_json(payload) {
285-
Ok(profile) => profile,
286-
Err(err) => {
287-
relay_log::warn!(
288-
error = &err as &dyn Error,
289-
from = "minimal",
290-
"invalid profile chunk",
291-
);
292-
return Err(ProfileError::InvalidJson(err));
286+
/// Intermediate type for all processing on a profile chunk.
287+
pub struct ProfileChunk {
288+
profile: MinimalProfile,
289+
payload: Bytes,
290+
}
291+
292+
impl ProfileChunk {
293+
/// Parses a new [`Self`] from raw bytes.
294+
pub fn new(payload: Bytes) -> Result<Self, ProfileError> {
295+
match minimal_profile_from_json(&payload) {
296+
Ok(profile) => Ok(Self { profile, payload }),
297+
Err(err) => {
298+
relay_log::debug!(
299+
error = &err as &dyn Error,
300+
from = "minimal",
301+
"invalid profile chunk",
302+
);
303+
Err(ProfileError::InvalidJson(err))
304+
}
293305
}
294-
};
306+
}
295307

296-
if let Err(filter_stat_key) = relay_filter::should_filter(
297-
&profile,
298-
client_ip,
299-
filter_settings,
300-
global_config.filters(),
301-
) {
302-
return Err(ProfileError::Filtered(filter_stat_key));
308+
/// Returns the [`ProfileType`] this chunk belongs to.
309+
///
310+
/// The profile type is currently determined based on the contained profile
311+
/// platform. It determines the data category this profile chunk belongs to.
312+
///
313+
/// This needs to be synchronized with the implementation in Sentry:
314+
/// <https://github.com/getsentry/sentry/blob/ed2e1c8bcd0d633e6f828fcfbeefbbdd98ef3dba/src/sentry/profiles/task.py#L995>
315+
pub fn profile_type(&self) -> ProfileType {
316+
match self.profile.platform.as_str() {
317+
"cocoa" | "android" | "javascript" => ProfileType::Ui,
318+
_ => ProfileType::Backend,
319+
}
303320
}
304321

305-
match (profile.platform.as_str(), profile.version) {
306-
("android", _) => android::chunk::parse(payload),
307-
(_, sample::Version::V2) => {
308-
let mut profile = sample::v2::parse(payload)?;
309-
profile.normalize()?;
310-
serde_json::to_vec(&profile).map_err(|_| ProfileError::CannotSerializePayload)
322+
/// Applies inbound filters to the profile chunk.
323+
///
324+
/// The profile needs to be filtered (rejected) when this returns an error.
325+
pub fn filter(
326+
&self,
327+
client_ip: Option<IpAddr>,
328+
filter_settings: &ProjectFiltersConfig,
329+
global_config: &GlobalConfig,
330+
) -> Result<(), ProfileError> {
331+
relay_filter::should_filter(
332+
&self.profile,
333+
client_ip,
334+
filter_settings,
335+
global_config.filters(),
336+
)
337+
.map_err(ProfileError::Filtered)
338+
}
339+
340+
/// Normalizes and 'expands' the profile chunk into its normalized form Sentry expects.
341+
pub fn expand(&self) -> Result<Vec<u8>, ProfileError> {
342+
match (self.profile.platform.as_str(), self.profile.version) {
343+
("android", _) => android::chunk::parse(&self.payload),
344+
(_, sample::Version::V2) => {
345+
let mut profile = sample::v2::parse(&self.payload)?;
346+
profile.normalize()?;
347+
Ok(serde_json::to_vec(&profile)
348+
.map_err(|_| ProfileError::CannotSerializePayload)?)
349+
}
350+
(_, _) => Err(ProfileError::PlatformNotSupported),
311351
}
312-
(_, _) => Err(ProfileError::PlatformNotSupported),
313352
}
314353
}
315354

relay-profiling/src/sample/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ pub mod v1;
77
pub mod v2;
88

99
/// Possible values for the version field of the Sample Format.
10-
#[derive(Debug, Serialize, Deserialize, Clone, Default, PartialEq, Eq)]
10+
#[derive(Debug, Serialize, Deserialize, Copy, Clone, Default, PartialEq, Eq)]
1111
pub enum Version {
1212
#[default]
1313
Unknown,

relay-server/src/envelope.rs

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
//! ```
3232
3333
use relay_base_schema::project::ProjectKey;
34+
use relay_profiling::ProfileType;
3435
use std::borrow::Borrow;
3536
use std::collections::BTreeMap;
3637
use std::fmt;
@@ -596,6 +597,12 @@ pub struct ItemHeaders {
596597
#[serde(default, skip)]
597598
ingest_span_in_eap: bool,
598599

600+
/// Tracks whether the item is a backend or ui profile chunk.
601+
///
602+
/// NOTE: This is internal-only and not exposed into the Envelope.
603+
#[serde(default, skip)]
604+
profile_type: Option<ProfileType>,
605+
599606
/// Other attributes for forward compatibility.
600607
#[serde(flatten)]
601608
other: BTreeMap<String, Value>,
@@ -673,6 +680,7 @@ impl Item {
673680
sampled: true,
674681
fully_normalized: false,
675682
ingest_span_in_eap: false,
683+
profile_type: None,
676684
},
677685
payload: Bytes::new(),
678686
}
@@ -725,7 +733,11 @@ impl Item {
725733
ItemType::Span | ItemType::OtelSpan => smallvec![(DataCategory::Span, 1)],
726734
// NOTE: semantically wrong, but too expensive to parse.
727735
ItemType::OtelTracesData => smallvec![(DataCategory::Span, 1)],
728-
ItemType::ProfileChunk => smallvec![(DataCategory::ProfileChunk, 1)], // TODO: should be seconds?
736+
ItemType::ProfileChunk => match self.headers.profile_type {
737+
Some(ProfileType::Backend) => smallvec![(DataCategory::ProfileChunk, 1)],
738+
Some(ProfileType::Ui) => smallvec![(DataCategory::ProfileChunkUi, 1)],
739+
None => smallvec![],
740+
},
729741
ItemType::Unknown(_) => smallvec![],
730742
}
731743
}
@@ -890,6 +902,16 @@ impl Item {
890902
self.headers.ingest_span_in_eap = ingest_span_in_eap;
891903
}
892904

905+
/// Returns the associated profile type of a profile chunk.
906+
pub fn profile_type(&self) -> Option<ProfileType> {
907+
self.headers.profile_type
908+
}
909+
910+
/// Set the profile type of the profile chunk.
911+
pub fn set_profile_type(&mut self, profile_type: ProfileType) {
912+
self.headers.profile_type = Some(profile_type);
913+
}
914+
893915
/// Gets the `sampled` flag.
894916
pub fn sampled(&self) -> bool {
895917
self.headers.sampled

relay-server/src/services/processor.rs

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1922,15 +1922,25 @@ impl EnvelopeProcessorService {
19221922
&self,
19231923
managed_envelope: &mut TypedEnvelope<ProfileChunkGroup>,
19241924
project_info: Arc<ProjectInfo>,
1925+
_rate_limits: Arc<RateLimits>,
19251926
) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
19261927
profile_chunk::filter(managed_envelope, project_info.clone());
19271928
if_processing!(self.inner.config, {
19281929
profile_chunk::process(
19291930
managed_envelope,
1930-
project_info,
1931+
project_info.clone(),
19311932
&self.inner.global_config.current(),
19321933
&self.inner.config,
19331934
);
1935+
1936+
self.enforce_quotas(
1937+
managed_envelope,
1938+
Annotated::empty(),
1939+
&mut ProcessingExtractedMetrics::new(),
1940+
project_info,
1941+
_rate_limits,
1942+
)
1943+
.await?;
19341944
});
19351945

19361946
Ok(None)
@@ -1943,7 +1953,7 @@ impl EnvelopeProcessorService {
19431953
config: Arc<Config>,
19441954
project_id: ProjectId,
19451955
project_info: Arc<ProjectInfo>,
1946-
#[allow(unused_variables)] rate_limits: Arc<RateLimits>,
1956+
_rate_limits: Arc<RateLimits>,
19471957
) -> Result<Option<ProcessingExtractedMetrics>, ProcessingError> {
19481958
#[allow(unused_mut)]
19491959
let mut extracted_metrics = ProcessingExtractedMetrics::new();
@@ -1964,7 +1974,7 @@ impl EnvelopeProcessorService {
19641974
Annotated::empty(),
19651975
&mut extracted_metrics,
19661976
project_info.clone(),
1967-
rate_limits,
1977+
_rate_limits,
19681978
)
19691979
.await?;
19701980
});
@@ -2297,7 +2307,9 @@ impl EnvelopeProcessorService {
22972307
rate_limits,
22982308
reservoir_counters
22992309
),
2300-
ProcessingGroup::ProfileChunk => run!(process_profile_chunks, project_info),
2310+
ProcessingGroup::ProfileChunk => {
2311+
run!(process_profile_chunks, project_info, rate_limits)
2312+
}
23012313
// Currently is not used.
23022314
ProcessingGroup::Metrics => {
23032315
// In proxy mode we simply forward the metrics.
@@ -3650,7 +3662,7 @@ impl UpstreamRequest for SendMetricsRequest {
36503662

36513663
/// Container for global and project level [`Quota`].
36523664
#[cfg(feature = "processing")]
3653-
#[derive(Copy, Clone)]
3665+
#[derive(Copy, Clone, Debug)]
36543666
struct CombinedQuotas<'a> {
36553667
global_quotas: &'a [Quota],
36563668
project_quotas: &'a [Quota],

relay-server/src/services/processor/profile_chunk.rs

Lines changed: 36 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use {
1414
crate::services::processor::ProfileChunkGroup,
1515
relay_config::Config,
1616
relay_dynamic_config::GlobalConfig,
17+
relay_profiling::ProfileError,
1718
};
1819

1920
/// Removes profile chunks from the envelope if the feature is not enabled.
@@ -40,44 +41,55 @@ pub fn process(
4041
) {
4142
let client_ip = managed_envelope.envelope().meta().client_addr();
4243
let filter_settings = &project_info.config.filter_settings;
44+
4345
let continuous_profiling_enabled =
4446
if project_info.has_feature(Feature::ContinuousProfilingBetaIngest) {
4547
project_info.has_feature(Feature::ContinuousProfilingBeta)
4648
} else {
4749
project_info.has_feature(Feature::ContinuousProfiling)
4850
};
51+
4952
managed_envelope.retain_items(|item| match item.ty() {
5053
ItemType::ProfileChunk => {
5154
if !continuous_profiling_enabled {
5255
return ItemAction::DropSilently;
5356
}
5457

55-
match relay_profiling::expand_profile_chunk(
56-
&item.payload(),
57-
client_ip,
58-
filter_settings,
59-
global_config,
60-
) {
61-
Ok(payload) => {
62-
if payload.len() <= config.max_profile_size() {
63-
item.set_payload(ContentType::Json, payload);
64-
ItemAction::Keep
65-
} else {
66-
ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
67-
relay_profiling::discard_reason(
68-
relay_profiling::ProfileError::ExceedSizeLimit,
69-
),
70-
)))
71-
}
72-
}
73-
Err(relay_profiling::ProfileError::Filtered(filter_stat_key)) => {
74-
ItemAction::Drop(Outcome::Filtered(filter_stat_key))
75-
}
76-
Err(err) => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
77-
relay_profiling::discard_reason(err),
78-
))),
58+
let chunk = match relay_profiling::ProfileChunk::new(item.payload()) {
59+
Ok(chunk) => chunk,
60+
Err(err) => return error_to_action(err),
61+
};
62+
// Important: set the profile type to get outcomes in the correct category.
63+
item.set_profile_type(chunk.profile_type());
64+
65+
if let Err(err) = chunk.filter(client_ip, filter_settings, global_config) {
66+
return error_to_action(err);
67+
}
68+
69+
let payload = match chunk.expand() {
70+
Ok(expanded) => expanded,
71+
Err(err) => return error_to_action(err),
72+
};
73+
74+
if payload.len() > config.max_profile_size() {
75+
return error_to_action(relay_profiling::ProfileError::ExceedSizeLimit);
7976
}
77+
78+
item.set_payload(ContentType::Json, payload);
79+
ItemAction::Keep
8080
}
8181
_ => ItemAction::Keep,
8282
});
8383
}
84+
85+
#[cfg(feature = "processing")]
86+
fn error_to_action(err: ProfileError) -> ItemAction {
87+
match err {
88+
ProfileError::Filtered(filter_stat_key) => {
89+
ItemAction::Drop(Outcome::Filtered(filter_stat_key))
90+
}
91+
err => ItemAction::Drop(Outcome::Invalid(DiscardReason::Profiling(
92+
relay_profiling::discard_reason(err),
93+
))),
94+
}
95+
}

relay-server/src/utils/managed_envelope.rs

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -470,6 +470,14 @@ impl ManagedEnvelope {
470470
);
471471
}
472472

473+
if self.context.summary.profile_chunk_ui_quantity > 0 {
474+
self.track_outcome(
475+
outcome.clone(),
476+
DataCategory::ProfileChunkUi,
477+
self.context.summary.profile_chunk_ui_quantity,
478+
);
479+
}
480+
473481
self.finish(RelayCounters::EnvelopeRejected, handling);
474482
}
475483

0 commit comments

Comments
 (0)