Skip to content

Commit 71c4773

Browse files
committed
Add stream.read_ahead configuration entry
This is a switch for the read ahead optimization in Osiris.
1 parent af7b93c commit 71c4773

File tree

5 files changed

+44
-10
lines changed

5 files changed

+44
-10
lines changed

deps/rabbit/Makefile

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,6 +119,7 @@ define PROJECT_ENV
119119
{dead_letter_worker_consumer_prefetch, 32},
120120
{dead_letter_worker_publisher_confirm_timeout, 180000},
121121
{vhost_process_reconciliation_run_interval, 30},
122+
{stream_read_ahead, true},
122123
%% for testing
123124
{vhost_process_reconciliation_enabled, true},
124125
{license_line, "Licensed under the MPL 2.0. Website: https://rabbitmq.com"}

deps/rabbit/priv/schema/rabbit.schema

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2785,6 +2785,9 @@ fun(Conf) ->
27852785
end
27862786
end}.
27872787

2788+
{mapping, "stream.read_ahead", "rabbit.stream_read_ahead",
2789+
[{datatype, {enum, [true, false]}}]}.
2790+
27882791
{mapping, "cluster_tags.$tag", "rabbit.cluster_tags", [
27892792
{datatype, [binary]}
27902793
]}.

deps/rabbit/src/rabbit_stream_queue.erl

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
-export([format_osiris_event/2]).
5252
-export([update_stream_conf/2]).
5353
-export([readers/1]).
54+
-export([read_ahead_on/0]).
5455

5556
-export([parse_offset_arg/1,
5657
filter_spec/1]).
@@ -463,10 +464,11 @@ query_local_pid(#stream_client{stream_id = StreamId} = State) ->
463464
begin_stream(#stream_client{name = QName,
464465
readers = Readers0,
465466
local_pid = LocalPid} = State,
466-
Tag, Offset, Mode, AckRequired, Filter, Options)
467+
Tag, Offset, Mode, AckRequired, Filter, Options0)
467468
when is_pid(LocalPid) ->
468469
CounterSpec = {{?MODULE, QName, Tag, self()}, []},
469-
{ok, Seg0} = osiris:init_reader(LocalPid, Offset, CounterSpec, Options),
470+
Options1 = Options0#{read_ahead => read_ahead_on()},
471+
{ok, Seg0} = osiris:init_reader(LocalPid, Offset, CounterSpec, Options1),
470472
NextOffset = osiris_log:next_offset(Seg0) - 1,
471473
osiris:register_offset_listener(LocalPid, NextOffset),
472474
StartOffset = case Offset of
@@ -491,7 +493,7 @@ begin_stream(#stream_client{name = QName,
491493
last_consumed_offset = StartOffset,
492494
log = Seg0,
493495
filter = Filter,
494-
reader_options = Options},
496+
reader_options = Options1},
495497
{ok, State#stream_client{readers = Readers0#{Tag => Str0}}}.
496498

497499
cancel(_Q, #{consumer_tag := ConsumerTag,
@@ -659,8 +661,8 @@ handle_event(_QName, {stream_local_member_change, Pid},
659661
osiris_log:close(Log0),
660662
CounterSpec = {{?MODULE, QName, self()}, []},
661663
?LOG_DEBUG("Re-creating Osiris reader for consumer ~tp at offset ~tp "
662-
" with options ~tp",
663-
[T, Offset, Options]),
664+
" with options ~tp",
665+
[T, Offset, Options]),
664666
{ok, Log1} = osiris:init_reader(Pid, Offset, CounterSpec, Options),
665667
NextOffset = osiris_log:next_offset(Log1) - 1,
666668
?LOG_DEBUG("Registering offset listener at offset ~tp", [NextOffset]),
@@ -1527,3 +1529,6 @@ queue_vm_stats_sups() ->
15271529
queue_vm_ets() ->
15281530
{[],
15291531
[]}.
1532+
1533+
read_ahead_on() ->
1534+
application:get_env(rabbit, stream_read_ahead, true).

deps/rabbit/test/config_schema_SUITE_data/rabbit.snippets

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1223,6 +1223,28 @@ credential_validator.regexp = ^abc\\d+",
12231223
[{osiris, [
12241224
{port_range, {4100, 4600}}
12251225
]}],
1226+
[]},
1227+
1228+
%%
1229+
%% Stream read ahead on/off
1230+
%%
1231+
1232+
{stream_read_ahead,
1233+
"
1234+
stream.read_ahead = true
1235+
",
1236+
[{rabbit, [
1237+
{stream_read_ahead, true}
1238+
]}],
1239+
[]},
1240+
1241+
{stream_read_ahead,
1242+
"
1243+
stream.read_ahead = false
1244+
",
1245+
[{rabbit, [
1246+
{stream_read_ahead, false}
1247+
]}],
12261248
[]}
12271249

12281250
].

deps/rabbitmq_stream/src/rabbit_stream_reader.erl

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2812,11 +2812,14 @@ init_reader(ConnectionTransport,
28122812
Properties,
28132813
OffsetSpec) ->
28142814
CounterSpec = {{?MODULE, QueueResource, SubscriptionId, self()}, []},
2815-
Options = maps:merge(#{transport => ConnectionTransport,
2816-
chunk_selector => get_chunk_selector(Properties)},
2817-
rabbit_stream_utils:filter_spec(Properties)),
2818-
{ok, Segment} =
2819-
osiris:init_reader(LocalMemberPid, OffsetSpec, CounterSpec, Options),
2815+
Options0 = #{transport => ConnectionTransport,
2816+
chunk_selector => get_chunk_selector(Properties),
2817+
read_ahead => rabbit_stream_queue:read_ahead_on()},
2818+
2819+
Options1 = maps:merge(Options0,
2820+
rabbit_stream_utils:filter_spec(Properties)),
2821+
{ok, Segment} = osiris:init_reader(LocalMemberPid, OffsetSpec,
2822+
CounterSpec, Options1),
28202823
?LOG_DEBUG("Next offset for subscription ~tp is ~tp",
28212824
[SubscriptionId, osiris_log:next_offset(Segment)]),
28222825
Segment.

0 commit comments

Comments
 (0)