Use stream leader to query offset

This commit is contained in:
Arnaud Cogoluègnes 2021-04-27 15:38:21 +02:00
parent 37275c4115
commit e3c4c9a471
No known key found for this signature in database
GPG Key ID: D5C8C4DFAD43AFA8
1 changed files with 10 additions and 12 deletions

View File

@ -1806,7 +1806,7 @@ handle_frame_post_auth(Transport,
#stream_connection{socket = S,
virtual_host = VirtualHost,
user = User} =
Connection,
Connection0,
State,
<<?REQUEST:1,
?COMMAND_QUERY_OFFSET:15,
@ -1818,7 +1818,7 @@ handle_frame_post_auth(Transport,
Stream:StreamSize/binary>>,
Rest) ->
FrameSize = ?RESPONSE_FRAME_SIZE + 8,
{ResponseCode, Offset} =
{ResponseCode, Offset, Connection1} =
case rabbit_stream_utils:check_read_permitted(#resource{name = Stream,
kind = queue,
virtual_host =
@ -1826,24 +1826,22 @@ handle_frame_post_auth(Transport,
User, #{})
of
ok ->
case rabbit_stream_manager:lookup_local_member(VirtualHost,
Stream)
of
{error, not_found} ->
{?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 0};
{ok, LocalMemberPid} ->
case lookup_leader(Stream, Connection0) of
cluster_not_found ->
{?RESPONSE_CODE_STREAM_DOES_NOT_EXIST, 0, Connection0};
{LeaderPid, C} ->
{?RESPONSE_CODE_OK,
case osiris:read_tracking(LocalMemberPid, Reference) of
case osiris:read_tracking(LeaderPid, Reference) of
undefined ->
0;
{offset, Offt} ->
Offt;
_ ->
0
end}
end, C}
end;
error ->
{?RESPONSE_CODE_ACCESS_REFUSED, 0}
{?RESPONSE_CODE_ACCESS_REFUSED, 0, Connection0}
end,
Transport:send(S,
[<<FrameSize:32,
@ -1853,7 +1851,7 @@ handle_frame_post_auth(Transport,
<<CorrelationId:32>>,
<<ResponseCode:16>>,
<<Offset:64>>]),
{Connection, State, Rest};
{Connection1, State, Rest};
handle_frame_post_auth(Transport,
#stream_connection{stream_subscriptions =
StreamSubscriptions} =