diff --git a/extensions/resteasy-classic/resteasy-multipart/deployment/src/test/java/io/quarkus/resteasy/multipart/LargeMultipartPayloadTest.java b/extensions/resteasy-classic/resteasy-multipart/deployment/src/test/java/io/quarkus/resteasy/multipart/LargeMultipartPayloadTest.java new file mode 100644 index 0000000000000..3e1af60e4d00a --- /dev/null +++ b/extensions/resteasy-classic/resteasy-multipart/deployment/src/test/java/io/quarkus/resteasy/multipart/LargeMultipartPayloadTest.java @@ -0,0 +1,84 @@ +package io.quarkus.resteasy.multipart; + +import java.util.function.Supplier; + +import jakarta.annotation.Priority; +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.FormParam; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Priorities; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.container.ContainerRequestContext; +import jakarta.ws.rs.container.ContainerRequestFilter; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.ext.Provider; + +import org.jboss.resteasy.annotations.providers.multipart.MultipartForm; +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.asset.StringAsset; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.test.QuarkusUnitTest; +import io.restassured.RestAssured; + +public class LargeMultipartPayloadTest { + + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .setArchiveProducer(new Supplier<>() { + @Override + public JavaArchive get() { + return ShrinkWrap.create(JavaArchive.class) + .addAsResource(new StringAsset(""" + quarkus.http.limits.max-body-size=30M + """), + "application.properties"); + } + }); + + @Test + public void testConnectionClosedOnException() { + RestAssured + .given() + .multiPart("content", twentyMegaBytes()) + .post("/test/multipart") + .then() + .statusCode(500); + } + + private static String twentyMegaBytes() { + return new String(new byte[20_000_000]); + } + + @Path("/test") + public static class Resource { + @POST + @Path("/multipart") + @Produces(MediaType.TEXT_PLAIN) + @Consumes(MediaType.MULTIPART_FORM_DATA) + public String postForm(@MultipartForm final FormBody ignored) { + return "ignored"; + } + } + + public static class FormBody { + + @FormParam("content") + public String content; + + } + + @Priority(Priorities.USER) + @Provider + public static class Filter implements ContainerRequestFilter { + + @Override + public void filter(ContainerRequestContext containerRequestContext) { + throw new RuntimeException("Expected exception"); + } + } + +} diff --git a/extensions/resteasy-reactive/rest/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/multipart/LargeMultipartPayloadTest.java b/extensions/resteasy-reactive/rest/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/multipart/LargeMultipartPayloadTest.java new file mode 100644 index 0000000000000..1fa96b827e066 --- /dev/null +++ b/extensions/resteasy-reactive/rest/deployment/src/test/java/io/quarkus/resteasy/reactive/server/test/multipart/LargeMultipartPayloadTest.java @@ -0,0 +1,105 @@ +package io.quarkus.resteasy.reactive.server.test.multipart; + +import static io.restassured.RestAssured.given; + +import java.util.List; +import java.util.Map; +import java.util.function.Supplier; + +import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.POST; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; + +import org.hamcrest.Matchers; +import org.jboss.jandex.ClassInfo; +import org.jboss.jandex.MethodInfo; +import org.jboss.resteasy.reactive.RestForm; +import org.jboss.resteasy.reactive.common.model.ResourceClass; +import org.jboss.resteasy.reactive.multipart.FileUpload; +import org.jboss.resteasy.reactive.server.ServerExceptionMapper; +import org.jboss.resteasy.reactive.server.core.ResteasyReactiveRequestContext; +import org.jboss.resteasy.reactive.server.model.HandlerChainCustomizer; +import org.jboss.resteasy.reactive.server.model.ServerResourceMethod; +import org.jboss.resteasy.reactive.server.processor.scanning.MethodScanner; +import org.jboss.resteasy.reactive.server.spi.ServerRestHandler; +import org.jboss.shrinkwrap.api.ShrinkWrap; +import org.jboss.shrinkwrap.api.asset.StringAsset; +import org.jboss.shrinkwrap.api.spec.JavaArchive; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.quarkus.resteasy.reactive.server.spi.MethodScannerBuildItem; +import io.quarkus.test.QuarkusUnitTest; +import io.smallrye.mutiny.Uni; + +public class LargeMultipartPayloadTest { + + @RegisterExtension + static QuarkusUnitTest test = new QuarkusUnitTest() + .setArchiveProducer(new Supplier<>() { + @Override + public JavaArchive get() { + return ShrinkWrap.create(JavaArchive.class) + .addAsResource(new StringAsset(""" + quarkus.http.limits.max-body-size=30M + """), + "application.properties"); + } + }).addBuildChainCustomizer(buildChainBuilder -> buildChainBuilder.addBuildStep(context -> context.produce( + new MethodScannerBuildItem(new MethodScanner() { + @Override + public List scan(MethodInfo method, ClassInfo actualEndpointClass, + Map methodContext) { + return List.of(new AlwaysFailHandler()); + } + }))).produces(MethodScannerBuildItem.class).build()); + + @Test + public void testConnectionClosedOnException() { + given() + .multiPart("file", twentyMegaBytes()) + .post("/test") + .then() + .statusCode(200) + .body(Matchers.is("Expected failure!")); + } + + private static String twentyMegaBytes() { + return new String(new byte[20_000_000]); + } + + @Path("/test") + public static class Resource { + + @POST + @Consumes(MediaType.MULTIPART_FORM_DATA) + public String uploadFile(@RestForm("file") FileUpload file) { + return "File " + file.fileName() + " uploaded!"; + } + } + + public static class Mappers { + + @ServerExceptionMapper(RuntimeException.class) + Uni handle() { + return Uni.createFrom().item(Response.status(200).entity("Expected failure!").build()); + } + + } + + public static class AlwaysFailHandler implements ServerRestHandler, HandlerChainCustomizer { + + @Override + public void handle(ResteasyReactiveRequestContext requestContext) throws Exception { + requestContext.suspend(); + requestContext.resume(new RuntimeException("Expected exception!"), true); + } + + @Override + public List handlers(Phase phase, ResourceClass resourceClass, ServerResourceMethod resourceMethod) { + return List.of(new AlwaysFailHandler()); + } + } +} diff --git a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/ResumingRequestWrapper.java b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/ResumingRequestWrapper.java index 4375618f3201d..9774cb7d4ca29 100644 --- a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/ResumingRequestWrapper.java +++ b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/ResumingRequestWrapper.java @@ -1,18 +1,50 @@ package io.quarkus.vertx.http.runtime; +import io.quarkus.vertx.http.runtime.filters.AbstractResponseWrapper; import io.vertx.core.Handler; import io.vertx.core.buffer.Buffer; import io.vertx.core.http.HttpServerFileUpload; import io.vertx.core.http.HttpServerRequest; +import io.vertx.core.http.HttpServerResponse; import io.vertx.core.http.impl.HttpServerRequestInternal; import io.vertx.core.http.impl.HttpServerRequestWrapper; public class ResumingRequestWrapper extends HttpServerRequestWrapper { + private final HttpServerResponse httpServerResponse; private boolean userSetState; - public ResumingRequestWrapper(HttpServerRequest request) { + public ResumingRequestWrapper(HttpServerRequest request, boolean mustResumeRequest) { super((HttpServerRequestInternal) request); + + // TODO: replace this when more than one response end handlers are allowed + if (mustResumeRequest) { + HttpServerResponse response = delegate.response(); + response.endHandler(new Handler() { + @Override + public void handle(Void unused) { + if (!delegate.isEnded()) { + delegate.resume(); + } + } + }); + this.httpServerResponse = new AbstractResponseWrapper(response) { + @Override + public HttpServerResponse endHandler(Handler handler) { + return super.endHandler(new Handler() { + @Override + public void handle(Void unused) { + handler.handle(null); + if (!delegate.isEnded()) { + delegate.resume(); + } + } + }); + } + }; + } else { + this.httpServerResponse = null; + } } @Override @@ -71,4 +103,13 @@ public HttpServerRequest uploadHandler(Handler handler) { } return this; } + + @Override + public HttpServerResponse response() { + if (httpServerResponse != null) { + return httpServerResponse; + } else { + return super.response(); + } + } } diff --git a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/VertxHttpRecorder.java b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/VertxHttpRecorder.java index 018b9f5a9e7bf..f260edf6ffa91 100644 --- a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/VertxHttpRecorder.java +++ b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/VertxHttpRecorder.java @@ -534,8 +534,9 @@ public void handle(HttpServerRequest event) { }; } + final boolean mustResumeRequest = httpConfiguration.limits.maxBodySize.isPresent(); Handler delegate = root; - root = HttpServerCommonHandlers.enforceDuplicatedContext(delegate); + root = HttpServerCommonHandlers.enforceDuplicatedContext(delegate, mustResumeRequest); if (httpConfiguration.recordRequestStartTime) { httpRouteRouter.route().order(RouteConstants.ROUTE_ORDER_RECORD_START_TIME).handler(new Handler() { @Override @@ -576,7 +577,7 @@ public void handle(RoutingContext event) { HttpServerCommonHandlers.applyHeaders(managementConfiguration.getValue().header, mr); applyCompression(managementBuildTimeConfig.enableCompression, mr); - Handler handler = HttpServerCommonHandlers.enforceDuplicatedContext(mr); + Handler handler = HttpServerCommonHandlers.enforceDuplicatedContext(mr, mustResumeRequest); handler = HttpServerCommonHandlers.applyProxy(managementConfiguration.getValue().proxy, handler, vertx); int routesBeforeMiEvent = mr.getRoutes().size(); diff --git a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/filters/AbstractResponseWrapper.java b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/filters/AbstractResponseWrapper.java index 7d87cbd15d2c1..fe2500ca859d7 100644 --- a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/filters/AbstractResponseWrapper.java +++ b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/filters/AbstractResponseWrapper.java @@ -15,11 +15,11 @@ import io.vertx.core.net.HostAndPort; import io.vertx.core.streams.ReadStream; -class AbstractResponseWrapper implements HttpServerResponse { +public class AbstractResponseWrapper implements HttpServerResponse { private final HttpServerResponse delegate; - AbstractResponseWrapper(HttpServerResponse delegate) { + protected AbstractResponseWrapper(HttpServerResponse delegate) { this.delegate = delegate; } diff --git a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/options/HttpServerCommonHandlers.java b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/options/HttpServerCommonHandlers.java index d14da39ac12cf..3d0468aba51cc 100644 --- a/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/options/HttpServerCommonHandlers.java +++ b/extensions/vertx-http/runtime/src/main/java/io/quarkus/vertx/http/runtime/options/HttpServerCommonHandlers.java @@ -65,7 +65,8 @@ public void handle(Void e) { } } - public static Handler enforceDuplicatedContext(Handler delegate) { + public static Handler enforceDuplicatedContext(Handler delegate, + boolean mustResumeRequest) { return new Handler() { @Override public void handle(HttpServerRequest event) { @@ -78,12 +79,12 @@ public void handle(HttpServerRequest event) { @Override public void handle(Void x) { setCurrentContextSafe(true); - delegate.handle(new ResumingRequestWrapper(event)); + delegate.handle(new ResumingRequestWrapper(event, mustResumeRequest)); } }); } else { setCurrentContextSafe(true); - delegate.handle(new ResumingRequestWrapper(event)); + delegate.handle(new ResumingRequestWrapper(event, mustResumeRequest)); } } };