7 changes: 5 additions & 2 deletions relay-server/src/endpoints/attachments.rs
@@ -2,6 +2,7 @@ use axum::extract::Path;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use multer::Multipart;
use relay_config::Config;
use relay_event_schema::protocol::EventId;
use serde::Deserialize;

@@ -20,8 +21,10 @@ async fn extract_envelope(
meta: RequestMeta,
path: AttachmentPath,
multipart: Multipart<'static>,
config: &Config,
) -> Result<Box<Envelope>, BadStoreRequest> {
let items = utils::multipart_items(multipart, |_, _| AttachmentType::default()).await?;
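// `false`: an oversized attachment field fails the entire request.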
let items =
utils::multipart_items(multipart, |_, _| AttachmentType::default(), config, false).await?;

let mut envelope = Envelope::from_request(Some(path.event_id), meta);
for item in items {
@@ -37,7 +40,7 @@ pub async fn handle(
Path(path): Path<AttachmentPath>,
Remote(multipart): Remote<Multipart<'static>>,
) -> Result<impl IntoResponse, BadStoreRequest> {
let envelope = extract_envelope(meta, path, multipart).await?;
let envelope = extract_envelope(meta, path, multipart, state.config()).await?;
common::handle_envelope(&state, envelope).await?;
Ok(StatusCode::CREATED)
}
7 changes: 4 additions & 3 deletions relay-server/src/endpoints/minidump.rs
@@ -166,8 +166,9 @@ async fn extract_embedded_minidump(payload: Bytes) -> Result<Option<Bytes>, BadStoreRequest> {
async fn extract_multipart(
multipart: Multipart<'static>,
meta: RequestMeta,
config: &Config,
) -> Result<Box<Envelope>, BadStoreRequest> {
let mut items = utils::multipart_items(multipart, infer_attachment_type).await?;
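// `false`: an oversized attachment field fails the entire request.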
let mut items = utils::multipart_items(multipart, infer_attachment_type, config, false).await?;

let minidump_item = items
.iter_mut()
@@ -226,7 +227,7 @@ async fn handle(
extract_raw_minidump(request.extract().await?, meta)?
} else {
let Remote(multipart) = request.extract_with_state(&state).await?;
extract_multipart(multipart, meta).await?
extract_multipart(multipart, meta, state.config()).await?
};

let id = envelope.event_id();
@@ -395,7 +396,7 @@ mod tests {
let config = Config::default();

let multipart = utils::multipart_from_request(request, &config)?;
let items = multipart_items(multipart, infer_attachment_type).await?;
let items = multipart_items(multipart, infer_attachment_type, &config, false).await?;

// we expect the multipart body to contain
// * one arbitrary attachment from the user (a `config.json`)
7 changes: 4 additions & 3 deletions relay-server/src/endpoints/playstation.rs
@@ -43,8 +43,9 @@ fn infer_attachment_type(_field_name: Option<&str>, file_name: &str) -> AttachmentType {
async fn extract_multipart(
multipart: Multipart<'static>,
meta: RequestMeta,
config: &Config,
) -> Result<Box<Envelope>, BadStoreRequest> {
let mut items = utils::multipart_items(multipart, infer_attachment_type).await?;
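// `true`: oversized fields (e.g. a large crash video) are dropped instead
// of failing the whole request.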
let mut items = utils::multipart_items(multipart, infer_attachment_type, config, true).await?;

let prosperodump_item = items
.iter_mut()
@@ -73,7 +74,7 @@ async fn handle(
) -> axum::response::Result<impl IntoResponse> {
// The crash dumps are transmitted as `...` in a multipart/form-data request.
let Remote(multipart) = request.extract_with_state(&state).await?;
let mut envelope = extract_multipart(multipart, meta).await?;
let mut envelope = extract_multipart(multipart, meta, state.config()).await?;
envelope.require_feature(Feature::PlaystationIngestion);

let id = envelope.event_id();
@@ -89,5 +90,5 @@
}

pub fn route(config: &Config) -> MethodRouter<ServiceState> {
post(handle).route_layer(DefaultBodyLimit::max(config.max_attachment_size()))
post(handle).route_layer(DefaultBodyLimit::max(config.max_attachments_size()))
}
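Aside: the route's body limit now uses `max_attachments_size` (the cap for the whole multipart request) rather than `max_attachment_size` (the per-field cap), since per-field limits are enforced by `LimitedField` in `multipart.rs` below. A minimal sketch of how the two limits relate, reusing the `Config::from_json_value` helper from the tests; the values are illustrative, not Relay's defaults:

use relay_config::Config;

// Illustrative limits, not the shipped defaults.
let config = Config::from_json_value(serde_json::json!({
    "limits": {
        "max_attachment_size": 1048576,    // cap on a single multipart field
        "max_attachments_size": 10485760   // cap on the whole multipart stream
    }
}))?;
assert!(config.max_attachment_size() <= config.max_attachments_size());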
174 changes: 165 additions & 9 deletions relay-server/src/utils/multipart.rs
@@ -1,7 +1,10 @@
use std::io;
use std::task::Poll;

use axum::extract::Request;
use multer::Multipart;
use bytes::{Bytes, BytesMut};
use futures::{StreamExt, TryStreamExt};
use multer::{Field, Multipart};
use relay_config::Config;
use serde::{Deserialize, Serialize};

@@ -152,6 +155,8 @@ pub fn get_multipart_boundary(data: &[u8]) -> Option<&str> {
pub async fn multipart_items<F>(
mut multipart: Multipart<'_>,
mut infer_type: F,
config: &Config,
ignore_large_fields: bool,
) -> Result<Items, multer::Error>
where
F: FnMut(Option<&str>, &str) -> AttachmentType,
@@ -164,12 +169,21 @@ where
let mut item = Item::new(ItemType::Attachment);
item.set_attachment_type(infer_type(field.name(), file_name));
item.set_filename(file_name);
// Extract the body after the immutable borrow on `file_name` is gone.
if let Some(content_type) = field.content_type() {
item.set_payload(content_type.as_ref().into(), field.bytes().await?);
} else {
item.set_payload_without_content_type(field.bytes().await?);

let content_type = field.content_type().cloned();
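// Enforce the per-field limit via `LimitedField`, which drains the rest of
// an oversized field instead of aborting the whole multipart stream.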
let field = LimitedField::new(field, config.max_attachment_size());
match field.bytes().await {
Err(multer::Error::FieldSizeExceeded { .. }) if ignore_large_fields => continue,
Err(err) => return Err(err),
Ok(bytes) => {
if let Some(content_type) = content_type {
item.set_payload(content_type.as_ref().into(), bytes);
} else {
item.set_payload_without_content_type(bytes);
}
}
}

items.push(item);
} else if let Some(field_name) = field.name().map(str::to_owned) {
// Ensure to decode this SAFELY to match Django's POST data behavior. This allows us to
@@ -193,6 +207,76 @@ where
Ok(items)
}

/// Wrapper around `multer::Field` which consumes the entire underlying stream even when the
/// size limit is exceeded.
///
/// This allows the remaining fields of a multipart form to be processed even if one field is too large.
struct LimitedField<'a> {
field: Field<'a>,
consumed_size: usize,
size_limit: usize,
inner_finished: bool,
}

impl<'a> LimitedField<'a> {
fn new(field: Field<'a>, limit: usize) -> Self {
LimitedField {
field,
consumed_size: 0,
size_limit: limit,
inner_finished: false,
}
}

async fn bytes(self) -> Result<Bytes, multer::Error> {
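// Collect the field into a single buffer; a `FieldSizeExceeded` error
// emitted by `poll_next` below surfaces to the caller here.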
self.try_fold(BytesMut::new(), |mut acc, x| async move {
acc.extend_from_slice(&x);
Ok(acc)
})
.await
.map(|x| x.freeze())
}
}

impl futures::Stream for LimitedField<'_> {
type Item = Result<Bytes, multer::Error>;

fn poll_next(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<Self::Item>> {
if self.inner_finished {
return Poll::Ready(None);
}

match self.field.poll_next_unpin(cx) {
err @ Poll::Ready(Some(Err(_))) => err,
Poll::Ready(Some(Ok(t))) => {
self.consumed_size += t.len();
match self.consumed_size <= self.size_limit {
true => Poll::Ready(Some(Ok(t))),
false => {
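// Over the limit: discard the chunk but keep draining the underlying
// field, waking immediately so this stream is polled again.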
cx.waker().wake_by_ref();
Poll::Pending
}
}
}
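// The inner field is exhausted but exceeded the limit: emit the size
// error exactly once, then terminate the stream.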
Poll::Ready(None) if self.consumed_size > self.size_limit => {
self.inner_finished = true;
Poll::Ready(Some(Err(multer::Error::FieldSizeExceeded {
limit: self.size_limit as u64,
field_name: self.field.name().map(Into::into),
})))
}
Poll::Ready(None) => {
self.inner_finished = true;
Poll::Ready(None)
}
Poll::Pending => Poll::Pending,
}
}
}

pub fn multipart_from_request(
request: Request,
config: &Config,
@@ -204,9 +288,8 @@
.unwrap_or("");
let boundary = multer::parse_boundary(content_type)?;

let limits = multer::SizeLimit::new()
.whole_stream(config.max_attachments_size() as u64)
.per_field(config.max_attachment_size() as u64);
// Only enforce the stream limit here as the `per_field` limit is enforced by `LimitedField`.
let limits = multer::SizeLimit::new().whole_stream(config.max_attachments_size() as u64);

Ok(Multipart::with_constraints(
request.into_body().into_data_stream(),
@@ -287,4 +370,77 @@

Ok(())
}

#[tokio::test]
async fn test_individual_size_limit_exceeded() -> anyhow::Result<()> {
let data = "--X-BOUNDARY\r\n\
Content-Disposition: form-data; name=\"file\"; filename=\"large.txt\"\r\n\
Content-Type: text/plain\r\n\
\r\n\
content too large for limit\r\n\
--X-BOUNDARY\r\n\
Content-Disposition: form-data; name=\"small_file\"; filename=\"small.txt\"\r\n\
Content-Type: text/plain\r\n\
\r\n\
ok\r\n\
--X-BOUNDARY--\r\n";

let stream = futures::stream::once(async { Ok::<_, Infallible>(data) });
let multipart = Multipart::new(stream, "X-BOUNDARY");

let config = Config::from_json_value(serde_json::json!({
"limits": {
"max_attachment_size": 5
}
}))?;

let items =
multipart_items(multipart, |_, _| AttachmentType::Attachment, &config, true).await?;

// The large field is skipped so only the small one should make it through.
assert_eq!(items.len(), 1);
let item = &items[0];
assert_eq!(item.filename(), Some("small.txt"));
assert_eq!(item.payload(), Bytes::from("ok"));

Ok(())
}

#[tokio::test]
async fn test_collective_size_limit_exceeded() -> anyhow::Result<()> {
let data = "--X-BOUNDARY\r\n\
Content-Disposition: form-data; name=\"file\"; filename=\"large.txt\"\r\n\
Content-Type: text/plain\r\n\
\r\n\
content too large for limit\r\n\
--X-BOUNDARY\r\n\
Content-Disposition: form-data; name=\"small_file\"; filename=\"small.txt\"\r\n\
Content-Type: text/plain\r\n\
\r\n\
ok\r\n\
--X-BOUNDARY--\r\n";

let stream = futures::stream::once(async { Ok::<_, Infallible>(data) });

let config = Config::from_json_value(serde_json::json!({
"limits": {
"max_attachments_size": 5
}
}))?;
let limits = multer::SizeLimit::new().whole_stream(config.max_attachments_size() as u64);

let multipart = Multipart::with_constraints(
stream,
"X-BOUNDARY",
multer::Constraints::new().size_limit(limits),
);

let result =
multipart_items(multipart, |_, _| AttachmentType::Attachment, &config, true).await;

// The caller should be warned when the overall stream limit is breached.
assert!(result.is_err_and(|x| matches!(x, multer::Error::StreamSizeExceeded { limit: _ })));

Ok(())
}
}
2 changes: 1 addition & 1 deletion tests/integration/conftest.py
@@ -23,7 +23,7 @@
get_topic_name,
processing_config,
relay_with_processing,
relay_with_playstation,
relay_processing_with_playstation,
consumer_fixture,
events_consumer,
outcomes_consumer,
30 changes: 22 additions & 8 deletions tests/integration/fixtures/__init__.py
@@ -423,23 +423,37 @@ def send_unreal_request(self, project_id, file_content, dsn_key_idx=0):
response.raise_for_status()
return response

def send_playstation_request(self, project_id, file_content, dsn_key_idx=0):
def send_playstation_request(
self,
project_id,
crash_file_content,
crash_video_content=None,
dsn_key_idx=0,
):
"""
Sends a request to the playstation endpoint
:param project_id: the project id
:param crash_file_content: the PlayStation crash dump content
:param crash_video_content: optional crash video content
"""
files = {}
if crash_video_content:
files["upload_file_crash_video"] = (
"crash-video.webm",
crash_video_content,
"video/webm",
)

files["upload_file_minidump"] = (
"playstation.prosperodmp",
crash_file_content,
"application/octet-stream",
)

response = self.post(
"/api/{}/playstation/?sentry_key={}".format(
project_id, self.get_dsn_public_key(project_id, dsn_key_idx)
),
files={
"upload_file_minidump": (
"playstation.prosperodmp",
file_content,
"application/octet-stream",
)
},
files=files,
)

response.raise_for_status()
17 changes: 16 additions & 1 deletion tests/integration/fixtures/processing.py
@@ -96,7 +96,7 @@ def inner(options=None, **kwargs):


@pytest.fixture
def relay_with_playstation(relay_with_processing):
def relay_processing_with_playstation(relay_with_processing):
"""
Skips the test if Relay is not compiled with PlayStation support; otherwise behaves like
`relay_with_processing`
@@ -111,6 +111,21 @@ def relay_with_playstation(relay_with_processing):
return relay_with_processing


@pytest.fixture
def relay_with_playstation(mini_sentry, relay):
"""
Skips the test if Relay is not compiled with PlayStation support; otherwise behaves like `relay`
"""
internal_relay = relay(mini_sentry)
try:
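# Relay responds 404 on this route when compiled without PlayStation support.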
response = internal_relay.post("/api/42/playstation/")
if response.status_code == 404:
pytest.skip("Test requires Relay compiled with PlayStation support")
except Exception:
pytest.skip("Test requires Relay compiled with PlayStation support")
return relay


def kafka_producer(options):
# look for the servers (it is the only config we are interested in)
servers = [