Skip to content

Commit 0473f35

Browse files
Merge pull request #14474 from rabbitmq/amqp-direct-reply-to
Support Direct Reply-To for AMQP 1.0
2 parents 8111713 + 72cd7a3 commit 0473f35

35 files changed

+2083
-482
lines changed

.github/workflows/test-make-tests.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ jobs:
2828
- parallel-ct-set-2
2929
- parallel-ct-set-3
3030
- parallel-ct-set-4
31+
- parallel-ct-set-5
3132
- ct-amqp_client
3233
- ct-clustering_management
3334
- eunit ct-dead_lettering

deps/amqp10_client/src/amqp10_client_session.erl

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -92,8 +92,8 @@
9292
-type max_message_size() :: undefined | non_neg_integer().
9393
-type footer_opt() :: crc32 | adler32.
9494

95-
-type attach_args() :: #{name => binary(),
96-
role => attach_role(),
95+
-type attach_args() :: #{name := binary(),
96+
role := attach_role(),
9797
snd_settle_mode => snd_settle_mode(),
9898
rcv_settle_mode => rcv_settle_mode(),
9999
filter => filter(),
@@ -739,13 +739,19 @@ build_frames(Channel, Trf, Payload, MaxPayloadSize, Acc) ->
739739

740740
make_source(#{role := {sender, _}}) ->
741741
#'v1_0.source'{};
742-
make_source(#{role := {receiver, Source, _Pid},
743-
filter := Filter}) ->
742+
make_source(#{role := {receiver, Source, _Pid}} = AttachArgs) ->
744743
Durable = translate_terminus_durability(maps:get(durable, Source, none)),
744+
ExpiryPolicy = case Source of
745+
#{expiry_policy := Policy} when is_binary(Policy) ->
746+
{symbol, Policy};
747+
_ ->
748+
undefined
749+
end,
745750
Dynamic = maps:get(dynamic, Source, false),
746-
TranslatedFilter = translate_filters(Filter),
751+
TranslatedFilter = translate_filters(maps:get(filter, AttachArgs, #{})),
747752
#'v1_0.source'{address = make_address(Source),
748753
durable = {uint, Durable},
754+
expiry_policy = ExpiryPolicy,
749755
dynamic = Dynamic,
750756
filter = TranslatedFilter,
751757
capabilities = make_capabilities(Source)}.

deps/rabbit/Makefile

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -236,15 +236,19 @@ define ct_master.erl
236236
{ok, Pid2, _} = peer:start(StartOpts#{name => "rabbit_shard2"}),
237237
{ok, Pid3, _} = peer:start(StartOpts#{name => "rabbit_shard3"}),
238238
{ok, Pid4, _} = peer:start(StartOpts#{name => "rabbit_shard4"}),
239+
{ok, Pid5, _} = peer:start(StartOpts#{name => "rabbit_shard5"}),
239240
peer:call(Pid1, net_kernel, set_net_ticktime, [5]),
240241
peer:call(Pid2, net_kernel, set_net_ticktime, [5]),
241242
peer:call(Pid3, net_kernel, set_net_ticktime, [5]),
242243
peer:call(Pid4, net_kernel, set_net_ticktime, [5]),
244+
peer:call(Pid5, net_kernel, set_net_ticktime, [5]),
243245
peer:call(Pid1, persistent_term, put, [rabbit_ct_tcp_port_base, 16000]),
244246
peer:call(Pid2, persistent_term, put, [rabbit_ct_tcp_port_base, 20000]),
245247
peer:call(Pid3, persistent_term, put, [rabbit_ct_tcp_port_base, 24000]),
246248
peer:call(Pid4, persistent_term, put, [rabbit_ct_tcp_port_base, 28000]),
249+
peer:call(Pid5, persistent_term, put, [rabbit_ct_tcp_port_base, 32000]),
247250
[{[_], {ok, Results}}] = ct_master_fork:run("$1"),
251+
peer:stop(Pid5),
248252
peer:stop(Pid4),
249253
peer:stop(Pid3),
250254
peer:stop(Pid2),
@@ -258,7 +262,7 @@ endef
258262

259263
PARALLEL_CT_SET_1_A = unit_rabbit_ssl unit_cluster_formation_locking_mocks unit_cluster_formation_sort_nodes unit_collections unit_config_value_encryption unit_connection_tracking
260264
PARALLEL_CT_SET_1_B = amqp_address amqp_auth amqp_credit_api_v2 amqp_filter_prop amqp_filter_sql amqp_filter_sql_unit amqp_dotnet amqp_jms signal_handling single_active_consumer unit_access_control_authn_authz_context_propagation unit_access_control_credential_validation unit_amqp091_content_framing unit_amqp091_server_properties unit_app_management
261-
PARALLEL_CT_SET_1_C = amqp_proxy_protocol amqpl_consumer_ack amqpl_direct_reply_to backing_queue bindings rabbit_db_maintenance rabbit_db_msup rabbit_db_policy rabbit_db_queue rabbit_db_topic_exchange rabbit_direct_reply_to_prop cluster_limit cluster_minority term_to_binary_compat_prop topic_permission transactions unicode unit_access_control
265+
PARALLEL_CT_SET_1_C = amqp_proxy_protocol amqpl_consumer_ack backing_queue bindings rabbit_db_maintenance rabbit_db_msup rabbit_db_policy rabbit_db_queue rabbit_db_topic_exchange cluster_limit cluster_minority term_to_binary_compat_prop topic_permission transactions unicode unit_access_control
262266
PARALLEL_CT_SET_1_D = amqqueue_backward_compatibility channel_interceptor channel_operation_timeout classic_queue classic_queue_prop config_schema peer_discovery_dns peer_discovery_tmp_hidden_node per_node_limit per_user_connection_channel_limit
263267

264268
PARALLEL_CT_SET_2_A = cluster confirms_rejects consumer_timeout rabbit_access_control rabbit_confirms rabbit_core_metrics_gc rabbit_cuttlefish rabbit_db_binding rabbit_db_exchange
@@ -276,13 +280,16 @@ PARALLEL_CT_SET_4_B = per_user_connection_tracking per_vhost_connection_limit ra
276280
PARALLEL_CT_SET_4_C = msg_size_metrics unit_msg_size_metrics per_vhost_msg_store per_vhost_queue_limit priority_queue upgrade_preparation vhost
277281
PARALLEL_CT_SET_4_D = per_user_connection_channel_tracking product_info publisher_confirms_parallel queue_type rabbitmq_queues_cli_integration rabbitmqctl_integration rabbitmqctl_shutdown routing rabbit_amqqueue
278282

283+
PARALLEL_CT_SET_5_A = rabbit_direct_reply_to_prop direct_reply_to_amqpl direct_reply_to_amqp
284+
279285
PARALLEL_CT_SET_1 = $(sort $(PARALLEL_CT_SET_1_A) $(PARALLEL_CT_SET_1_B) $(PARALLEL_CT_SET_1_C) $(PARALLEL_CT_SET_1_D))
280286
PARALLEL_CT_SET_2 = $(sort $(PARALLEL_CT_SET_2_A) $(PARALLEL_CT_SET_2_B) $(PARALLEL_CT_SET_2_C) $(PARALLEL_CT_SET_2_D))
281287
PARALLEL_CT_SET_3 = $(sort $(PARALLEL_CT_SET_3_A) $(PARALLEL_CT_SET_3_B) $(PARALLEL_CT_SET_3_C) $(PARALLEL_CT_SET_3_D))
282288
PARALLEL_CT_SET_4 = $(sort $(PARALLEL_CT_SET_4_A) $(PARALLEL_CT_SET_4_B) $(PARALLEL_CT_SET_4_C) $(PARALLEL_CT_SET_4_D))
289+
PARALLEL_CT_SET_5 = $(PARALLEL_CT_SET_5_A)
283290

284291
SEQUENTIAL_CT_SUITES = amqp_client clustering_management dead_lettering feature_flags metadata_store_clustering quorum_queue rabbit_stream_queue rabbit_fifo_prop
285-
PARALLEL_CT_SUITES = $(PARALLEL_CT_SET_1) $(PARALLEL_CT_SET_2) $(PARALLEL_CT_SET_3) $(PARALLEL_CT_SET_4)
292+
PARALLEL_CT_SUITES = $(PARALLEL_CT_SET_1) $(PARALLEL_CT_SET_2) $(PARALLEL_CT_SET_3) $(PARALLEL_CT_SET_4) $(PARALLEL_CT_SET_5)
286293

287294
ifeq ($(filter-out $(SEQUENTIAL_CT_SUITES) $(PARALLEL_CT_SUITES),$(CT_SUITES)),)
288295
parallel-ct-sanity-check:
@@ -308,16 +315,19 @@ define tpl_parallel_ct_test_spec
308315
{node, shard2, 'rabbit_shard2@localhost'}.
309316
{node, shard3, 'rabbit_shard3@localhost'}.
310317
{node, shard4, 'rabbit_shard4@localhost'}.
318+
{node, shard5, 'rabbit_shard5@localhost'}.
311319

312320
{define, 'Set1', [$(call comma_list,$(addsuffix _SUITE,$1))]}.
313321
{define, 'Set2', [$(call comma_list,$(addsuffix _SUITE,$2))]}.
314322
{define, 'Set3', [$(call comma_list,$(addsuffix _SUITE,$3))]}.
315323
{define, 'Set4', [$(call comma_list,$(addsuffix _SUITE,$4))]}.
324+
{define, 'Set5', [$(call comma_list,$(addsuffix _SUITE,$5))]}.
316325

317326
{suites, shard1, "test/", 'Set1'}.
318327
{suites, shard2, "test/", 'Set2'}.
319328
{suites, shard3, "test/", 'Set3'}.
320329
{suites, shard4, "test/", 'Set4'}.
330+
{suites, shard5, "test/", 'Set5'}.
321331
endef
322332

323333
define parallel_ct_set_target
@@ -330,7 +340,7 @@ parallel-ct-set-$(1): test-build
330340
$$(call erlang,$$(call ct_master.erl,ct.set-$(1).spec),-sname parallel_ct_$(PROJECT)@localhost -hidden -kernel net_ticktime 5)
331341
endef
332342

333-
$(foreach set,1 2 3 4,$(eval $(call parallel_ct_set_target,$(set))))
343+
$(foreach set,1 2 3 4 5,$(eval $(call parallel_ct_set_target,$(set))))
334344

335345
# --------------------------------------------------------------------
336346
# Compilation.

deps/rabbit/src/mc_amqpl.erl

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -228,7 +228,14 @@ convert_from(mc_amqp, Sections, Env) ->
228228
%% drop it, what else can we do?
229229
undefined
230230
end,
231-
231+
ReplyTo = case unwrap_shortstr(ReplyTo0) of
232+
<<"/queues/", Queue/binary>> ->
233+
try cow_uri:urldecode(Queue)
234+
catch error:_ -> undefined
235+
end;
236+
Other ->
237+
Other
238+
end,
232239
BP = #'P_basic'{message_id = MsgId091,
233240
delivery_mode = DelMode,
234241
expiration = Expiration,
@@ -237,7 +244,7 @@ convert_from(mc_amqp, Sections, Env) ->
237244
[] -> undefined;
238245
AllHeaders -> AllHeaders
239246
end,
240-
reply_to = unwrap_shortstr(ReplyTo0),
247+
reply_to = ReplyTo,
241248
type = Type,
242249
app_id = unwrap_shortstr(GroupId),
243250
priority = Priority,
@@ -349,7 +356,7 @@ convert_to(mc_amqp, #content{payload_fragments_rev = PFR} = Content, Env) ->
349356
delivery_mode = DelMode,
350357
headers = Headers0,
351358
user_id = UserId,
352-
reply_to = ReplyTo,
359+
reply_to = ReplyTo0,
353360
type = Type,
354361
priority = Priority,
355362
app_id = AppId,
@@ -382,25 +389,32 @@ convert_to(mc_amqp, #content{payload_fragments_rev = PFR} = Content, Env) ->
382389
ttl = wrap(uint, Ttl),
383390
%% TODO: check Priority is a ubyte?
384391
priority = wrap(ubyte, Priority)},
392+
ReplyTo = case ReplyTo0 of
393+
undefined ->
394+
undefined;
395+
_ ->
396+
Queue = uri_string:quote(ReplyTo0),
397+
{utf8, <<"/queues/", Queue/binary>>}
398+
end,
385399
CorrId = case mc_util:urn_string_to_uuid(CorrId0) of
386400
{ok, CorrUUID} ->
387401
{uuid, CorrUUID};
388402
_ ->
389403
wrap(utf8, CorrId0)
390404
end,
391405
MsgId = case mc_util:urn_string_to_uuid(MsgId0) of
392-
{ok, MsgUUID} ->
393-
{uuid, MsgUUID};
394-
_ ->
395-
wrap(utf8, MsgId0)
396-
end,
406+
{ok, MsgUUID} ->
407+
{uuid, MsgUUID};
408+
_ ->
409+
wrap(utf8, MsgId0)
410+
end,
397411
P = case amqp10_section_header(?AMQP10_PROPERTIES_HEADER, Headers) of
398412
undefined ->
399413
#'v1_0.properties'{message_id = MsgId,
400414
user_id = wrap(binary, UserId),
401415
to = undefined,
402416
% subject = wrap(utf8, RKey),
403-
reply_to = wrap(utf8, ReplyTo),
417+
reply_to = ReplyTo,
404418
correlation_id = CorrId,
405419
content_type = wrap(symbol, ContentType),
406420
content_encoding = wrap(symbol, ContentEncoding),

deps/rabbit/src/pid_recomposition.erl

Lines changed: 0 additions & 60 deletions
This file was deleted.

deps/rabbit/src/rabbit.erl

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1157,7 +1157,6 @@ pg_local_amqp_connection() ->
11571157
pg_local_scope(Prefix) ->
11581158
list_to_atom(io_lib:format("~s_~s", [Prefix, node()])).
11591159

1160-
11611160
-spec update_cluster_tags() -> 'ok'.
11621161

11631162
update_cluster_tags() ->

deps/rabbit/src/rabbit_amqp_management.erl

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -87,8 +87,14 @@ handle_http_req(<<"GET">>,
8787
QName,
8888
fun(Q) ->
8989
{ok, NumMsgs, NumConsumers} = rabbit_amqqueue:stat(Q),
90-
RespPayload = encode_queue(Q, NumMsgs, NumConsumers),
91-
{ok, {<<"200">>, RespPayload, PermCaches}}
90+
case rabbit_volatile_queue:is(QNameBin) andalso
91+
not rabbit_volatile_queue:exists(QName) of
92+
true ->
93+
{error, not_found};
94+
false ->
95+
RespPayload = encode_queue(Q, NumMsgs, NumConsumers),
96+
{ok, {<<"200">>, RespPayload, PermCaches}}
97+
end
9298
end) of
9399
{ok, Result} ->
94100
Result;

0 commit comments

Comments
 (0)