Merged
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -9,7 +9,7 @@
- Implements basic inbound filters for logs. ([#5011](https://github.com/getsentry/relay/pull/5011))
- Always emit a span usage metric, independent of span feature flags. ([#4976](https://github.com/getsentry/relay/pull/4976))
- Improve PII scrubbing for `logentry.formatted` by ensuring only sensitive data is redacted, rather than replacing the entire field value. ([#4985](https://github.com/getsentry/relay/pull/4985))
- Add `downsampled_event_retention` to the project configuration. ([#5013](https://github.com/getsentry/relay/pull/5013))
- Pass `downsampled_event_retention` to `TraceItem` where appropriate. ([#5013](https://github.com/getsentry/relay/pull/5013), [#5041](https://github.com/getsentry/relay/pull/5041))

**Bug Fixes**:

6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -176,7 +176,7 @@ sentry-core = "0.41.0"
sentry-kafka-schemas = { version = "1.3.2", default-features = false }
sentry-release-parser = { version = "1.3.2", default-features = false }
sentry-types = "0.41.0"
sentry_protos = "0.3.0"
sentry_protos = "0.3.3"
serde = { version = "1.0.215", features = ["derive", "rc"] }
serde-transcode = "1.1.1"
serde-vars = "0.2.0"
22 changes: 22 additions & 0 deletions relay-server/src/envelope/mod.rs
@@ -100,6 +100,13 @@ pub struct EnvelopeHeaders<M = RequestMeta> {
#[serde(default, skip_serializing_if = "Option::is_none")]
retention: Option<u16>,

/// Data retention in days for the downsampled version of the items in this envelope.
///
/// This value is always overwritten in processing mode by the value specified in the project
/// configuration.
#[serde(default, skip_serializing_if = "Option::is_none")]
downsampled_retention: Option<u16>,

/// Timestamp when the event has been sent, according to the SDK.
///
/// This can be used to perform drift correction.
@@ -155,6 +162,7 @@ impl EnvelopeHeaders<PartialMeta> {
event_id: self.event_id,
meta: meta.copy_to(request_meta),
retention: self.retention,
downsampled_retention: self.downsampled_retention,
sent_at: self.sent_at,
trace: self.trace,
required_features: self.required_features,
@@ -205,6 +213,7 @@ impl Envelope {
event_id,
meta,
retention: None,
downsampled_retention: None,
sent_at: None,
other: BTreeMap::new(),
trace: None,
@@ -303,6 +312,14 @@ impl Envelope {
self.headers.retention.unwrap_or(DEFAULT_EVENT_RETENTION)
}

/// Returns the data retention in days for the downsampled version of items in this envelope.
#[cfg_attr(not(feature = "processing"), allow(dead_code))]
pub fn downsampled_retention(&self) -> u16 {
self.headers
.downsampled_retention
.unwrap_or(self.retention())
}

/// When the event has been sent, according to the SDK.
pub fn sent_at(&self) -> Option<DateTime<Utc>> {
self.headers.sent_at
@@ -360,6 +377,11 @@ impl Envelope {
self.headers.retention = Some(retention);
}

/// Sets the data retention in days for the downsampled version of items in this envelope.
pub fn set_downsampled_retention(&mut self, retention: u16) {
self.headers.downsampled_retention = Some(retention);
}

/// Runs transaction parametrization on the DSC trace transaction.
///
/// The purpose is for trace rules to match on the parametrized version of the transaction.
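The new accessor only changes behavior when the `downsampled_retention` header is present; otherwise it falls back to the regular retention, which itself falls back to the default. Below is a minimal, self-contained sketch of that fallback chain; the struct is a simplified stand-in rather than the real `EnvelopeHeaders`, and the 90-day default is an assumption that matches the integration-test expectations further down.

```rust
/// Simplified stand-in for the envelope headers; not the actual relay-server type.
/// The 90-day default mirrors `DEFAULT_EVENT_RETENTION` as assumed for this sketch.
const DEFAULT_EVENT_RETENTION: u16 = 90;

#[derive(Default)]
struct Headers {
    retention: Option<u16>,
    downsampled_retention: Option<u16>,
}

impl Headers {
    /// Regular retention, defaulting when the header is unset.
    fn retention(&self) -> u16 {
        self.retention.unwrap_or(DEFAULT_EVENT_RETENTION)
    }

    /// Downsampled retention, falling back to the regular retention when unset.
    fn downsampled_retention(&self) -> u16 {
        self.downsampled_retention.unwrap_or(self.retention())
    }
}

fn main() {
    // Neither header set: both resolve to the default.
    let h = Headers::default();
    assert_eq!((h.retention(), h.downsampled_retention()), (90, 90));

    // Only the regular retention set: the downsampled retention follows it.
    let h = Headers { retention: Some(30), downsampled_retention: None };
    assert_eq!(h.downsampled_retention(), 30);

    // Both set: each value is used as-is.
    let h = Headers { retention: Some(90), downsampled_retention: Some(13) };
    assert_eq!((h.retention(), h.downsampled_retention()), (90, 13));
}
```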
9 changes: 7 additions & 2 deletions relay-server/src/processing/logs/mod.rs
@@ -190,11 +190,13 @@ impl Forward for LogOutput {
let scoping = logs.scoping();
let received_at = logs.received_at();

let (logs, retention) = logs.split_with_context(|logs| (logs.logs, logs.retention));
let (logs, retentions) = logs
.split_with_context(|logs| (logs.logs, (logs.retention, logs.downsampled_retention)));
let ctx = store::Context {
scoping,
received_at,
retention,
retention: retentions.0,
downsampled_retention: retentions.1,
};

for log in logs {
@@ -301,6 +303,9 @@ pub struct ExpandedLogs {
/// Retention in days.
#[cfg(feature = "processing")]
retention: Option<u16>,
/// Downsampled retention in days.
#[cfg(feature = "processing")]
downsampled_retention: Option<u16>,
}

impl Counted for ExpandedLogs {
2 changes: 2 additions & 0 deletions relay-server/src/processing/logs/process.rs
@@ -45,6 +45,8 @@ pub fn expand(logs: Managed<SerializedLogs>, _ctx: Context<'_>) -> Managed<Expan
headers: logs.headers,
#[cfg(feature = "processing")]
retention: _ctx.project_info.config.event_retention,
#[cfg(feature = "processing")]
downsampled_retention: _ctx.project_info.config.downsampled_event_retention,
logs: all_logs,
}
})
7 changes: 6 additions & 1 deletion relay-server/src/processing/logs/store.rs
@@ -40,6 +40,8 @@ pub struct Context {
pub scoping: Scoping,
/// Storage retention in days.
pub retention: Option<u16>,
/// Storage retention in days for downsampled data.
pub downsampled_retention: Option<u16>,
}

pub fn convert(log: WithHeader<OurLog>, ctx: &Context) -> Result<StoreTraceItem> {
@@ -56,13 +58,15 @@ pub fn convert(log: WithHeader<OurLog>, ctx: &Context) -> Result<StoreTraceItem>
body: required!(log.body),
span_id: log.span_id.into_value(),
};
let retention_days = ctx.retention.unwrap_or(DEFAULT_EVENT_RETENTION);

let trace_item = TraceItem {
item_type: TraceItemType::Log.into(),
organization_id: ctx.scoping.organization_id.value(),
project_id: ctx.scoping.project_id.value(),
received: Some(ts(ctx.received_at)),
retention_days: ctx.retention.unwrap_or(DEFAULT_EVENT_RETENTION).into(),
retention_days: retention_days.into(),
downsampled_retention_days: ctx.downsampled_retention.unwrap_or(retention_days).into(),
timestamp: Some(ts(timestamp.0)),
trace_id: required!(log.trace_id).to_string(),
item_id: Uuid::new_v7(timestamp.into()).as_bytes().to_vec(),
@@ -352,6 +356,7 @@ mod tests {
key_id: Some(3),
},
retention: Some(42),
downsampled_retention: Some(42),
}
}

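For reference, here is a hedged sketch of how `convert` resolves the two optional retentions before writing them into the produced trace item. The types below are simplified stand-ins for `store::Context` and the protobuf `TraceItem`, and the 90-day default is an assumption standing in for `DEFAULT_EVENT_RETENTION`.

```rust
/// Simplified stand-ins; only the retention resolution from `convert` is mirrored here.
const DEFAULT_EVENT_RETENTION: u16 = 90;

struct Context {
    retention: Option<u16>,
    downsampled_retention: Option<u16>,
}

struct TraceItemRetention {
    retention_days: i32,
    downsampled_retention_days: i32,
}

fn resolve(ctx: &Context) -> TraceItemRetention {
    // The regular retention falls back to the default first...
    let retention_days = ctx.retention.unwrap_or(DEFAULT_EVENT_RETENTION);
    TraceItemRetention {
        retention_days: retention_days.into(),
        // ...and the downsampled retention falls back to whatever the regular
        // retention resolved to.
        downsampled_retention_days: ctx.downsampled_retention.unwrap_or(retention_days).into(),
    }
}

fn main() {
    // Matches the test fixture above: both retentions set to 42.
    let r = resolve(&Context { retention: Some(42), downsampled_retention: Some(42) });
    assert_eq!((r.retention_days, r.downsampled_retention_days), (42, 42));

    // No downsampled retention in the project config: it inherits the regular value.
    let r = resolve(&Context { retention: Some(30), downsampled_retention: None });
    assert_eq!(r.downsampled_retention_days, 30);
}
```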
8 changes: 8 additions & 0 deletions relay-server/src/services/processor.rs
@@ -2265,6 +2265,14 @@ impl EnvelopeProcessorService {
managed_envelope.envelope_mut().set_retention(retention);
}

// Set the downsampled event retention. Effectively, this value will only be available in
// processing mode when the full project config is queried from the upstream.
if let Some(retention) = project_info.config.downsampled_event_retention {
managed_envelope
.envelope_mut()
.set_downsampled_retention(retention);
}

// Ensure the project ID is updated to the stored instance for this project cache. This can
// differ in two cases:
// 1. The envelope was sent to the legacy `/store/` endpoint without a project ID.
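As the comment above notes, `downsampled_event_retention` is only present in the full project config fetched in processing mode, and in that case it overwrites whatever an upstream Relay put on the envelope. A small sketch of that overwrite step, using simplified placeholder types instead of the real `Envelope` and project config:

```rust
/// Placeholder types; not the actual relay-server `Envelope` or project config.
struct ProjectConfig {
    downsampled_event_retention: Option<u16>,
}

#[derive(Default)]
struct Envelope {
    downsampled_retention: Option<u16>,
}

impl Envelope {
    fn set_downsampled_retention(&mut self, retention: u16) {
        self.downsampled_retention = Some(retention);
    }
}

fn apply_downsampled_retention(envelope: &mut Envelope, config: &ProjectConfig) {
    // Only full (processing-mode) project configs carry the field; otherwise the
    // header is left untouched and readers fall back to the regular retention.
    if let Some(retention) = config.downsampled_event_retention {
        envelope.set_downsampled_retention(retention);
    }
}

fn main() {
    let mut envelope = Envelope::default();
    apply_downsampled_retention(
        &mut envelope,
        &ProjectConfig { downsampled_event_retention: Some(30) },
    );
    assert_eq!(envelope.downsampled_retention, Some(30));

    let mut envelope = Envelope::default();
    apply_downsampled_retention(
        &mut envelope,
        &ProjectConfig { downsampled_event_retention: None },
    );
    assert_eq!(envelope.downsampled_retention, None);
}
```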
22 changes: 17 additions & 5 deletions relay-server/src/services/store.rs
@@ -250,6 +250,7 @@ impl StoreService {
scoping: Scoping,
) -> Result<(), StoreError> {
let retention = envelope.retention();
let downsampled_retention = envelope.downsampled_retention();

let event_id = envelope.event_id();
let event_item = envelope.as_mut().take_item_by(|item| {
@@ -340,9 +341,14 @@ impl StoreService {
let client = envelope.meta().client();
self.produce_check_in(scoping.project_id, received_at, client, retention, item)?
}
ItemType::Span => {
self.produce_span(scoping, received_at, event_id, retention, item)?
}
ItemType::Span => self.produce_span(
scoping,
received_at,
event_id,
retention,
downsampled_retention,
item,
)?,
ty @ ItemType::Log => {
debug_assert!(
false,
@@ -983,6 +989,7 @@ impl StoreService {
received_at: DateTime<Utc>,
event_id: Option<EventId>,
retention_days: u16,
downsampled_retention_days: u16,
item: &Item,
) -> Result<(), StoreError> {
relay_log::trace!("Producing span");
@@ -1021,6 +1028,7 @@ impl StoreService {
span.organization_id = scoping.organization_id.value();
span.project_id = scoping.project_id.value();
span.retention_days = retention_days;
span.downsampled_retention_days = downsampled_retention_days;
span.start_timestamp_ms = (span.start_timestamp_precise * 1e3) as u64;
span.key_id = scoping.key_id;

@@ -1075,7 +1083,7 @@ impl StoreService {
scoping: Scoping,
received_at: DateTime<Utc>,
event_id: Option<EventId>,
retention_days: u16,
_retention_days: u16,
span: SpanKafkaMessage,
) -> Result<(), StoreError> {
let mut trace_item = TraceItem {
@@ -1086,7 +1094,8 @@
seconds: safe_timestamp(received_at) as i64,
nanos: 0,
}),
retention_days: retention_days.into(),
retention_days: span.retention_days.into(),
downsampled_retention_days: span.downsampled_retention_days.into(),
timestamp: Some(Timestamp {
seconds: span.start_timestamp_precise as i64,
nanos: 0,
@@ -1647,6 +1656,9 @@ struct SpanKafkaMessage<'a> {
/// Number of days until this data should be deleted.
#[serde(default)]
retention_days: u16,
/// Number of days until the downsampled version of this data should be deleted.
#[serde(default)]
downsampled_retention_days: u16,
#[serde(default, skip_serializing_if = "Option::is_none")]
segment_id: Option<Cow<'a, str>>,
#[serde(
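The new `downsampled_retention_days` field on `SpanKafkaMessage` is `#[serde(default)]`, so span payloads that do not carry it deserialize to `0`; `produce_span` then overwrites both retention fields with the values resolved from the envelope before the message is produced. A standalone sketch of that sequence, with a simplified struct and assuming `serde` (with the `derive` feature) and `serde_json` are available:

```rust
use serde::Deserialize;

/// Simplified stand-in for the retention fields on `SpanKafkaMessage`.
#[derive(Deserialize)]
struct SpanRetention {
    #[serde(default)]
    retention_days: u16,
    #[serde(default)]
    downsampled_retention_days: u16,
}

fn main() {
    // Incoming span payloads do not carry retention; both fields default to 0.
    let mut span: SpanRetention = serde_json::from_str("{}").unwrap();
    assert_eq!(span.retention_days, 0);
    assert_eq!(span.downsampled_retention_days, 0);

    // The producer then fills in the values resolved from the envelope headers,
    // mirroring the assignments in `produce_span`.
    let (retention_days, downsampled_retention_days) = (90u16, 90u16);
    span.retention_days = retention_days;
    span.downsampled_retention_days = downsampled_retention_days;
    assert_eq!(span.downsampled_retention_days, 90);
}
```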
14 changes: 14 additions & 0 deletions tests/integration/test_spans.py
@@ -134,6 +134,7 @@ def test_span_extraction(
"sentry.transaction.op": "hi",
},
"description": "GET /api/0/organizations/?member=1",
"downsampled_retention_days": 90,
"duration_ms": int(duration.total_seconds() * 1e3),
"event_id": "cbf6960622e14a45abc1f03b2055b186",
"exclusive_time_ms": 500.0,
@@ -201,6 +202,7 @@ def test_span_extraction(
"sentry.transaction.op": "hi",
},
"description": "hi",
"downsampled_retention_days": 90,
"duration_ms": duration_ms,
"event_id": "cbf6960622e14a45abc1f03b2055b186",
"exclusive_time_ms": 1500.0,
@@ -775,6 +777,7 @@ def test_span_ingestion(
"sentry.status": "unknown",
},
"description": "my 1st OTel span",
"downsampled_retention_days": 90,
"duration_ms": 500,
"exclusive_time_ms": 500.0,
"is_segment": True,
@@ -819,6 +822,7 @@ def test_span_ingestion(
"sentry.status": "unknown",
},
"description": "my 1st V2 span",
"downsampled_retention_days": 90,
"duration_ms": 500,
"exclusive_time_ms": 500.0,
"is_segment": True,
@@ -867,6 +871,7 @@ def test_span_ingestion(
"score.total": 0.12121616,
},
"description": "https://example.com/p/blah.js",
"downsampled_retention_days": 90,
"duration_ms": 1500,
"exclusive_time_ms": 345.0,
"is_segment": True,
@@ -921,6 +926,7 @@ def test_span_ingestion(
"sentry.status": "ok",
},
"description": "https://example.com/p/blah.js",
"downsampled_retention_days": 90,
"duration_ms": 1500,
"exclusive_time_ms": 161.0,
"is_segment": True,
@@ -968,6 +974,7 @@ def test_span_ingestion(
"sentry.op": "default",
},
"description": r"test \" with \" escaped \" chars",
"downsampled_retention_days": 90,
"duration_ms": 1500,
"exclusive_time_ms": 345.0,
"is_segment": False,
@@ -996,6 +1003,7 @@ def test_span_ingestion(
"sentry.status": "unknown",
},
"description": "my 2nd OTel span",
"downsampled_retention_days": 90,
"duration_ms": 500,
"exclusive_time_ms": 500.0,
"is_segment": True,
@@ -1038,6 +1046,7 @@ def test_span_ingestion(
"sentry.browser.name": "Chrome",
"sentry.op": "default",
},
"downsampled_retention_days": 90,
"duration_ms": 1500,
"exclusive_time_ms": 345.0,
"is_segment": False,
@@ -1071,6 +1080,7 @@ def test_span_ingestion(
"sentry.status": "unknown",
},
"description": "my 3rd protobuf OTel span",
"downsampled_retention_days": 90,
"duration_ms": 500,
"exclusive_time_ms": 500.0,
"is_segment": False,
@@ -1610,6 +1620,7 @@ def test_span_ingestion_with_performance_scores(
"ttfb": 500.0,
"score.cls": 0.0,
},
"downsampled_retention_days": 90,
"duration_ms": 1500,
"exclusive_time_ms": 345.0,
"is_segment": False,
@@ -1690,6 +1701,7 @@ def test_span_ingestion_with_performance_scores(
"score.total": 0.9948129113413748,
"score.weight.inp": 1.0,
},
"downsampled_retention_days": 90,
"duration_ms": 1500,
"exclusive_time_ms": 345.0,
"is_segment": False,
@@ -2346,6 +2358,7 @@ def test_scrubs_ip_addresses(
"extra_info": "added by user",
},
"description": "GET /api/0/organizations/?member=1",
"downsampled_retention_days": 90,
"duration_ms": int(duration.total_seconds() * 1e3),
"event_id": "cbf6960622e14a45abc1f03b2055b186",
"exclusive_time_ms": 500.0,
@@ -2423,6 +2436,7 @@ def test_scrubs_ip_addresses(
"sentry.user.username": "my_user",
},
"description": "hi",
"downsampled_retention_days": 90,
"duration_ms": duration_ms,
"event_id": "cbf6960622e14a45abc1f03b2055b186",
"exclusive_time_ms": 1500.0,