Skip to content

Commit 8169a80

Browse files
mkoubagsmet
authored andcommitted
WebSockets Next: cancel returned Multi if the connection is closed
- fixes #41025 (cherry picked from commit 787c9d6)
1 parent 8ea5fff commit 8169a80

File tree

2 files changed

+112
-26
lines changed

2 files changed

+112
-26
lines changed
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
package io.quarkus.websockets.next.test.errors;
2+
3+
import static org.junit.jupiter.api.Assertions.assertEquals;
4+
import static org.junit.jupiter.api.Assertions.assertTrue;
5+
6+
import java.net.URI;
7+
import java.time.Duration;
8+
import java.util.List;
9+
import java.util.concurrent.CopyOnWriteArrayList;
10+
import java.util.concurrent.CountDownLatch;
11+
import java.util.concurrent.TimeUnit;
12+
13+
import jakarta.inject.Inject;
14+
15+
import org.junit.jupiter.api.Test;
16+
import org.junit.jupiter.api.extension.RegisterExtension;
17+
18+
import io.quarkus.logging.Log;
19+
import io.quarkus.test.QuarkusUnitTest;
20+
import io.quarkus.test.common.http.TestHTTPResource;
21+
import io.quarkus.websockets.next.OnError;
22+
import io.quarkus.websockets.next.OnOpen;
23+
import io.quarkus.websockets.next.WebSocket;
24+
import io.quarkus.websockets.next.WebSocketConnection;
25+
import io.quarkus.websockets.next.test.utils.WSClient;
26+
import io.smallrye.mutiny.Multi;
27+
import io.vertx.core.Vertx;
28+
import io.vertx.core.impl.NoStackTraceThrowable;
29+
30+
public class MultiClosedConnectionTest {
31+
32+
@RegisterExtension
33+
public static final QuarkusUnitTest test = new QuarkusUnitTest()
34+
.withApplicationRoot(root -> {
35+
root.addClasses(Echo.class, WSClient.class);
36+
});
37+
38+
@Inject
39+
Vertx vertx;
40+
41+
@TestHTTPResource("echo")
42+
URI testUri;
43+
44+
@Test
45+
void testError() throws InterruptedException {
46+
WSClient client = WSClient.create(vertx).connect(testUri);
47+
client.waitForMessages(1);
48+
client.close();
49+
assertTrue(Echo.TERMINATION_LATCH.await(5, TimeUnit.SECONDS));
50+
assertTrue(Echo.ERROR_LATCH.await(5, TimeUnit.SECONDS));
51+
// Connection is closed and the returned Multi should be cancelled
52+
int numOfMessages = Echo.MESSAGES.size();
53+
Thread.sleep(600);
54+
// No more ticks are emitted
55+
assertEquals(numOfMessages, Echo.MESSAGES.size());
56+
}
57+
58+
@WebSocket(path = "/echo")
59+
public static class Echo {
60+
61+
static final CountDownLatch TERMINATION_LATCH = new CountDownLatch(1);
62+
static final CountDownLatch ERROR_LATCH = new CountDownLatch(1);
63+
64+
static final List<String> MESSAGES = new CopyOnWriteArrayList<>();
65+
66+
@OnOpen
67+
Multi<String> onOpen() {
68+
return Multi.createFrom()
69+
.ticks()
70+
.every(Duration.ofMillis(300))
71+
.map(tick -> tick + "")
72+
.invoke(s -> {
73+
Log.infof("Next tick: %s", s);
74+
MESSAGES.add(s);
75+
})
76+
.onTermination()
77+
.invoke(() -> {
78+
Log.info("Terminated!");
79+
TERMINATION_LATCH.countDown();
80+
});
81+
}
82+
83+
@OnError
84+
void onConnectionClosedError(NoStackTraceThrowable t, WebSocketConnection conn) {
85+
Log.info("Error callback!");
86+
if (conn.isClosed()) {
87+
String message = t.getMessage();
88+
if (message != null && message.contains("WebSocket is closed")) {
89+
ERROR_LATCH.countDown();
90+
}
91+
}
92+
}
93+
94+
}
95+
96+
}

extensions/websockets-next/runtime/src/main/java/io/quarkus/websockets/next/runtime/WebSocketEndpointBase.java

Lines changed: 16 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -319,41 +319,31 @@ public Uni<Void> sendText(String message, boolean broadcast) {
319319
return broadcast ? connection.broadcast().sendText(message) : connection.sendText(message);
320320
}
321321

322-
public Uni<Void> multiText(Multi<Object> multi, Function<Object, Uni<Void>> action) {
323-
multi.onFailure().recoverWithMulti(t -> doOnError(t).toMulti())
322+
public Uni<Void> multiText(Multi<Object> multi, Function<? super Object, Uni<?>> action) {
323+
multi
324+
// Encode and send message
325+
.onItem().call(action)
326+
.onFailure().recoverWithMulti(t -> {
327+
return doOnError(t).toMulti();
328+
})
324329
.subscribe().with(
325-
m -> {
326-
// Encode and send message
327-
action.apply(m)
328-
.onFailure().recoverWithUni(this::doOnError)
329-
.subscribe()
330-
.with(v -> LOG.debugf("Multi >> text message: %s", connection),
331-
t -> LOG.errorf(t, "Unable to send text message from Multi: %s", connection));
332-
},
333-
t -> {
334-
LOG.errorf(t, "Unable to send text message from Multi: %s ", connection);
335-
});
330+
m -> LOG.debugf("Multi >> text message: %s", connection),
331+
t -> LOG.errorf(t, "Unable to send text message from Multi: %s ", connection));
336332
return Uni.createFrom().voidItem();
337333
}
338334

339335
public Uni<Void> sendBinary(Buffer message, boolean broadcast) {
340336
return broadcast ? connection.broadcast().sendBinary(message) : connection.sendBinary(message);
341337
}
342338

343-
public Uni<Void> multiBinary(Multi<Object> multi, Function<Object, Uni<Void>> action) {
344-
multi.onFailure().recoverWithMulti(t -> doOnError(t).toMulti())
339+
public Uni<Void> multiBinary(Multi<Object> multi, Function<? super Object, Uni<?>> action) {
340+
multi
341+
// Encode and send message
342+
.onItem().call(action)
343+
.onFailure().recoverWithMulti(t -> doOnError(t).toMulti())
345344
.subscribe().with(
346-
m -> {
347-
// Encode and send message
348-
action.apply(m)
349-
.onFailure().recoverWithUni(this::doOnError)
350-
.subscribe()
351-
.with(v -> LOG.debugf("Multi >> binary message: %s", connection),
352-
t -> LOG.errorf(t, "Unable to send binary message from Multi: %s", connection));
353-
},
354-
t -> {
355-
LOG.errorf(t, "Unable to send text message from Multi: %s ", connection);
356-
});
345+
m -> LOG.debugf("Multi >> binary message: %s", connection),
346+
t -> LOG.errorf(t, "Unable to send binary message from Multi: %s ", connection));
357347
return Uni.createFrom().voidItem();
358348
}
359349
}

0 commit comments

Comments
 (0)