Skip to content

Commit 6778f58

Browse files
committed
Support direct reply-to in RPC client support class
References rabbitmq/rabbitmq-server#14474 Conflicts: src/test/java/com/rabbitmq/client/amqp/impl/ClientTest.java
1 parent d6ace80 commit 6778f58

File tree

9 files changed

+208
-72
lines changed

9 files changed

+208
-72
lines changed

.github/workflows/test-pr.yml

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
name: Test against RabbitMQ 4.1 (PR)
1+
name: Test against RabbitMQ stable (PR)
22

33
on:
44
pull_request:
@@ -23,6 +23,8 @@ jobs:
2323
cache: 'maven'
2424
- name: Start broker
2525
run: ci/start-broker.sh
26+
env:
27+
RABBITMQ_IMAGE: pivotalrabbitmq/rabbitmq:pr-14474-otp28
2628
- name: Start toxiproxy
2729
run: ci/start-toxiproxy.sh
2830
- name: Display Java version

.github/workflows/test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
name: Test against RabbitMQ 4.0
1+
name: Test against RabbitMQ stable
22

33
on:
44
push:

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConnection.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import static com.rabbitmq.client.amqp.Resource.State.RECOVERING;
2525
import static com.rabbitmq.client.amqp.impl.ExceptionUtils.convert;
2626
import static com.rabbitmq.client.amqp.impl.Tuples.pair;
27+
import static com.rabbitmq.client.amqp.impl.Utils.supportDirectReplyTo;
2728
import static com.rabbitmq.client.amqp.impl.Utils.supportFilterExpressions;
2829
import static com.rabbitmq.client.amqp.impl.Utils.supportSetToken;
2930
import static com.rabbitmq.client.amqp.impl.Utils.supportSqlFilterExpressions;
@@ -118,7 +119,8 @@ final class AmqpConnection extends ResourceBase implements Connection {
118119
private final Lock instanceLock = new ReentrantLock();
119120
private final boolean filterExpressionsSupported,
120121
setTokenSupported,
121-
sqlFilterExpressionsSupported;
122+
sqlFilterExpressionsSupported,
123+
directReplyToSupported;
122124
private volatile ConsumerWorkService consumerWorkService;
123125
private volatile Executor dispatchingExecutor;
124126
private final boolean privateDispatchingExecutor;
@@ -216,6 +218,7 @@ final class AmqpConnection extends ResourceBase implements Connection {
216218
this.filterExpressionsSupported = supportFilterExpressions(brokerVersion);
217219
this.setTokenSupported = supportSetToken(brokerVersion);
218220
this.sqlFilterExpressionsSupported = supportSqlFilterExpressions(brokerVersion);
221+
this.directReplyToSupported = supportDirectReplyTo(brokerVersion);
219222
LOGGER.debug("Opened connection '{}' on node '{}'.", this.name(), this.connectionNodename());
220223
this.state(OPEN);
221224
this.environment.metricsCollector().openConnection();
@@ -868,6 +871,12 @@ boolean setTokenSupported() {
868871
return this.setTokenSupported;
869872
}
870873

874+
boolean directReplyToSupported() {
875+
// TODO use flag when version for direct reply-to is known
876+
return true;
877+
// return this.directReplyToSupported;
878+
}
879+
871880
long id() {
872881
return this.id;
873882
}

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumer.java

Lines changed: 60 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
6565
private final int initialCredits;
6666
private final Long id;
6767
private final String address;
68+
private volatile String directReplyToAddress;
6869
private final String queue;
6970
private final Map<String, DescribedType> filters;
7071
private final Map<String, Object> linkProperties;
@@ -96,10 +97,15 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
9697
.connection()
9798
.observationCollector()
9899
.subscribe(builder.queue(), builder.messageHandler());
99-
DefaultAddressBuilder<?> addressBuilder = Utils.addressBuilder();
100-
addressBuilder.queue(builder.queue());
101-
this.address = addressBuilder.address();
102-
this.queue = builder.queue();
100+
if (builder.directReplyTo()) {
101+
this.address = null;
102+
this.queue = null;
103+
} else {
104+
DefaultAddressBuilder<?> addressBuilder = Utils.addressBuilder();
105+
addressBuilder.queue(builder.queue());
106+
this.address = addressBuilder.address();
107+
this.queue = builder.queue();
108+
}
103109
this.filters = Map.copyOf(builder.filters());
104110
this.linkProperties = Map.copyOf(builder.properties());
105111
this.subscriptionListener =
@@ -120,18 +126,19 @@ final class AmqpConsumer extends ResourceBase implements Consumer {
120126
this.consumerWorkService = connection.consumerWorkService();
121127
this.consumerWorkService.register(this);
122128
this.nativeReceiver =
123-
this.createNativeReceiver(
129+
createNativeReceiver(
124130
this.sessionHandler.session(),
125131
this.address,
126132
this.linkProperties,
127133
this.filters,
128134
this.subscriptionListener,
129135
this.nativeHandler,
130136
this.nativeCloseHandler);
131-
this.initStateFromNativeReceiver(this.nativeReceiver);
132-
this.metricsCollector = this.connection.metricsCollector();
133-
this.state(OPEN);
134137
try {
138+
this.directReplyToAddress = nativeReceiver.address();
139+
this.initStateFromNativeReceiver(this.nativeReceiver);
140+
this.metricsCollector = this.connection.metricsCollector();
141+
this.state(OPEN);
135142
this.nativeReceiver.addCredit(this.initialCredits);
136143
} catch (ClientException e) {
137144
AmqpException ex = ExceptionUtils.convert(e);
@@ -189,7 +196,7 @@ public void close() {
189196

190197
// internal API
191198

192-
private ClientReceiver createNativeReceiver(
199+
private static ClientReceiver createNativeReceiver(
193200
Session nativeSession,
194201
String address,
195202
Map<String, Object> properties,
@@ -201,24 +208,47 @@ private ClientReceiver createNativeReceiver(
201208
filters = new LinkedHashMap<>(filters);
202209
StreamOptions streamOptions = AmqpConsumerBuilder.streamOptions(filters);
203210
subscriptionListener.preSubscribe(() -> streamOptions);
204-
ReceiverOptions receiverOptions =
205-
new ReceiverOptions()
206-
.deliveryMode(DeliveryMode.AT_LEAST_ONCE)
207-
.autoAccept(false)
208-
.autoSettle(false)
209-
.handler(nativeHandler)
210-
.closeHandler(closeHandler)
211-
.creditWindow(0)
212-
.properties(properties);
211+
boolean directReplyTo = address == null;
212+
ReceiverOptions receiverOptions = new ReceiverOptions();
213+
214+
if (directReplyTo) {
215+
receiverOptions
216+
.deliveryMode(DeliveryMode.AT_MOST_ONCE)
217+
.autoAccept(true)
218+
.autoSettle(true)
219+
.sourceOptions()
220+
.capabilities("rabbitmq:volatile-queue")
221+
.expiryPolicy(ExpiryPolicy.LINK_CLOSE)
222+
.durabilityMode(DurabilityMode.NONE);
223+
} else {
224+
receiverOptions
225+
.deliveryMode(DeliveryMode.AT_LEAST_ONCE)
226+
.autoAccept(false)
227+
.autoSettle(false);
228+
}
229+
receiverOptions
230+
.handler(nativeHandler)
231+
.closeHandler(closeHandler)
232+
.creditWindow(0)
233+
.properties(properties);
213234
Map<String, Object> localSourceFilters = Collections.emptyMap();
214235
if (!filters.isEmpty()) {
215236
localSourceFilters = Map.copyOf(filters);
216237
receiverOptions.sourceOptions().filters(localSourceFilters);
217238
}
218-
ClientReceiver receiver =
219-
(ClientReceiver)
220-
ExceptionUtils.wrapGet(
221-
nativeSession.openReceiver(address, receiverOptions).openFuture());
239+
ClientReceiver receiver;
240+
if (directReplyTo) {
241+
receiver =
242+
(ClientReceiver)
243+
ExceptionUtils.wrapGet(
244+
nativeSession.openDynamicReceiver(receiverOptions).openFuture());
245+
} else {
246+
receiver =
247+
(ClientReceiver)
248+
ExceptionUtils.wrapGet(
249+
nativeSession.openReceiver(address, receiverOptions).openFuture());
250+
}
251+
222252
boolean filterOk = true;
223253
if (!filters.isEmpty()) {
224254
Map<String, String> remoteSourceFilters = receiver.source().filters();
@@ -298,10 +328,11 @@ void recoverAfterConnectionFailure() {
298328
List.of(ofSeconds(1), ofSeconds(2), ofSeconds(3), BackOffDelayPolicy.TIMEOUT),
299329
"Create AMQP receiver to address '%s'",
300330
this.address);
301-
this.initStateFromNativeReceiver(this.nativeReceiver);
302-
this.pauseStatus.set(PauseStatus.UNPAUSED);
303-
this.unsettledMessageCount.set(0);
304331
try {
332+
this.directReplyToAddress = this.nativeReceiver.address();
333+
this.initStateFromNativeReceiver(this.nativeReceiver);
334+
this.pauseStatus.set(PauseStatus.UNPAUSED);
335+
this.unsettledMessageCount.set(0);
305336
this.nativeReceiver.addCredit(this.initialCredits);
306337
} catch (ClientException e) {
307338
throw ExceptionUtils.convert(e);
@@ -493,6 +524,10 @@ private void settle(
493524
}
494525
}
495526

527+
String directReplyToAddress() {
528+
return this.directReplyToAddress;
529+
}
530+
496531
@Override
497532
public String toString() {
498533
return "AmqpConsumer{" + "id=" + id + ", queue='" + queue + '\'' + '}';

src/main/java/com/rabbitmq/client/amqp/impl/AmqpConsumerBuilder.java

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ class AmqpConsumerBuilder implements ConsumerBuilder {
3333

3434
private final AmqpConnection connection;
3535
private String queue;
36+
private boolean directReplyTo = false;
3637
private Consumer.MessageHandler messageHandler;
3738
private int initialCredits = 100;
3839
private final List<Resource.StateListener> listeners = new ArrayList<>();
@@ -48,6 +49,19 @@ class AmqpConsumerBuilder implements ConsumerBuilder {
4849
@Override
4950
public ConsumerBuilder queue(String queue) {
5051
this.queue = queue;
52+
if (this.queue == null) {
53+
this.directReplyTo = true;
54+
} else {
55+
this.directReplyTo = false;
56+
}
57+
return this;
58+
}
59+
60+
ConsumerBuilder directReplyTo(boolean directReplyTo) {
61+
this.directReplyTo = directReplyTo;
62+
if (this.directReplyTo) {
63+
this.queue = null;
64+
}
5165
return this;
5266
}
5367

@@ -102,6 +116,10 @@ String queue() {
102116
return queue;
103117
}
104118

119+
boolean directReplyTo() {
120+
return this.directReplyTo;
121+
}
122+
105123
Consumer.MessageHandler messageHandler() {
106124
return messageHandler;
107125
}
@@ -124,7 +142,7 @@ Map<String, DescribedType> filters() {
124142

125143
@Override
126144
public Consumer build() {
127-
if (this.queue == null || this.queue.isBlank()) {
145+
if ((this.queue == null || this.queue.isBlank()) && !this.directReplyTo) {
128146
throw new IllegalArgumentException("A queue must be specified");
129147
}
130148
if (this.messageHandler == null) {
@@ -442,7 +460,7 @@ public StreamFilterOptions propertySymbol(String key, String value) {
442460

443461
@Override
444462
public StreamFilterOptions sql(String sql) {
445-
if (!this.streamOptions.builder.connection.filterExpressionsSupported()) {
463+
if (!this.streamOptions.builder.connection.sqlFilterExpressionsSupported()) {
446464
throw new IllegalArgumentException(
447465
"AMQP SQL filter expressions requires at least RabbitMQ 4.2.0");
448466
}

src/main/java/com/rabbitmq/client/amqp/impl/AmqpRpcClient.java

Lines changed: 45 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
package com.rabbitmq.client.amqp.impl;
1919

2020
import com.rabbitmq.client.amqp.AmqpException;
21-
import com.rabbitmq.client.amqp.Consumer;
2221
import com.rabbitmq.client.amqp.Management;
2322
import com.rabbitmq.client.amqp.Message;
2423
import com.rabbitmq.client.amqp.Publisher;
@@ -39,7 +38,7 @@
3938
import org.slf4j.Logger;
4039
import org.slf4j.LoggerFactory;
4140

42-
class AmqpRpcClient implements RpcClient {
41+
final class AmqpRpcClient implements RpcClient {
4342

4443
private static final Logger LOGGER = LoggerFactory.getLogger(AmqpRpcClient.class);
4544

@@ -48,7 +47,7 @@ class AmqpRpcClient implements RpcClient {
4847
private final AmqpConnection connection;
4948
private final Clock clock;
5049
private final Publisher publisher;
51-
private final Consumer consumer;
50+
private final AmqpConsumer consumer;
5251
private final Map<Object, OutstandingRequest> outstandingRequests = new ConcurrentHashMap<>();
5352
private final Supplier<Object> correlationIdSupplier;
5453
private final BiFunction<Message, Object, Message> requestPostProcessor;
@@ -67,30 +66,38 @@ class AmqpRpcClient implements RpcClient {
6766
this.publisher = publisherBuilder.build();
6867

6968
String replyTo = builder.replyToQueue();
69+
boolean directReplyTo;
7070
if (replyTo == null) {
71-
Management.QueueInfo queueInfo =
72-
this.connection.management().queue().exclusive(true).autoDelete(true).declare();
73-
replyTo = queueInfo.name();
71+
directReplyTo = connection.directReplyToSupported();
72+
if (!directReplyTo) {
73+
Management.QueueInfo queueInfo =
74+
this.connection.management().queue().exclusive(true).autoDelete(true).declare();
75+
replyTo = queueInfo.name();
76+
}
77+
} else {
78+
directReplyTo = false;
7479
}
7580
if (builder.correlationIdExtractor() == null) {
7681
this.correlationIdExtractor = Message::correlationId;
7782
} else {
7883
this.correlationIdExtractor = builder.correlationIdExtractor();
7984
}
85+
AmqpConsumerBuilder consumerBuilder = (AmqpConsumerBuilder) this.connection.consumerBuilder();
8086
this.consumer =
81-
this.connection
82-
.consumerBuilder()
83-
.queue(replyTo)
84-
.messageHandler(
85-
(ctx, msg) -> {
86-
ctx.accept();
87-
OutstandingRequest request =
88-
this.outstandingRequests.remove(this.correlationIdExtractor.apply(msg));
89-
if (request != null) {
90-
request.future.complete(msg);
91-
}
92-
})
93-
.build();
87+
(AmqpConsumer)
88+
consumerBuilder
89+
.directReplyTo(directReplyTo)
90+
.queue(replyTo)
91+
.messageHandler(
92+
(ctx, msg) -> {
93+
ctx.accept();
94+
OutstandingRequest request =
95+
this.outstandingRequests.remove(this.correlationIdExtractor.apply(msg));
96+
if (request != null) {
97+
request.future.complete(msg);
98+
}
99+
})
100+
.build();
94101

95102
if (builder.correlationIdSupplier() == null) {
96103
String correlationIdPrefix = UUID.randomUUID().toString();
@@ -102,14 +109,25 @@ class AmqpRpcClient implements RpcClient {
102109
}
103110

104111
if (builder.requestPostProcessor() == null) {
105-
DefaultAddressBuilder<?> addressBuilder = Utils.addressBuilder();
106-
addressBuilder.queue(replyTo);
107-
String replyToAddress = addressBuilder.address();
108-
// HTTP over AMQP 1.0 extension specification, 5.1:
109-
// To associate a response with a request, the correlation-id value of the response properties
110-
// MUST be set to the message-id value of the request properties.
111-
this.requestPostProcessor =
112-
(request, correlationId) -> request.replyTo(replyToAddress).messageId(correlationId);
112+
if (directReplyTo) {
113+
// HTTP over AMQP 1.0 extension specification, 5.1:
114+
// To associate a response with a request, the correlation-id value of the response
115+
// properties
116+
// MUST be set to the message-id value of the request properties.
117+
this.requestPostProcessor =
118+
(request, correlationId) ->
119+
request.replyTo(consumer.directReplyToAddress()).messageId(correlationId);
120+
} else {
121+
DefaultAddressBuilder<?> addressBuilder = Utils.addressBuilder();
122+
addressBuilder.queue(replyTo);
123+
String replyToAddress = addressBuilder.address();
124+
// HTTP over AMQP 1.0 extension specification, 5.1:
125+
// To associate a response with a request, the correlation-id value of the response
126+
// properties
127+
// MUST be set to the message-id value of the request properties.
128+
this.requestPostProcessor =
129+
(request, correlationId) -> request.replyTo(replyToAddress).messageId(correlationId);
130+
}
113131
} else {
114132
this.requestPostProcessor = builder.requestPostProcessor();
115133
}

src/main/java/com/rabbitmq/client/amqp/impl/AmqpRpcServer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@
3131
import org.slf4j.Logger;
3232
import org.slf4j.LoggerFactory;
3333

34-
class AmqpRpcServer implements RpcServer {
34+
final class AmqpRpcServer implements RpcServer {
3535

3636
private static final Logger LOGGER = LoggerFactory.getLogger(AmqpRpcServer.class);
3737

0 commit comments

Comments
 (0)