Adapt stream code to Osiris read ahead

Osiris can read ahead data in case of small chunks. This saves system
calls and increases consumption rate dramatically for some streams.
This is transparent for the stream protocol, but requires a small tweak
for the stream queue type implementation (passing in the previous
iterator when creating a new one).

The read ahead is on by default but can be deactivated via the new
stream.read_ahead configuration entry (true / false).

Co-authored-by: Karl Nilsson <kjnilsson@gmail.com>

References rabbitmq/osiris#192
This commit is contained in:
Arnaud Cogoluègnes 2025-09-09 08:02:34 +00:00 committed by Michael Klishin
parent 91efe616b8
commit 6238dac1c3
No known key found for this signature in database
GPG Key ID: 16AB14D00D613900
6 changed files with 55 additions and 14 deletions

View File

@ -119,6 +119,7 @@ define PROJECT_ENV
{dead_letter_worker_consumer_prefetch, 32},
{dead_letter_worker_publisher_confirm_timeout, 180000},
{vhost_process_reconciliation_run_interval, 30},
{stream_read_ahead, true},
%% for testing
{vhost_process_reconciliation_enabled, true},
{license_line, "Licensed under the MPL 2.0. Website: https://rabbitmq.com"}

View File

@ -2785,6 +2785,9 @@ fun(Conf) ->
end
end}.
%% Maps the user-facing `stream.read_ahead' setting (true | false) to the
%% `rabbit' application environment key `stream_read_ahead', which
%% toggles Osiris read-ahead for stream readers.
{mapping, "stream.read_ahead", "rabbit.stream_read_ahead",
[{datatype, {enum, [true, false]}}]}.
%% Collects `cluster_tags.<tag>' entries into the `rabbit.cluster_tags'
%% environment key; each tag value is parsed as a binary.
{mapping, "cluster_tags.$tag", "rabbit.cluster_tags", [
{datatype, [binary]}
]}.

View File

@ -51,6 +51,7 @@
-export([format_osiris_event/2]).
-export([update_stream_conf/2]).
-export([readers/1]).
-export([read_ahead_on/0]).
-export([parse_offset_arg/1,
filter_spec/1]).
@ -463,10 +464,11 @@ query_local_pid(#stream_client{stream_id = StreamId} = State) ->
begin_stream(#stream_client{name = QName,
readers = Readers0,
local_pid = LocalPid} = State,
Tag, Offset, Mode, AckRequired, Filter, Options)
Tag, Offset, Mode, AckRequired, Filter, Options0)
when is_pid(LocalPid) ->
CounterSpec = {{?MODULE, QName, Tag, self()}, []},
{ok, Seg0} = osiris:init_reader(LocalPid, Offset, CounterSpec, Options),
Options1 = Options0#{read_ahead => read_ahead_on()},
{ok, Seg0} = osiris:init_reader(LocalPid, Offset, CounterSpec, Options1),
NextOffset = osiris_log:next_offset(Seg0) - 1,
osiris:register_offset_listener(LocalPid, NextOffset),
StartOffset = case Offset of
@ -491,7 +493,7 @@ begin_stream(#stream_client{name = QName,
last_consumed_offset = StartOffset,
log = Seg0,
filter = Filter,
reader_options = Options},
reader_options = Options1},
{ok, State#stream_client{readers = Readers0#{Tag => Str0}}}.
cancel(_Q, #{consumer_tag := ConsumerTag,
@ -659,8 +661,8 @@ handle_event(_QName, {stream_local_member_change, Pid},
osiris_log:close(Log0),
CounterSpec = {{?MODULE, QName, self()}, []},
?LOG_DEBUG("Re-creating Osiris reader for consumer ~tp at offset ~tp "
" with options ~tp",
[T, Offset, Options]),
" with options ~tp",
[T, Offset, Options]),
{ok, Log1} = osiris:init_reader(Pid, Offset, CounterSpec, Options),
NextOffset = osiris_log:next_offset(Log1) - 1,
?LOG_DEBUG("Registering offset listener at offset ~tp", [NextOffset]),
@ -1176,7 +1178,7 @@ stream_entries(QName, Name, CTag, LocalPid,
credit = Credit} = Str0) ->
case Credit > 0 of
true ->
case chunk_iterator(Str0, LocalPid) of
case chunk_iterator(Str0, LocalPid, undefined) of
{ok, Str} ->
stream_entries(QName, Name, CTag, LocalPid, Str);
{end_of_stream, Str} ->
@ -1229,7 +1231,7 @@ stream_entries(QName, Name, CTag, LocalPid,
gen_server:cast(self(), queue_event(QName, {resume_filtering, CTag})),
{Str0#stream{filtering_paused = true}, lists:reverse(Acc0)};
end_of_chunk ->
case chunk_iterator(Str0, LocalPid) of
case chunk_iterator(Str0, LocalPid, Iter0) of
{ok, Str} ->
stream_entries(QName, Name, CTag, LocalPid, Str, Acc0);
{end_of_stream, Str} ->
@ -1294,8 +1296,8 @@ stream_entries(QName, Name, CTag, LocalPid,
chunk_iterator(#stream{credit = Credit,
listening_offset = LOffs,
log = Log0} = Str0, LocalPid) ->
case osiris_log:chunk_iterator(Log0, Credit) of
log = Log0} = Str0, LocalPid, PrevIterator) ->
case osiris_log:chunk_iterator(Log0, Credit, PrevIterator) of
{ok, _ChunkHeader, Iter, Log} ->
{ok, Str0#stream{chunk_iterator = Iter,
log = Log}};
@ -1527,3 +1529,6 @@ queue_vm_stats_sups() ->
%% Stream queues report no ETS tables for VM memory accounting:
%% both elements of the returned pair are empty lists.
queue_vm_ets() ->
    Names = [],
    Sizes = [],
    {Names, Sizes}.
%% Whether Osiris read-ahead is enabled for stream readers.
%% Backed by the `stream_read_ahead' entry of the `rabbit'
%% application environment (set from the `stream.read_ahead'
%% config key); defaults to true when unset.
read_ahead_on() ->
    case application:get_env(rabbit, stream_read_ahead) of
        {ok, Enabled} ->
            Enabled;
        undefined ->
            true
    end.

View File

@ -1223,6 +1223,28 @@ credential_validator.regexp = ^abc\\d+",
[{osiris, [
{port_range, {4100, 4600}}
]}],
[]},
%%
%% Stream read ahead on/off
%%
{stream_read_ahead,
"
stream.read_ahead = true
",
[{rabbit, [
{stream_read_ahead, true}
]}],
[]},
{stream_read_ahead,
"
stream.read_ahead = false
",
[{rabbit, [
{stream_read_ahead, false}
]}],
[]}
].

View File

@ -2812,11 +2812,14 @@ init_reader(ConnectionTransport,
Properties,
OffsetSpec) ->
CounterSpec = {{?MODULE, QueueResource, SubscriptionId, self()}, []},
Options = maps:merge(#{transport => ConnectionTransport,
chunk_selector => get_chunk_selector(Properties)},
rabbit_stream_utils:filter_spec(Properties)),
{ok, Segment} =
osiris:init_reader(LocalMemberPid, OffsetSpec, CounterSpec, Options),
Options0 = #{transport => ConnectionTransport,
chunk_selector => get_chunk_selector(Properties),
read_ahead => rabbit_stream_queue:read_ahead_on()},
Options1 = maps:merge(Options0,
rabbit_stream_utils:filter_spec(Properties)),
{ok, Segment} = osiris:init_reader(LocalMemberPid, OffsetSpec,
CounterSpec, Options1),
?LOG_DEBUG("Next offset for subscription ~tp is ~tp",
[SubscriptionId, osiris_log:next_offset(Segment)]),
Segment.

View File

@ -19,6 +19,7 @@
-include_lib("eunit/include/eunit.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("amqp10_common/include/amqp10_framing.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
-include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl").
@ -1773,3 +1774,9 @@ request(CorrId, Cmd) ->
%% Produce a random, printable token: 20 random bytes encoded as
%% base64, yielding a 28-byte binary.
rand_bin() ->
    RawBytes = rand:bytes(20),
    base64:encode(RawBytes).
%% Build one AMQP 1.0 data section of MsgSize bytes and ask
%% osiris_log to materialize a log of NumMessages copies of it,
%% MsgsPerChunk per chunk, under Directory.
generate_log(MsgSize, MsgsPerChunk, NumMessages, Directory) ->
    Payload = binary:copy(<<"a">>, MsgSize),
    Encoded = amqp10_framing:encode_bin(#'v1_0.data'{content = Payload}),
    osiris_log:generate_log(Encoded, MsgsPerChunk, NumMessages, Directory).