1
1
use std:: io;
2
+ use std:: task:: Poll ;
2
3
3
4
use axum:: extract:: Request ;
4
- use multer:: Multipart ;
5
+ use bytes:: { Bytes , BytesMut } ;
6
+ use futures:: { StreamExt , TryStreamExt } ;
7
+ use multer:: { Field , Multipart } ;
5
8
use relay_config:: Config ;
6
9
use serde:: { Deserialize , Serialize } ;
7
10
@@ -152,6 +155,8 @@ pub fn get_multipart_boundary(data: &[u8]) -> Option<&str> {
152
155
pub async fn multipart_items < F > (
153
156
mut multipart : Multipart < ' _ > ,
154
157
mut infer_type : F ,
158
+ config : & Config ,
159
+ ignore_large_fields : bool ,
155
160
) -> Result < Items , multer:: Error >
156
161
where
157
162
F : FnMut ( Option < & str > , & str ) -> AttachmentType ,
@@ -164,12 +169,21 @@ where
164
169
let mut item = Item :: new ( ItemType :: Attachment ) ;
165
170
item. set_attachment_type ( infer_type ( field. name ( ) , file_name) ) ;
166
171
item. set_filename ( file_name) ;
167
- // Extract the body after the immutable borrow on `file_name` is gone.
168
- if let Some ( content_type) = field. content_type ( ) {
169
- item. set_payload ( content_type. as_ref ( ) . into ( ) , field. bytes ( ) . await ?) ;
170
- } else {
171
- item. set_payload_without_content_type ( field. bytes ( ) . await ?) ;
172
+
173
+ let content_type = field. content_type ( ) . cloned ( ) ;
174
+ let field = LimitedField :: new ( field, config. max_attachment_size ( ) ) ;
175
+ match field. bytes ( ) . await {
176
+ Err ( multer:: Error :: FieldSizeExceeded { .. } ) if ignore_large_fields => continue ,
177
+ Err ( err) => return Err ( err) ,
178
+ Ok ( bytes) => {
179
+ if let Some ( content_type) = content_type {
180
+ item. set_payload ( content_type. as_ref ( ) . into ( ) , bytes) ;
181
+ } else {
182
+ item. set_payload_without_content_type ( bytes) ;
183
+ }
184
+ }
172
185
}
186
+
173
187
items. push ( item) ;
174
188
} else if let Some ( field_name) = field. name ( ) . map ( str:: to_owned) {
175
189
// Ensure to decode this SAFELY to match Django's POST data behavior. This allows us to
@@ -193,6 +207,76 @@ where
193
207
Ok ( items)
194
208
}
195
209
210
/// Wrapper around `multer::Field` which consumes the entire underlying stream even when the
/// size limit is exceeded.
///
/// The idea being that you can process fields in a multi-part form even if one field is too large.
struct LimitedField<'a> {
    /// The wrapped multipart field whose chunks are streamed through.
    field: Field<'a>,
    /// Number of payload bytes observed from `field` so far.
    consumed_size: usize,
    /// Maximum number of bytes this field may produce before it is reported as too large.
    size_limit: usize,
    /// Set once the inner field stream has completed; afterwards `poll_next` returns
    /// `None` without polling `field` again (i.e. the stream is fused).
    inner_finished: bool,
}
220
+
221
+ impl < ' a > LimitedField < ' a > {
222
+ fn new ( field : Field < ' a > , limit : usize ) -> Self {
223
+ LimitedField {
224
+ field,
225
+ consumed_size : 0 ,
226
+ size_limit : limit,
227
+ inner_finished : false ,
228
+ }
229
+ }
230
+
231
+ async fn bytes ( self ) -> Result < Bytes , multer:: Error > {
232
+ self . try_fold ( BytesMut :: new ( ) , |mut acc, x| async move {
233
+ acc. extend_from_slice ( & x) ;
234
+ Ok ( acc)
235
+ } )
236
+ . await
237
+ . map ( |x| x. freeze ( ) )
238
+ }
239
+ }
240
+
241
impl futures::Stream for LimitedField<'_> {
    type Item = Result<Bytes, multer::Error>;

    /// Polls the wrapped field, counting every chunk against the size limit.
    ///
    /// Chunks within the limit are forwarded unchanged. Once the limit is crossed,
    /// further chunks are still read from the inner field but discarded, so the
    /// surrounding multipart stream remains consumable for subsequent fields; only
    /// when the inner field is exhausted is a single `FieldSizeExceeded` error
    /// emitted.
    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Option<Self::Item>> {
        // Fuse: after the inner field completed, never poll it again.
        if self.inner_finished {
            return Poll::Ready(None);
        }

        match self.field.poll_next_unpin(cx) {
            // Transport/parse errors from the inner field are forwarded unchanged.
            err @ Poll::Ready(Some(Err(_))) => err,
            Poll::Ready(Some(Ok(t))) => {
                self.consumed_size += t.len();
                match self.consumed_size <= self.size_limit {
                    true => Poll::Ready(Some(Ok(t))),
                    false => {
                        // Over the limit: drop this chunk but keep draining.
                        // Waking ourselves immediately asks the executor to
                        // poll this stream again, so `Pending` here does not
                        // stall the task.
                        cx.waker().wake_by_ref();
                        Poll::Pending
                    }
                }
            }
            Poll::Ready(None) if self.consumed_size > self.size_limit => {
                self.inner_finished = true;
                // Field fully drained but too large: report it once, using the
                // same error variant multer raises for a `per_field` limit.
                Poll::Ready(Some(Err(multer::Error::FieldSizeExceeded {
                    limit: self.size_limit as u64,
                    field_name: self.field.name().map(Into::into),
                })))
            }
            Poll::Ready(None) => {
                self.inner_finished = true;
                Poll::Ready(None)
            }
            Poll::Pending => Poll::Pending,
        }
    }
}
279
+
196
280
pub fn multipart_from_request (
197
281
request : Request ,
198
282
config : & Config ,
@@ -204,9 +288,8 @@ pub fn multipart_from_request(
204
288
. unwrap_or ( "" ) ;
205
289
let boundary = multer:: parse_boundary ( content_type) ?;
206
290
207
- let limits = multer:: SizeLimit :: new ( )
208
- . whole_stream ( config. max_attachments_size ( ) as u64 )
209
- . per_field ( config. max_attachment_size ( ) as u64 ) ;
291
+ // Only enforce the stream limit here as the `per_field` limit is enforced by `LimitedField`.
292
+ let limits = multer:: SizeLimit :: new ( ) . whole_stream ( config. max_attachments_size ( ) as u64 ) ;
210
293
211
294
Ok ( Multipart :: with_constraints (
212
295
request. into_body ( ) . into_data_stream ( ) ,
@@ -287,4 +370,77 @@ mod tests {
287
370
288
371
Ok ( ( ) )
289
372
}
373
+
374
+ #[ tokio:: test]
375
+ async fn test_individual_size_limit_exceeded ( ) -> anyhow:: Result < ( ) > {
376
+ let data = "--X-BOUNDARY\r \n \
377
+ Content-Disposition: form-data; name=\" file\" ; filename=\" large.txt\" \r \n \
378
+ Content-Type: text/plain\r \n \
379
+ \r \n \
380
+ content too large for limit\r \n \
381
+ --X-BOUNDARY\r \n \
382
+ Content-Disposition: form-data; name=\" small_file\" ; filename=\" small.txt\" \r \n \
383
+ Content-Type: text/plain\r \n \
384
+ \r \n \
385
+ ok\r \n \
386
+ --X-BOUNDARY--\r \n ";
387
+
388
+ let stream = futures:: stream:: once ( async { Ok :: < _ , Infallible > ( data) } ) ;
389
+ let multipart = Multipart :: new ( stream, "X-BOUNDARY" ) ;
390
+
391
+ let config = Config :: from_json_value ( serde_json:: json!( {
392
+ "limits" : {
393
+ "max_attachment_size" : 5
394
+ }
395
+ } ) ) ?;
396
+
397
+ let items =
398
+ multipart_items ( multipart, |_, _| AttachmentType :: Attachment , & config, true ) . await ?;
399
+
400
+ // The large field is skipped so only the small one should make it through.
401
+ assert_eq ! ( items. len( ) , 1 ) ;
402
+ let item = & items[ 0 ] ;
403
+ assert_eq ! ( item. filename( ) , Some ( "small.txt" ) ) ;
404
+ assert_eq ! ( item. payload( ) , Bytes :: from( "ok" ) ) ;
405
+
406
+ Ok ( ( ) )
407
+ }
408
+
409
+ #[ tokio:: test]
410
+ async fn test_collective_size_limit_exceeded ( ) -> anyhow:: Result < ( ) > {
411
+ let data = "--X-BOUNDARY\r \n \
412
+ Content-Disposition: form-data; name=\" file\" ; filename=\" large.txt\" \r \n \
413
+ Content-Type: text/plain\r \n \
414
+ \r \n \
415
+ content too large for limit\r \n \
416
+ --X-BOUNDARY\r \n \
417
+ Content-Disposition: form-data; name=\" small_file\" ; filename=\" small.txt\" \r \n \
418
+ Content-Type: text/plain\r \n \
419
+ \r \n \
420
+ ok\r \n \
421
+ --X-BOUNDARY--\r \n ";
422
+
423
+ let stream = futures:: stream:: once ( async { Ok :: < _ , Infallible > ( data) } ) ;
424
+
425
+ let config = Config :: from_json_value ( serde_json:: json!( {
426
+ "limits" : {
427
+ "max_attachments_size" : 5
428
+ }
429
+ } ) ) ?;
430
+ let limits = multer:: SizeLimit :: new ( ) . whole_stream ( config. max_attachments_size ( ) as u64 ) ;
431
+
432
+ let multipart = Multipart :: with_constraints (
433
+ stream,
434
+ "X-BOUNDARY" ,
435
+ multer:: Constraints :: new ( ) . size_limit ( limits) ,
436
+ ) ;
437
+
438
+ let result =
439
+ multipart_items ( multipart, |_, _| AttachmentType :: Attachment , & config, true ) . await ;
440
+
441
+ // Should be warned if the overall stream limit is being breached.
442
+ assert ! ( result. is_err_and( |x| matches!( x, multer:: Error :: StreamSizeExceeded { limit: _ } ) ) ) ;
443
+
444
+ Ok ( ( ) )
445
+ }
290
446
}
0 commit comments