Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions deps/rabbit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ define PROJECT_ENV
{dead_letter_worker_consumer_prefetch, 32},
{dead_letter_worker_publisher_confirm_timeout, 180000},
{vhost_process_reconciliation_run_interval, 30},
{stream_read_ahead, true},
%% for testing
{vhost_process_reconciliation_enabled, true},
{license_line, "Licensed under the MPL 2.0. Website: https://rabbitmq.com"}
Expand Down
3 changes: 3 additions & 0 deletions deps/rabbit/priv/schema/rabbit.schema
Original file line number Diff line number Diff line change
Expand Up @@ -2785,6 +2785,9 @@ fun(Conf) ->
end
end}.

%% Maps the "stream.read_ahead" configuration key to the
%% rabbit.stream_read_ahead application env value. Controls whether osiris
%% stream readers are initialised with read-ahead enabled
%% (see rabbit_stream_queue:read_ahead_on/0; the app env default is true).
{mapping, "stream.read_ahead", "rabbit.stream_read_ahead",
[{datatype, {enum, [true, false]}}]}.

{mapping, "cluster_tags.$tag", "rabbit.cluster_tags", [
{datatype, [binary]}
]}.
Expand Down
23 changes: 14 additions & 9 deletions deps/rabbit/src/rabbit_stream_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
-export([format_osiris_event/2]).
-export([update_stream_conf/2]).
-export([readers/1]).
-export([read_ahead_on/0]).

-export([parse_offset_arg/1,
filter_spec/1]).
Expand Down Expand Up @@ -463,10 +464,11 @@ query_local_pid(#stream_client{stream_id = StreamId} = State) ->
begin_stream(#stream_client{name = QName,
readers = Readers0,
local_pid = LocalPid} = State,
Tag, Offset, Mode, AckRequired, Filter, Options)
Tag, Offset, Mode, AckRequired, Filter, Options0)
when is_pid(LocalPid) ->
CounterSpec = {{?MODULE, QName, Tag, self()}, []},
{ok, Seg0} = osiris:init_reader(LocalPid, Offset, CounterSpec, Options),
Options1 = Options0#{read_ahead => read_ahead_on()},
{ok, Seg0} = osiris:init_reader(LocalPid, Offset, CounterSpec, Options1),
NextOffset = osiris_log:next_offset(Seg0) - 1,
osiris:register_offset_listener(LocalPid, NextOffset),
StartOffset = case Offset of
Expand All @@ -491,7 +493,7 @@ begin_stream(#stream_client{name = QName,
last_consumed_offset = StartOffset,
log = Seg0,
filter = Filter,
reader_options = Options},
reader_options = Options1},
{ok, State#stream_client{readers = Readers0#{Tag => Str0}}}.

cancel(_Q, #{consumer_tag := ConsumerTag,
Expand Down Expand Up @@ -659,8 +661,8 @@ handle_event(_QName, {stream_local_member_change, Pid},
osiris_log:close(Log0),
CounterSpec = {{?MODULE, QName, self()}, []},
?LOG_DEBUG("Re-creating Osiris reader for consumer ~tp at offset ~tp "
" with options ~tp",
[T, Offset, Options]),
" with options ~tp",
[T, Offset, Options]),
{ok, Log1} = osiris:init_reader(Pid, Offset, CounterSpec, Options),
NextOffset = osiris_log:next_offset(Log1) - 1,
?LOG_DEBUG("Registering offset listener at offset ~tp", [NextOffset]),
Expand Down Expand Up @@ -1176,7 +1178,7 @@ stream_entries(QName, Name, CTag, LocalPid,
credit = Credit} = Str0) ->
case Credit > 0 of
true ->
case chunk_iterator(Str0, LocalPid) of
case chunk_iterator(Str0, LocalPid, undefined) of
{ok, Str} ->
stream_entries(QName, Name, CTag, LocalPid, Str);
{end_of_stream, Str} ->
Expand Down Expand Up @@ -1229,7 +1231,7 @@ stream_entries(QName, Name, CTag, LocalPid,
gen_server:cast(self(), queue_event(QName, {resume_filtering, CTag})),
{Str0#stream{filtering_paused = true}, lists:reverse(Acc0)};
end_of_chunk ->
case chunk_iterator(Str0, LocalPid) of
case chunk_iterator(Str0, LocalPid, Iter0) of
{ok, Str} ->
stream_entries(QName, Name, CTag, LocalPid, Str, Acc0);
{end_of_stream, Str} ->
Expand Down Expand Up @@ -1294,8 +1296,8 @@ stream_entries(QName, Name, CTag, LocalPid,

chunk_iterator(#stream{credit = Credit,
listening_offset = LOffs,
log = Log0} = Str0, LocalPid) ->
case osiris_log:chunk_iterator(Log0, Credit) of
log = Log0} = Str0, LocalPid, PrevIterator) ->
case osiris_log:chunk_iterator(Log0, Credit, PrevIterator) of
{ok, _ChunkHeader, Iter, Log} ->
{ok, Str0#stream{chunk_iterator = Iter,
log = Log}};
Expand Down Expand Up @@ -1527,3 +1529,6 @@ queue_vm_stats_sups() ->
%% Stream queues attribute no ETS tables to queue VM memory stats:
%% both elements of the result (table specs, named tables) are empty.
queue_vm_ets() ->
    TableSpecs = [],
    NamedTables = [],
    {TableSpecs, NamedTables}.

%% Effective value of the rabbit `stream_read_ahead` application env,
%% defaulting to true when unset. Passed as the `read_ahead` reader
%% option when initialising osiris readers.
read_ahead_on() ->
    case application:get_env(rabbit, stream_read_ahead) of
        {ok, Enabled} ->
            Enabled;
        undefined ->
            true
    end.
22 changes: 22 additions & 0 deletions deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets
Original file line number Diff line number Diff line change
Expand Up @@ -1223,6 +1223,28 @@ credential_validator.regexp = ^abc\\d+",
[{osiris, [
{port_range, {4100, 4600}}
]}],
[]},

%%
%% Stream read ahead on/off
%%

%% Snippet ids must be unique within this file so that a failing case can
%% be identified unambiguously; the "false" variant therefore gets its own
%% id instead of reusing stream_read_ahead.
{stream_read_ahead,
"
stream.read_ahead = true
",
[{rabbit, [
{stream_read_ahead, true}
]}],
[]},

{stream_read_ahead_disabled,
"
stream.read_ahead = false
",
[{rabbit, [
{stream_read_ahead, false}
]}],
[]}

].
39 changes: 17 additions & 22 deletions deps/rabbitmq_stream/src/rabbit_stream_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -2812,11 +2812,14 @@ init_reader(ConnectionTransport,
Properties,
OffsetSpec) ->
CounterSpec = {{?MODULE, QueueResource, SubscriptionId, self()}, []},
Options = maps:merge(#{transport => ConnectionTransport,
chunk_selector => get_chunk_selector(Properties)},
rabbit_stream_utils:filter_spec(Properties)),
{ok, Segment} =
osiris:init_reader(LocalMemberPid, OffsetSpec, CounterSpec, Options),
Options0 = #{transport => ConnectionTransport,
chunk_selector => get_chunk_selector(Properties),
read_ahead => rabbit_stream_queue:read_ahead_on()},

Options1 = maps:merge(Options0,
rabbit_stream_utils:filter_spec(Properties)),
{ok, Segment} = osiris:init_reader(LocalMemberPid, OffsetSpec,
CounterSpec, Options1),
?LOG_DEBUG("Next offset for subscription ~tp is ~tp",
[SubscriptionId, osiris_log:next_offset(Segment)]),
Segment.
Expand Down Expand Up @@ -3571,12 +3574,9 @@ subscription_exists(StreamSubscriptions, SubscriptionId) ->
lists:any(fun(Id) -> Id =:= SubscriptionId end, SubscriptionIds).

send_file_callback(?VERSION_1,
Transport,
_Log,
#consumer{configuration =
#consumer_configuration{socket = S,
subscription_id =
SubscriptionId,
#consumer_configuration{subscription_id = SubId,
counters = Counters}},
Counter) ->
fun(#{chunk_id := FirstOffsetInChunk, num_entries := NumEntries},
Expand All @@ -3587,19 +3587,16 @@ send_file_callback(?VERSION_1,
?REQUEST:1,
?COMMAND_DELIVER:15,
?VERSION_1:16,
SubscriptionId:8/unsigned>>,
Transport:send(S, FrameBeginning),
SubId:8/unsigned>>,
atomics:add(Counter, 1, Size),
increase_messages_consumed(Counters, NumEntries),
set_consumer_offset(Counters, FirstOffsetInChunk)
set_consumer_offset(Counters, FirstOffsetInChunk),
FrameBeginning
end;
send_file_callback(?VERSION_2,
Transport,
Log,
#consumer{configuration =
#consumer_configuration{socket = S,
subscription_id =
SubscriptionId,
#consumer_configuration{subscription_id = SubId,
counters = Counters}},
Counter) ->
fun(#{chunk_id := FirstOffsetInChunk, num_entries := NumEntries},
Expand All @@ -3611,12 +3608,12 @@ send_file_callback(?VERSION_2,
?REQUEST:1,
?COMMAND_DELIVER:15,
?VERSION_2:16,
SubscriptionId:8/unsigned,
SubId:8/unsigned,
CommittedChunkId:64>>,
Transport:send(S, FrameBeginning),
atomics:add(Counter, 1, Size),
increase_messages_consumed(Counters, NumEntries),
set_consumer_offset(Counters, FirstOffsetInChunk)
set_consumer_offset(Counters, FirstOffsetInChunk),
FrameBeginning
end.

send_chunks(DeliverVersion,
Expand Down Expand Up @@ -3686,9 +3683,7 @@ send_chunks(DeliverVersion,
Retry,
Counter) ->
case osiris_log:send_file(Socket, Log,
send_file_callback(DeliverVersion,
Transport,
Log,
send_file_callback(DeliverVersion, Log,
Consumer,
Counter))
of
Expand Down
7 changes: 7 additions & 0 deletions deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

-include_lib("eunit/include/eunit.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("amqp10_common/include/amqp10_framing.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
-include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl").

Expand Down Expand Up @@ -1773,3 +1774,9 @@ request(CorrId, Cmd) ->

%% Produces a random base64-encoded binary (20 random bytes, so the
%% encoded form is always 28 bytes including padding).
rand_bin() ->
    RandomBytes = rand:bytes(20),
    base64:encode(RandomBytes).

%% Builds an osiris log on disk for tests: encodes one AMQP 1.0 data
%% section of MsgSize 'a' bytes and hands it to osiris_log:generate_log/4
%% to be repeated NumMessages times, MsgsPerChunk per chunk, under
%% Directory.
generate_log(MsgSize, MsgsPerChunk, NumMessages, Directory) ->
    Payload = binary:copy(<<"a">>, MsgSize),
    Encoded = amqp10_framing:encode_bin(#'v1_0.data'{content = Payload}),
    osiris_log:generate_log(Encoded, MsgsPerChunk, NumMessages, Directory).
2 changes: 1 addition & 1 deletion rabbitmq-components.mk
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ dep_jose = hex 1.11.10
dep_khepri = hex 0.17.2
dep_khepri_mnesia_migration = hex 0.8.0
dep_meck = hex 1.0.0
dep_osiris = git https://github.com/rabbitmq/osiris v1.9.0
dep_osiris = git https://github.com/rabbitmq/osiris send-file-improvements
dep_prometheus = hex 5.1.1
dep_ra = hex 2.17.1
dep_ranch = hex 2.2.0
Expand Down
Loading