Skip to content

Commit 96bf3f2

Browse files
committed
ref(server): Forward logs without extra serialization to Store
1 parent d5b8889 commit 96bf3f2

File tree

10 files changed

+593
-251
lines changed

10 files changed

+593
-251
lines changed

CHANGELOG.md

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,10 @@
1414
- Update opentelemetry-proto and sentry-protos dependencies. ([#4847](https://github.com/getsentry/relay/pull/4847))
1515
- Take into account more types of tokens when doing AI cost calculation. ([#4840](https://github.com/getsentry/relay/pull/4840))
1616
- Use the `FiniteF64` type for measurements. ([#4828](https://github.com/getsentry/relay/pull/4828))
17-
- Derive a `sentry.description` attribute for V2 spans ([#4832](https://github.com/getsentry/relay/pull/4832))
17+
- Derive a `sentry.description` attribute for V2 spans. ([#4832](https://github.com/getsentry/relay/pull/4832))
18+
- Forward logs to Kafka directly instead of serialized as envelope. ([#4875](https://github.com/getsentry/relay/pull/4875))
1819
- Consider `gen_ai` also as AI span op prefix. ([#4859](https://github.com/getsentry/relay/pull/4859))
19-
- Change pii scrubbing on some AI attributes to optional ([#4860](https://github.com/getsentry/relay/pull/4860))
20+
- Change pii scrubbing on some AI attributes to optional. ([#4860](https://github.com/getsentry/relay/pull/4860))
2021
- Conditionally set `total_cost` and `total_tokens` attributes on AI spans. ([#4868](https://github.com/getsentry/relay/pull/4868))
2122

2223
## 25.6.1

relay-event-schema/src/protocol/attributes.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,7 @@ impl IntoValue for AttributeType {
162162

163163
/// Wrapper struct around a collection of attributes with some convenience methods.
164164
#[derive(Debug, Clone, Default, PartialEq, Empty, FromValue, IntoValue, ProcessValue)]
165-
pub struct Attributes(Object<Attribute>);
165+
pub struct Attributes(pub Object<Attribute>);
166166

167167
impl Attributes {
168168
/// Creates an empty collection of attributes.

relay-event-schema/src/protocol/types.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -817,6 +817,16 @@ impl Timestamp {
817817
}
818818
}
819819

820+
impl From<Timestamp> for uuid::Timestamp {
821+
fn from(value: Timestamp) -> Self {
822+
uuid::Timestamp::from_unix(
823+
uuid::NoContext,
824+
u64::try_from(value.0.timestamp()).unwrap_or(0),
825+
value.0.timestamp_subsec_nanos(),
826+
)
827+
}
828+
}
829+
820830
impl ProcessValue for Timestamp {
821831
#[inline]
822832
fn value_type(&self) -> EnumSet<ValueType> {

relay-server/src/processing/logs/mod.rs

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@ use crate::utils::ManagedEnvelope;
1818

1919
mod filter;
2020
mod process;
21+
mod store;
2122
mod validate;
2223

2324
/// Temporary byte count of a single log.
@@ -126,7 +127,7 @@ impl processing::Processor for LogsProcessor {
126127
self.limiter.enforce_quotas(&mut logs, ctx).await?;
127128

128129
if ctx.is_processing() {
129-
let mut logs = process::expand(logs);
130+
let mut logs = process::expand(logs, ctx);
130131
process::process(&mut logs, ctx);
131132

132133
Ok(Output::just(LogOutput::Processed(logs)))
@@ -141,6 +142,7 @@ impl processing::Processor for LogsProcessor {
141142
clippy::large_enum_variant,
142143
reason = "until we have a better solution to combat the excessive growth of Item, see #4819"
143144
)]
145+
#[derive(Debug)]
144146
pub enum LogOutput {
145147
NotProcessed(Managed<SerializedLogs>),
146148
Processed(Managed<ExpandedLogs>),
@@ -159,9 +161,43 @@ impl Forward for LogOutput {
159161

160162
Ok(logs.map(|logs, _| logs.serialize_envelope()))
161163
}
164+
165+
#[cfg(feature = "processing")]
166+
fn forward_store(
167+
self,
168+
s: &relay_system::Addr<crate::services::store::Store>,
169+
) -> Result<(), Rejected<()>> {
170+
let logs = match self {
171+
LogOutput::NotProcessed(logs) => {
172+
return Err(logs.internal_error(
173+
"logs must be processed before they can be forwarded to the store",
174+
));
175+
}
176+
LogOutput::Processed(logs) => logs,
177+
};
178+
179+
let scoping = logs.scoping();
180+
let received_at = logs.received_at();
181+
182+
let (logs, retention) = logs.split_with_context(|logs| (logs.logs, logs.retention));
183+
let ctx = store::Context {
184+
scoping,
185+
received_at,
186+
retention,
187+
};
188+
189+
for log in logs {
190+
if let Ok(log) = log.try_map(|log, _| store::convert(log, &ctx)) {
191+
s.send(log)
192+
};
193+
}
194+
195+
Ok(())
196+
}
162197
}
163198

164199
/// Logs in their serialized state, as transported in an envelope.
200+
#[derive(Debug)]
165201
pub struct SerializedLogs {
166202
/// Original envelope headers.
167203
headers: EnvelopeHeaders,
@@ -241,9 +277,12 @@ impl RateLimited for Managed<SerializedLogs> {
241277
}
242278

243279
/// Logs which have been parsed and expanded from their serialized state.
280+
#[derive(Debug)]
244281
pub struct ExpandedLogs {
245282
/// Original envelope headers.
246283
headers: EnvelopeHeaders,
284+
/// Retention in days.
285+
retention: Option<u16>,
247286
/// Expanded and parsed logs.
248287
logs: ContainerItems<OurLog>,
249288
}

relay-server/src/processing/logs/process.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use crate::processing::logs::{Error, ExpandedLogs, Result, SerializedLogs};
1515
use crate::processing::{Context, Managed};
1616
use crate::services::outcome::DiscardReason;
1717

18-
pub fn expand(logs: Managed<SerializedLogs>) -> Managed<ExpandedLogs> {
18+
pub fn expand(logs: Managed<SerializedLogs>, ctx: Context<'_>) -> Managed<ExpandedLogs> {
1919
let received_at = logs.received_at();
2020
logs.map(|logs, records| {
2121
let mut all_logs = SmallVec::with_capacity(logs.count());
@@ -38,6 +38,7 @@ pub fn expand(logs: Managed<SerializedLogs>) -> Managed<ExpandedLogs> {
3838

3939
ExpandedLogs {
4040
headers: logs.headers,
41+
retention: ctx.project_info.config.event_retention,
4142
logs: all_logs,
4243
}
4344
})
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
use std::collections::HashMap;
2+
3+
use chrono::{DateTime, Utc};
4+
use prost_types::Timestamp;
5+
use relay_event_schema::protocol::{Attributes, OurLog};
6+
use relay_protocol::{Annotated, Value};
7+
use relay_quotas::Scoping;
8+
use sentry_protos::snuba::v1::{AnyValue, TraceItem, TraceItemType, any_value};
9+
use uuid::Uuid;
10+
11+
use crate::constants::DEFAULT_EVENT_RETENTION;
12+
use crate::processing::Counted;
13+
use crate::processing::logs::{Error, Result};
14+
use crate::services::outcome::DiscardReason;
15+
use crate::services::store::StoreLog;
16+
17+
macro_rules! required {
18+
($value:expr) => {{
19+
match $value {
20+
Annotated(Some(value), _) => value,
21+
Annotated(None, meta) => {
22+
relay_log::debug!(
23+
"dropping log because of missing required field {} with meta {meta:?}",
24+
stringify!($value),
25+
);
26+
return Err(Error::Invalid(DiscardReason::InvalidLog));
27+
}
28+
}
29+
}};
30+
}
31+
32+
/// Context parameters for [`convert`].
33+
#[derive(Debug, Clone, Copy)]
34+
pub struct Context {
35+
/// Received time.
36+
pub received_at: DateTime<Utc>,
37+
/// Item scoping.
38+
pub scoping: Scoping,
39+
/// Storage retention in days.
40+
pub retention: Option<u16>,
41+
}
42+
43+
pub fn convert(log: Annotated<OurLog>, ctx: &Context) -> Result<StoreLog> {
44+
let quantities = log.quantities();
45+
46+
let log = required!(log);
47+
let timestamp = required!(log.timestamp);
48+
let attrs = log.attributes.0.unwrap_or_default();
49+
50+
let trace_item = TraceItem {
51+
item_type: TraceItemType::Log.into(),
52+
organization_id: ctx.scoping.organization_id.value(),
53+
project_id: ctx.scoping.project_id.value(),
54+
received: Some(ts(ctx.received_at)),
55+
retention_days: ctx.retention.unwrap_or(DEFAULT_EVENT_RETENTION).into(),
56+
timestamp: Some(ts(timestamp.0)),
57+
trace_id: required!(log.trace_id).to_string(),
58+
item_id: Uuid::new_v7(timestamp.into()).as_bytes().to_vec(),
59+
attributes: attributes(attrs),
60+
client_sample_rate: 1.0,
61+
server_sample_rate: 1.0,
62+
};
63+
64+
Ok(StoreLog {
65+
trace_item,
66+
quantities,
67+
})
68+
}
69+
70+
fn ts(dt: DateTime<Utc>) -> Timestamp {
71+
Timestamp {
72+
seconds: dt.timestamp(),
73+
nanos: i32::try_from(dt.timestamp_subsec_nanos()).unwrap_or(0),
74+
}
75+
}
76+
77+
fn attributes(attributes: Attributes) -> HashMap<String, AnyValue> {
78+
let mut result = HashMap::with_capacity(attributes.0.len());
79+
80+
for (name, attribute) in attributes {
81+
let value = attribute
82+
.into_value()
83+
.and_then(|v| v.value.value.into_value());
84+
85+
let Some(value) = value else {
86+
// Emit `_meta` attributes here with #4804.
87+
continue;
88+
};
89+
90+
let Some(value) = (match value {
91+
Value::Bool(v) => Some(any_value::Value::BoolValue(v)),
92+
Value::I64(v) => Some(any_value::Value::IntValue(v)),
93+
Value::U64(v) => i64::try_from(v).ok().map(any_value::Value::IntValue),
94+
Value::F64(v) => Some(any_value::Value::DoubleValue(v)),
95+
Value::String(v) => Some(any_value::Value::StringValue(v)),
96+
// These cases do not happen, as they are not valid attributes
97+
// and they should have been filtered out before already.
98+
Value::Array(_) | Value::Object(_) => {
99+
debug_assert!(false, "unsupported log value");
100+
None
101+
}
102+
}) else {
103+
continue;
104+
};
105+
106+
result.insert(name, AnyValue { value: Some(value) });
107+
}
108+
109+
result
110+
}

0 commit comments

Comments
 (0)