Skip to content

Commit 6bb649a

Browse files
committed
Local shovels: single acks
For some reason, multiple acknowledgments are really slow when using credit flow v2
1 parent 382fac3 commit 6bb649a

File tree

1 file changed

+15
-20
lines changed

1 file changed

+15
-20
lines changed

deps/rabbitmq_shovel/src/rabbit_local_shovel.erl

Lines changed: 15 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,6 @@
5959
msg_id
6060
}).
6161

62-
%% This is a significantly reduced version of its rabbit_amqp_session counterpart.
63-
%% Local shovels always use the maximum credit allowed.
64-
-record(credit_req, {
65-
delivery_count :: sequence_no()
66-
}).
67-
6862
parse(_Name, {source, Source}) ->
6963
Queue = parse_parameter(queue, fun parse_binary/1,
7064
proplists:get_value(queue, Source)),
@@ -574,21 +568,23 @@ get_user_vhost_from_amqp_param(Uri) ->
574568

575569
settle(Op, DeliveryTag, Multiple,
576570
#{source := #{queue_r := QRef,
577-
current := Current = #{queue_states := QState0,
578-
consumer_tag := CTag,
571+
current := Current = #{consumer_tag := CTag,
579572
unacked_message_q := UAMQ0}
580573
} = Src} = State0) ->
581574
{MsgIds, UAMQ} = collect_acks(UAMQ0, DeliveryTag, Multiple),
582-
case rabbit_queue_type:settle(QRef, Op, CTag, MsgIds, QState0) of
583-
{ok, QState1, Actions} ->
584-
State = State0#{source => Src#{current => Current#{queue_states => QState1,
585-
unacked_message_q => UAMQ}}},
586-
handle_queue_actions(Actions, State);
587-
{'protocol_error', Type, Reason, Args} ->
588-
?LOG_ERROR("Shovel failed to settle ~p acknowledgments with ~tp: ~tp",
589-
[Op, Type, io_lib:format(Reason, Args)]),
590-
exit({shutdown, {ack_failed, Reason}})
591-
end.
575+
State = State0#{source => Src#{current => Current#{unacked_message_q => UAMQ}}},
576+
lists:foldl(
577+
fun(MsgId, #{source := Src0 = #{current := Current0 = #{queue_states := QState0}}} = St0) ->
578+
case rabbit_queue_type:settle(QRef, Op, CTag, [MsgId], QState0) of
579+
{ok, QState1, Actions} ->
580+
St = St0#{source => Src0#{current => Current0#{queue_states => QState1}}},
581+
handle_queue_actions(Actions, St);
582+
{'protocol_error', Type, Reason, Args} ->
583+
?LOG_ERROR("Shovel failed to settle ~p acknowledgments with ~tp: ~tp",
584+
[Op, Type, io_lib:format(Reason, Args)]),
585+
exit({shutdown, {ack_failed, Reason}})
586+
end
587+
end, State, MsgIds).
592588

593589
%% From rabbit_channel
594590
%% Records a client-sent acknowledgement. Handles both single delivery acks
@@ -661,8 +657,7 @@ maybe_grant_credit(#{source := #{queue_r := QName,
661657
current := #{consumer_tag := CTag,
662658
queue_states := QState0,
663659
unacked_message_q := Q} = Current
664-
} = Src,
665-
dest := #{unacked := Unacked}} = State0) ->
660+
} = Src} = State0) ->
666661
GrantLinkCredit = grant_link_credit(Credit, MaxLinkCredit, ?QUEUE:len(Q)),
667662
{ok, QState, Actions} = case (GrantLinkCredit and not HaveCreditReqInFlight) of
668663
true ->

0 commit comments

Comments
 (0)