-
Notifications
You must be signed in to change notification settings - Fork 103
fix(spans): Extract metrics from transaction spans #3273
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 17 commits
7b71f7b
69d6699
730e9fc
b70e922
6134cf6
34c5957
9fce1cd
31b4fe9
4e3df5a
ade6268
1a3bd59
48551c5
57c7090
727e62b
3bbc658
82ada3f
0dc102f
041f13c
ec07d49
e060674
ddf442d
f8d060f
8f16457
38d5821
2adf4c2
38dc916
eb49095
ca1053b
12bd370
f6cdbe2
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -577,6 +577,31 @@ struct ProcessEnvelopeState<'a, Group> { | |
reservoir: ReservoirEvaluator<'a>, | ||
} | ||
|
||
#[cfg(test)] | ||
#[cfg(feature = "processing")] | ||
impl<'a, Group: TryFrom<ProcessingGroup>> ProcessEnvelopeState<'a, Group> { | ||
fn simple(event_json: &str, group: ProcessingGroup, project_state: ProjectState) -> Self { | ||
|
||
use crate::testutils::empty_envelope; | ||
|
||
Self { | ||
event: Annotated::from_json(event_json).unwrap(), | ||
event_metrics_extracted: Default::default(), | ||
metrics: Default::default(), | ||
sample_rates: Default::default(), | ||
extracted_metrics: Default::default(), | ||
project_state: Arc::new(project_state), | ||
sampling_project_state: Default::default(), | ||
project_id: ProjectId::new(42), | ||
managed_envelope: { | ||
let managed_envelope = ManagedEnvelope::silent(empty_envelope(), group); | ||
managed_envelope.try_into().unwrap() | ||
}, | ||
profile_id: Default::default(), | ||
reservoir: ReservoirEvaluator::new(ReservoirCounters::default()), | ||
} | ||
} | ||
} | ||
|
||
impl<'a, Group> ProcessEnvelopeState<'a, Group> { | ||
/// Returns a reference to the contained [`Envelope`]. | ||
fn envelope(&self) -> &Envelope { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,5 @@ | ||
//! Contains the processing-only functionality. | ||
|
||
use std::error::Error; | ||
use std::sync::Arc; | ||
|
||
use chrono::{DateTime, Utc}; | ||
|
@@ -26,7 +25,11 @@ use crate::services::outcome::{DiscardReason, Outcome}; | |
use crate::services::processor::{ | ||
ProcessEnvelopeState, ProcessingError, SpanGroup, TransactionGroup, | ||
}; | ||
use crate::utils::ItemAction; | ||
use crate::statsd::RelayTimers; | ||
use crate::utils::{ItemAction, ManagedEnvelope}; | ||
|
||
#[cfg(feature = "processing")] | ||
use relay_protocol::Meta; | ||
|
||
pub fn process( | ||
state: &mut ProcessEnvelopeState<SpanGroup>, | ||
|
@@ -117,7 +120,7 @@ pub fn process( | |
|
||
// Validate for kafka (TODO: this should be moved to kafka producer) | ||
let annotated_span = match validate(annotated_span) { | ||
Ok(res) => res, | ||
Ok((span, meta)) => Annotated(Some(span), meta), | ||
Err(err) => { | ||
relay_log::error!("invalid span: {err}"); | ||
return ItemAction::Drop(Outcome::Invalid(DiscardReason::InvalidSpan)); | ||
|
@@ -142,6 +145,7 @@ pub fn process( | |
}); | ||
} | ||
|
||
/// Copies spans from the state's transaction event to individual span envelope items. | ||
pub fn extract_from_event(state: &mut ProcessEnvelopeState<TransactionGroup>) { | ||
// Only extract spans from transactions (not errors). | ||
if state.event_type() != Some(EventType::Transaction) { | ||
|
@@ -155,44 +159,12 @@ pub fn extract_from_event(state: &mut ProcessEnvelopeState<TransactionGroup>) { | |
return; | ||
} | ||
|
||
let mut add_span = |span: Annotated<Span>| { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I used the opportunity to upgrade this helper to a real function. |
||
let span = match validate(span) { | ||
Ok(span) => span, | ||
Err(e) => { | ||
relay_log::error!("Invalid span: {e}"); | ||
state.managed_envelope.track_outcome( | ||
Outcome::Invalid(DiscardReason::InvalidSpan), | ||
relay_quotas::DataCategory::SpanIndexed, | ||
1, | ||
); | ||
return; | ||
} | ||
}; | ||
let span = match span.to_json() { | ||
Ok(span) => span, | ||
Err(e) => { | ||
relay_log::error!(error = &e as &dyn Error, "Failed to serialize span"); | ||
state.managed_envelope.track_outcome( | ||
Outcome::Invalid(DiscardReason::InvalidSpan), | ||
relay_quotas::DataCategory::SpanIndexed, | ||
1, | ||
); | ||
return; | ||
} | ||
}; | ||
let mut item = Item::new(ItemType::Span); | ||
item.set_payload(ContentType::Json, span); | ||
// If metrics extraction happened for the event, it also happened for its spans: | ||
item.set_metrics_extracted(state.event_metrics_extracted); | ||
state.managed_envelope.envelope_mut().add_item(item); | ||
}; | ||
|
||
let Some(event) = state.event.value() else { | ||
return; | ||
}; | ||
|
||
// Extract transaction as a span. | ||
let mut transaction_span: Span = event.into(); | ||
let transaction_span: Span = event.into(); | ||
|
||
// Add child spans as envelope items. | ||
if let Some(child_spans) = event.spans.value() { | ||
|
@@ -212,10 +184,29 @@ pub fn extract_from_event(state: &mut ProcessEnvelopeState<TransactionGroup>) { | |
// child spans. | ||
new_span.profile_id = transaction_span.profile_id.clone(); | ||
|
||
add_span(Annotated::new(new_span)); | ||
add_span( | ||
Annotated::new(new_span), | ||
&mut state.managed_envelope, | ||
// If metrics extraction happened for the event, it also happened for its spans: | ||
state.event_metrics_extracted, | ||
); | ||
} | ||
} | ||
|
||
add_transaction_span(transaction_span, state); | ||
} | ||
|
||
/// Adds the span that represents the transaction itself (as opposed to its child spans). | ||
/// | ||
/// Also performs metrics extraction for the transaction i.e. segment span. | ||
fn add_transaction_span( | ||
mut transaction_span: Span, | ||
state: &mut ProcessEnvelopeState<TransactionGroup>, | ||
) { | ||
let Some(event) = state.event.value() else { | ||
return; | ||
}; | ||
|
||
// Extract tags to add to this span as well | ||
let mut shared_tags = tag_extraction::extract_shared_tags(event); | ||
|
||
|
@@ -230,7 +221,59 @@ pub fn extract_from_event(state: &mut ProcessEnvelopeState<TransactionGroup>) { | |
.map(|(k, v)| (k.sentry_tag_key().to_owned(), Annotated::new(v))) | ||
.collect(), | ||
); | ||
add_span(transaction_span.into()); | ||
|
||
let metrics_extraction_config = match state.project_state.config.metric_extraction { | ||
ErrorBoundary::Ok(ref config) if config.is_enabled() => Some(config), | ||
_ => None, | ||
}; | ||
|
||
let mut metrics_extracted = false; | ||
|
||
if let Some(config) = metrics_extraction_config { | ||
relay_statsd::metric!(timer(RelayTimers::EventProcessingSpanMetricsExtraction), { | ||
let metrics = extract_metrics(&transaction_span, config); | ||
state.extracted_metrics.project_metrics.extend(metrics); | ||
}); | ||
metrics_extracted = true; | ||
}; | ||
|
||
add_span( | ||
transaction_span.into(), | ||
&mut state.managed_envelope, | ||
metrics_extracted, | ||
); | ||
} | ||
|
||
fn add_span( | ||
span: Annotated<Span>, | ||
managed_envelope: &mut ManagedEnvelope, | ||
metrics_extracted: bool, | ||
) { | ||
match into_item(span, metrics_extracted) { | ||
Ok(item) => { | ||
managed_envelope.envelope_mut().add_item(item); | ||
} | ||
Err(e) => { | ||
relay_log::error!("Invalid span: {e}"); | ||
managed_envelope.track_outcome( | ||
Outcome::Invalid(DiscardReason::InvalidSpan), | ||
relay_quotas::DataCategory::SpanIndexed, | ||
1, | ||
); | ||
} | ||
} | ||
} | ||
|
||
/// Converts the span to an envelope item and extracts metrics for it. | ||
jjbayer marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||
fn into_item(span: Annotated<Span>, metrics_extracted: bool) -> Result<Item, anyhow::Error> { | ||
let (span, meta) = validate(span)?; | ||
|
||
let span = Annotated(Some(span), meta).to_json()?; | ||
let mut item = Item::new(ItemType::Span); | ||
item.set_payload(ContentType::Json, span); | ||
item.set_metrics_extracted(metrics_extracted); | ||
|
||
Ok(item) | ||
} | ||
|
||
/// Removes the transaction in case the project has made the transition to spans-only. | ||
|
@@ -246,24 +289,24 @@ pub fn maybe_discard_transaction(state: &mut ProcessEnvelopeState<TransactionGro | |
#[derive(Clone, Debug)] | ||
struct NormalizeSpanConfig<'a> { | ||
/// The time at which the event was received in this Relay. | ||
pub received_at: DateTime<Utc>, | ||
received_at: DateTime<Utc>, | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. drive-by fix. |
||
/// Allowed time range for spans. | ||
pub timestamp_range: std::ops::Range<UnixTimestamp>, | ||
timestamp_range: std::ops::Range<UnixTimestamp>, | ||
/// The maximum allowed size of tag values in bytes. Longer values will be cropped. | ||
pub max_tag_value_size: usize, | ||
max_tag_value_size: usize, | ||
/// Configuration for generating performance score measurements for web vitals | ||
pub performance_score: Option<&'a PerformanceScoreConfig>, | ||
performance_score: Option<&'a PerformanceScoreConfig>, | ||
/// Configuration for measurement normalization in transaction events. | ||
/// | ||
/// Has an optional [`relay_event_normalization::MeasurementsConfig`] from both the project and the global level. | ||
/// If at least one is provided, then normalization will truncate custom measurements | ||
/// and add units of known built-in measurements. | ||
pub measurements: Option<DynamicMeasurementsConfig<'a>>, | ||
measurements: Option<DynamicMeasurementsConfig<'a>>, | ||
/// The maximum length for names of custom measurements. | ||
/// | ||
/// Measurements with longer names are removed from the transaction event and replaced with a | ||
/// metadata entry. | ||
pub max_name_and_unit_len: Option<usize>, | ||
max_name_and_unit_len: Option<usize>, | ||
} | ||
|
||
fn get_normalize_span_config<'a>( | ||
|
@@ -432,11 +475,11 @@ fn scrub( | |
|
||
/// We do not extract or ingest spans with missing fields if those fields are required on the Kafka topic. | ||
#[cfg(feature = "processing")] | ||
fn validate(mut span: Annotated<Span>) -> Result<Annotated<Span>, anyhow::Error> { | ||
let inner = span | ||
.value_mut() | ||
.as_mut() | ||
.ok_or(anyhow::anyhow!("empty span"))?; | ||
fn validate(span: Annotated<Span>) -> Result<(Span, Meta), anyhow::Error> { | ||
let Annotated(Some(mut inner), meta) = span else { | ||
return Err(anyhow::anyhow!("empty span")); | ||
}; | ||
|
||
let Span { | ||
ref exclusive_time, | ||
ref mut tags, | ||
|
@@ -496,5 +539,47 @@ fn validate(mut span: Annotated<Span>) -> Result<Annotated<Span>, anyhow::Error> | |
tags.retain(|_, value| !value.value().is_empty()) | ||
} | ||
|
||
Ok(span) | ||
Ok((inner, meta)) | ||
} | ||
|
||
#[cfg(test)] | ||
mod tests { | ||
|
||
use std::collections::BTreeSet; | ||
|
||
use super::*; | ||
use crate::services::processor::ProcessingGroup; | ||
use crate::services::project::ProjectState; | ||
|
||
#[test] | ||
fn transaction_span_metrics_extracted() { | ||
let event = r#" | ||
{ | ||
"type": "transaction", | ||
"timestamp": "2021-04-26T08:00:05+0100", | ||
"start_timestamp": "2021-04-26T08:00:00+0100" | ||
} | ||
"#; | ||
|
||
let mut project_state = ProjectState::allowed(); | ||
project_state | ||
.config | ||
.features | ||
.0 | ||
.insert(Feature::ExtractSpansAndSpanMetricsFromEvent); | ||
|
||
let mut state = ProcessEnvelopeState::simple( | ||
event, | ||
ProcessingGroup::Transaction, | ||
project_state.sanitize(), | ||
); | ||
|
||
extract_from_event(&mut state); | ||
|
||
let metrics = state.extracted_metrics.project_metrics; | ||
assert_eq!( | ||
BTreeSet::from_iter(metrics.iter().map(|m| m.name.as_str())), | ||
BTreeSet::from(["c:spans/usage@none", "c:spans/count_per_op@none"]) | ||
); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -208,6 +208,12 @@ impl ManagedEnvelope { | |||||
envelope | ||||||
} | ||||||
|
||||||
/// Returns an untracked envelope that does not report to outcomes or test store. | ||||||
#[cfg(test)] | ||||||
pub fn silent(envelope: Box<Envelope>, group: ProcessingGroup) -> Self { | ||||||
jjbayer marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
|
||||||
Self::standalone(envelope, Addr::custom().0, Addr::custom().0, group) | ||||||
|
Self::standalone(envelope, Addr::custom().0, Addr::custom().0, group) | |
Self::standalone(envelope, Addr::dummy(), Addr::dummy(), group) |
Uh oh!
There was an error while loading. Please reload this page.