10 changes: 8 additions & 2 deletions CHANGELOG.md
@@ -1,5 +1,11 @@
# Changelog

## Unreleased

**Internal**:

- Forward logs directly to Kafka instead of serializing them as an envelope. ([#4875](https://github.com/getsentry/relay/pull/4875))

## 25.6.2

**Features**:
@@ -15,9 +21,9 @@
- Update opentelemetry-proto and sentry-protos dependencies. ([#4847](https://github.com/getsentry/relay/pull/4847))
- Take into account more types of tokens when doing AI cost calculation. ([#4840](https://github.com/getsentry/relay/pull/4840))
- Use the `FiniteF64` type for measurements. ([#4828](https://github.com/getsentry/relay/pull/4828))
- Derive a `sentry.description` attribute for V2 spans ([#4832](https://github.com/getsentry/relay/pull/4832))
- Derive a `sentry.description` attribute for V2 spans. ([#4832](https://github.com/getsentry/relay/pull/4832))
- Consider `gen_ai` also as AI span op prefix. ([#4859](https://github.com/getsentry/relay/pull/4859))
- Change pii scrubbing on some AI attributes to optional ([#4860](https://github.com/getsentry/relay/pull/4860))
- Change pii scrubbing on some AI attributes to optional. ([#4860](https://github.com/getsentry/relay/pull/4860))
- Conditionally set `total_cost` and `total_tokens` attributes on AI spans. ([#4868](https://github.com/getsentry/relay/pull/4868))

## 25.6.1
2 changes: 1 addition & 1 deletion relay-event-schema/src/protocol/attributes.rs
@@ -162,7 +162,7 @@ impl IntoValue for AttributeType {

/// Wrapper struct around a collection of attributes with some convenience methods.
#[derive(Debug, Clone, Default, PartialEq, Empty, FromValue, IntoValue, ProcessValue)]
pub struct Attributes(Object<Attribute>);
pub struct Attributes(pub Object<Attribute>);

impl Attributes {
/// Creates an empty collection of attributes.
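The inner field of the `Attributes` newtype is made public so that the new store code can work with the wrapped `Object<Attribute>` directly; `store.rs` below reads `attributes.0.len()` when sizing its output map. A minimal sketch of the kind of access this enables:

```rust
use relay_event_schema::protocol::Attributes;

// Sketch: with the inner map public, callers can size or inspect the
// attribute collection without going through accessor methods.
fn attribute_count(attrs: &Attributes) -> usize {
    attrs.0.len()
}
```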
10 changes: 10 additions & 0 deletions relay-event-schema/src/protocol/types.rs
@@ -817,6 +817,16 @@ impl Timestamp {
}
}

impl From<Timestamp> for uuid::Timestamp {
fn from(value: Timestamp) -> Self {
uuid::Timestamp::from_unix(
uuid::NoContext,
u64::try_from(value.0.timestamp()).unwrap_or(0),
value.0.timestamp_subsec_nanos(),
)
}
}

impl ProcessValue for Timestamp {
#[inline]
fn value_type(&self) -> EnumSet<ValueType> {
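The new `From` impl exists so that `store.rs` (below) can derive time-ordered UUIDv7 item IDs from log timestamps. A self-contained sketch of the round trip, assuming the `uuid` crate with the `v7` feature enabled; note that pre-epoch timestamps clamp to zero via `unwrap_or(0)`:

```rust
use chrono::{DateTime, Utc};
use uuid::{NoContext, Timestamp, Uuid};

// Sketch: build a time-ordered UUIDv7 from a chrono timestamp, mirroring
// the conversion above plus the call site in `store::convert`.
fn item_id(dt: DateTime<Utc>) -> Uuid {
    let ts = Timestamp::from_unix(
        NoContext,
        u64::try_from(dt.timestamp()).unwrap_or(0), // pre-1970 clamps to 0
        dt.timestamp_subsec_nanos(),
    );
    Uuid::new_v7(ts)
}

fn main() {
    let id = item_id(Utc::now());
    assert_eq!(id.get_version_num(), 7);
}
```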
6 changes: 6 additions & 0 deletions relay-server/src/envelope/container.rs
@@ -179,6 +179,12 @@ impl<T> From<ContainerItems<T>> for ItemContainer<T> {
}
}

impl<T> From<Vec<Annotated<T>>> for ItemContainer<T> {
fn from(items: Vec<Annotated<T>>) -> Self {
ContainerItems::from_vec(items).into()
}
}

impl ContainerItem for relay_event_schema::protocol::OurLog {
const ITEM_TYPE: ItemType = ItemType::Log;
const CONTENT_TYPE: ContentType = ContentType::LogContainer;
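This blanket conversion supports the switch in `logs/mod.rs` (below) from `ContainerItems<OurLog>` to a plain `Vec<Annotated<OurLog>>`: processing code can keep logs in a `Vec` and still build an `ItemContainer` in one step. A sketch of the intended usage:

```rust
use relay_event_schema::protocol::OurLog;
use relay_protocol::Annotated;

use crate::envelope::ItemContainer;

// Sketch: collect processed logs in a plain Vec, then convert them into
// a container when serializing back into an envelope item.
fn into_container(logs: Vec<Annotated<OurLog>>) -> ItemContainer<OurLog> {
    logs.into()
}
```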
68 changes: 53 additions & 15 deletions relay-server/src/processing/logs/mod.rs
@@ -3,12 +3,11 @@ use std::sync::Arc;
use relay_event_schema::processor::ProcessingAction;
use relay_event_schema::protocol::OurLog;
use relay_pii::PiiConfigError;
use relay_protocol::Annotated;
use relay_quotas::{DataCategory, RateLimits};

use crate::Envelope;
use crate::envelope::{
ContainerItems, ContainerWriteError, EnvelopeHeaders, Item, ItemContainer, ItemType, Items,
};
use crate::envelope::{ContainerWriteError, EnvelopeHeaders, Item, ItemContainer, ItemType, Items};
use crate::processing::{
self, Context, Counted, Forward, Managed, ManagedResult as _, OutcomeError, Output, Quantities,
QuotaRateLimiter, RateLimited, RateLimiter, Rejected,
@@ -18,6 +17,8 @@ use crate::utils::ManagedEnvelope;

mod filter;
mod process;
#[cfg(feature = "processing")]
mod store;
mod validate;

/// Temporary byte count of a single log.
@@ -100,10 +101,12 @@ impl processing::Processor for LogsProcessor {

let otel_logs = envelope
.envelope_mut()
.take_items_by(|item| matches!(*item.ty(), ItemType::OtelLog));
.take_items_by(|item| matches!(*item.ty(), ItemType::OtelLog))
.into_vec();
let logs = envelope
.envelope_mut()
.take_items_by(|item| matches!(*item.ty(), ItemType::Log));
.take_items_by(|item| matches!(*item.ty(), ItemType::Log))
.into_vec();

let work = SerializedLogs {
headers,
@@ -126,7 +129,7 @@ impl processing::Processor for LogsProcessor {
self.limiter.enforce_quotas(&mut logs, ctx).await?;

if ctx.is_processing() {
let mut logs = process::expand(logs);
let mut logs = process::expand(logs, ctx);
process::process(&mut logs, ctx);

Ok(Output::just(LogOutput::Processed(logs)))
@@ -137,10 +140,7 @@
}

/// Output produced by [`LogsProcessor`].
#[expect(
clippy::large_enum_variant,
reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
)]
#[derive(Debug)]
pub enum LogOutput {
NotProcessed(Managed<SerializedLogs>),
Processed(Managed<ExpandedLogs>),
@@ -159,27 +159,61 @@ impl Forward for LogOutput {

Ok(logs.map(|logs, _| logs.serialize_envelope()))
}

#[cfg(feature = "processing")]
fn forward_store(
self,
s: &relay_system::Addr<crate::services::store::Store>,
) -> Result<(), Rejected<()>> {
let logs = match self {
LogOutput::NotProcessed(logs) => {
return Err(logs.internal_error(
"logs must be processed before they can be forwarded to the store",
));
}
LogOutput::Processed(logs) => logs,
};

let scoping = logs.scoping();
let received_at = logs.received_at();

let (logs, retention) = logs.split_with_context(|logs| (logs.logs, logs.retention));
let ctx = store::Context {
scoping,
received_at,
retention,
};

for log in logs {
if let Ok(log) = log.try_map(|log, _| store::convert(log, &ctx)) {
s.send(log)
};
}

Ok(())
}
}

/// Logs in their serialized state, as transported in an envelope.
#[derive(Debug)]
pub struct SerializedLogs {
/// Original envelope headers.
headers: EnvelopeHeaders,

/// OTel logs are not sent in containers, so an envelope is very likely to contain multiple OTel log items.
otel_logs: Items,
otel_logs: Vec<Item>,
/// Logs are sent in item containers, with a specified limit of a single container per
/// envelope.
///
/// At this point, however, this has not yet been validated.
logs: Items,
logs: Vec<Item>,
}

impl SerializedLogs {
fn serialize_envelope(self) -> Box<Envelope> {
let mut items = self.logs;
items.extend(self.otel_logs);
Envelope::from_parts(self.headers, items)
Envelope::from_parts(self.headers, Items::from_vec(items))
}

fn items(&self) -> impl Iterator<Item = &Item> {
@@ -241,11 +275,15 @@ impl RateLimited for Managed<SerializedLogs> {
}

/// Logs which have been parsed and expanded from their serialized state.
#[derive(Debug)]
pub struct ExpandedLogs {
/// Original envelope headers.
headers: EnvelopeHeaders,
/// Retention in days.
#[cfg(feature = "processing")]
retention: Option<u16>,
/// Expanded and parsed logs.
logs: ContainerItems<OurLog>,
logs: Vec<Annotated<OurLog>>,
}

impl Counted for ExpandedLogs {
@@ -268,7 +306,7 @@ impl ExpandedLogs {
Ok(SerializedLogs {
headers: self.headers,
otel_logs: Default::default(),
logs: smallvec::smallvec![item],
logs: vec![item],
})
}
}
7 changes: 4 additions & 3 deletions relay-server/src/processing/logs/process.rs
@@ -7,18 +7,17 @@ use relay_event_schema::protocol::{AttributeType, BrowserContext, OurLog};
use relay_ourlogs::OtelLog;
use relay_pii::PiiProcessor;
use relay_protocol::{Annotated, ErrorKind, Value};
use smallvec::SmallVec;

use crate::envelope::{ContainerItems, Item, ItemContainer};
use crate::extractors::RequestMeta;
use crate::processing::logs::{Error, ExpandedLogs, Result, SerializedLogs};
use crate::processing::{Context, Managed};
use crate::services::outcome::DiscardReason;

pub fn expand(logs: Managed<SerializedLogs>) -> Managed<ExpandedLogs> {
pub fn expand(logs: Managed<SerializedLogs>, _ctx: Context<'_>) -> Managed<ExpandedLogs> {
let received_at = logs.received_at();
logs.map(|logs, records| {
let mut all_logs = SmallVec::with_capacity(logs.count());
let mut all_logs = Vec::with_capacity(logs.count());

for logs in logs.logs {
let expanded = expand_log_container(&logs, received_at);
@@ -38,6 +37,8 @@ pub fn expand(logs: Managed<SerializedLogs>) -> Managed<ExpandedLogs> {

ExpandedLogs {
headers: logs.headers,
#[cfg(feature = "processing")]
retention: _ctx.project_info.config.event_retention,
logs: all_logs,
}
})
110 changes: 110 additions & 0 deletions relay-server/src/processing/logs/store.rs
@@ -0,0 +1,110 @@
use std::collections::HashMap;

use chrono::{DateTime, Utc};
use prost_types::Timestamp;
use relay_event_schema::protocol::{Attributes, OurLog};
use relay_protocol::{Annotated, Value};
use relay_quotas::Scoping;
use sentry_protos::snuba::v1::{AnyValue, TraceItem, TraceItemType, any_value};
use uuid::Uuid;

use crate::constants::DEFAULT_EVENT_RETENTION;
use crate::processing::Counted;
use crate::processing::logs::{Error, Result};
use crate::services::outcome::DiscardReason;
use crate::services::store::StoreLog;

macro_rules! required {
($value:expr) => {{
match $value {
Annotated(Some(value), _) => value,
Annotated(None, meta) => {
relay_log::debug!(
"dropping log because of missing required field {} with meta {meta:?}",
stringify!($value),
);
return Err(Error::Invalid(DiscardReason::InvalidLog));
}
}
}};
}
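For illustration, a hypothetical call site showing how the macro reads in practice; `convert` below uses it the same way for `timestamp` and `trace_id`:

```rust
use relay_event_schema::protocol::OurLog;
use relay_protocol::Annotated;

use crate::processing::logs::Result;

// Hypothetical helper (not part of the diff): extract required fields
// or reject the whole log with an `InvalidLog` discard reason.
fn trace_id(log: Annotated<OurLog>) -> Result<String> {
    let log = required!(log);
    Ok(required!(log.trace_id).to_string())
}
```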

/// Context parameters for [`convert`].
#[derive(Debug, Clone, Copy)]
pub struct Context {
/// Received time.
pub received_at: DateTime<Utc>,
/// Item scoping.
pub scoping: Scoping,
/// Storage retention in days.
pub retention: Option<u16>,
}

pub fn convert(log: Annotated<OurLog>, ctx: &Context) -> Result<StoreLog> {
let quantities = log.quantities();

let log = required!(log);
let timestamp = required!(log.timestamp);
let attrs = log.attributes.0.unwrap_or_default();

let trace_item = TraceItem {
item_type: TraceItemType::Log.into(),
organization_id: ctx.scoping.organization_id.value(),
project_id: ctx.scoping.project_id.value(),
received: Some(ts(ctx.received_at)),
retention_days: ctx.retention.unwrap_or(DEFAULT_EVENT_RETENTION).into(),
timestamp: Some(ts(timestamp.0)),
trace_id: required!(log.trace_id).to_string(),
item_id: Uuid::new_v7(timestamp.into()).as_bytes().to_vec(),
attributes: attributes(attrs),
client_sample_rate: 1.0,
server_sample_rate: 1.0,
};

Ok(StoreLog {
trace_item,
quantities,
})
}

fn ts(dt: DateTime<Utc>) -> Timestamp {
Timestamp {
seconds: dt.timestamp(),
nanos: i32::try_from(dt.timestamp_subsec_nanos()).unwrap_or(0),
}
}

fn attributes(attributes: Attributes) -> HashMap<String, AnyValue> {
let mut result = HashMap::with_capacity(attributes.0.len());

for (name, attribute) in attributes {
let value = attribute
.into_value()
.and_then(|v| v.value.value.into_value());

let Some(value) = value else {
// Emit `_meta` attributes here with #4804.
continue;
};

let Some(value) = (match value {
Value::Bool(v) => Some(any_value::Value::BoolValue(v)),
Value::I64(v) => Some(any_value::Value::IntValue(v)),
Value::U64(v) => i64::try_from(v).ok().map(any_value::Value::IntValue),
Value::F64(v) => Some(any_value::Value::DoubleValue(v)),
Value::String(v) => Some(any_value::Value::StringValue(v)),
// These cases do not happen, as arrays and objects are not valid
// attribute values and should have been filtered out earlier.
Value::Array(_) | Value::Object(_) => {
debug_assert!(false, "unsupported log value");
None
}
}) else {
continue;
};

result.insert(name, AnyValue { value: Some(value) });
}

result
}
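One design choice worth noting in `attributes`: `u64` values that do not fit into `i64` are dropped via `try_from(...).ok()`, not clamped to `i64::MAX`. A minimal sketch isolating that mapping:

```rust
use sentry_protos::snuba::v1::any_value;

// Sketch: out-of-range u64 attribute values yield None and are skipped,
// matching the `i64::try_from(v).ok()` arm above.
fn map_u64(v: u64) -> Option<any_value::Value> {
    i64::try_from(v).ok().map(any_value::Value::IntValue)
}
```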