Return stream frame header binary in dispatch chunk callback
Peer Discovery AWS Integration Test / Integration Test (push) Has been cancelled Details

This saves a system call by sending the frame header and the chunk
header at the same time.

References rabbitmq/osiris#192
This commit is contained in:
Arnaud Cogoluègnes 2025-07-04 08:18:34 +00:00
parent d0c3b3a079
commit 2ed6136f9f
No known key found for this signature in database
GPG Key ID: D5C8C4DFAD43AFA8
2 changed files with 10 additions and 18 deletions

View File

@ -3568,12 +3568,9 @@ subscription_exists(StreamSubscriptions, SubscriptionId) ->
lists:any(fun(Id) -> Id =:= SubscriptionId end, SubscriptionIds).
send_file_callback(?VERSION_1,
Transport,
_Log,
#consumer{configuration =
#consumer_configuration{socket = S,
subscription_id =
SubscriptionId,
#consumer_configuration{subscription_id = SubId,
counters = Counters}},
Counter) ->
fun(#{chunk_id := FirstOffsetInChunk, num_entries := NumEntries},
@ -3584,19 +3581,16 @@ send_file_callback(?VERSION_1,
?REQUEST:1,
?COMMAND_DELIVER:15,
?VERSION_1:16,
SubscriptionId:8/unsigned>>,
Transport:send(S, FrameBeginning),
SubId:8/unsigned>>,
atomics:add(Counter, 1, Size),
increase_messages_consumed(Counters, NumEntries),
set_consumer_offset(Counters, FirstOffsetInChunk)
set_consumer_offset(Counters, FirstOffsetInChunk),
FrameBeginning
end;
send_file_callback(?VERSION_2,
Transport,
Log,
#consumer{configuration =
#consumer_configuration{socket = S,
subscription_id =
SubscriptionId,
#consumer_configuration{subscription_id = SubId,
counters = Counters}},
Counter) ->
fun(#{chunk_id := FirstOffsetInChunk, num_entries := NumEntries},
@ -3608,12 +3602,12 @@ send_file_callback(?VERSION_2,
?REQUEST:1,
?COMMAND_DELIVER:15,
?VERSION_2:16,
SubscriptionId:8/unsigned,
SubId:8/unsigned,
CommittedChunkId:64>>,
Transport:send(S, FrameBeginning),
atomics:add(Counter, 1, Size),
increase_messages_consumed(Counters, NumEntries),
set_consumer_offset(Counters, FirstOffsetInChunk)
set_consumer_offset(Counters, FirstOffsetInChunk),
FrameBeginning
end.
send_chunks(DeliverVersion,
@ -3683,9 +3677,7 @@ send_chunks(DeliverVersion,
Retry,
Counter) ->
case osiris_log:send_file(Socket, Log,
send_file_callback(DeliverVersion,
Transport,
Log,
send_file_callback(DeliverVersion, Log,
Consumer,
Counter))
of

View File

@ -49,7 +49,7 @@ dep_jose = hex 1.11.10
dep_khepri = hex 0.17.1
dep_khepri_mnesia_migration = hex 0.8.0
dep_meck = hex 1.0.0
dep_osiris = git https://github.com/rabbitmq/osiris v1.8.8
dep_osiris = git https://github.com/rabbitmq/osiris send-file-improvements
dep_prometheus = hex 4.11.0
dep_ra = hex 2.16.12
dep_ranch = hex 2.2.0