Merge pull request #14438 from rabbitmq/stream-queue-leader-fix

Fix issue where leader is not returned after stream declaration
This commit is contained in:
David Ansari 2025-09-01 15:32:22 +02:00 committed by GitHub
commit def157a105
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
2 changed files with 16 additions and 15 deletions

View File

@ -196,12 +196,12 @@ create_stream(Q0) ->
replica_nodes => Followers}), replica_nodes => Followers}),
Q1 = amqqueue:set_type_state(Q0, Conf), Q1 = amqqueue:set_type_state(Q0, Conf),
case rabbit_amqqueue:internal_declare(Q1, false) of case rabbit_amqqueue:internal_declare(Q1, false) of
{created, Q} -> {created, Q2} ->
case rabbit_stream_coordinator:new_stream(Q, Leader) of case rabbit_stream_coordinator:new_stream(Q2, Leader) of
{ok, {ok, LeaderPid}, _} -> {ok, {ok, LeaderPid}, _} ->
%% update record with leader pid %% update record with leader pid
case set_leader_pid(LeaderPid, amqqueue:get_name(Q)) of case set_leader_pid(LeaderPid, QName) of
ok -> {ok, Q} ->
rabbit_event:notify(queue_created, rabbit_event:notify(queue_created,
[{name, QName}, [{name, QName},
{durable, true}, {durable, true},
@ -218,7 +218,7 @@ create_stream(Q0) ->
[rabbit_misc:rs(QName), node()]} [rabbit_misc:rs(QName), node()]}
end; end;
Error -> Error ->
_ = rabbit_amqqueue:internal_delete(Q, ActingUser), _ = rabbit_amqqueue:internal_delete(Q2, ActingUser),
{protocol_error, internal_error, "Cannot declare ~ts on node '~ts': ~255p", {protocol_error, internal_error, "Cannot declare ~ts on node '~ts': ~255p",
[rabbit_misc:rs(QName), node(), Error]} [rabbit_misc:rs(QName), node(), Error]}
end; end;
@ -1396,11 +1396,6 @@ resend_all(#stream_client{leader = LeaderPid,
end || {Seq, {_Corr, Msg}} <- Msgs], end || {Seq, {_Corr, Msg}} <- Msgs],
State. State.
-spec set_leader_pid(Pid, QName) -> Ret when
Pid :: pid(),
QName :: rabbit_amqqueue:name(),
Ret :: ok | {error, timeout}.
set_leader_pid(Pid, QName) -> set_leader_pid(Pid, QName) ->
%% TODO this should probably be a single khepri transaction for better performance. %% TODO this should probably be a single khepri transaction for better performance.
Fun = fun (Q) -> Fun = fun (Q) ->
@ -1409,10 +1404,16 @@ set_leader_pid(Pid, QName) ->
case rabbit_amqqueue:update(QName, Fun) of case rabbit_amqqueue:update(QName, Fun) of
not_found -> not_found ->
%% This can happen during recovery %% This can happen during recovery
{ok, Q} = rabbit_amqqueue:lookup_durable_queue(QName), {ok, Q1} = rabbit_amqqueue:lookup_durable_queue(QName),
rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Fun(Q)); Q = Fun(Q1),
_ -> case rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Q) of
ok ok ->
{ok, Q};
Err ->
Err
end;
Q ->
{ok, Q}
end. end.
close_log(undefined) -> ok; close_log(undefined) -> ok;

View File

@ -1227,7 +1227,7 @@ roundtrip_with_drain(Config, QueueType, QName)
Address = rabbitmq_amqp_address:queue(QName), Address = rabbitmq_amqp_address:queue(QName),
{Connection, Session, LinkPair} = init(Config), {Connection, Session, LinkPair} = init(Config),
QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QueueType}}}, QProps = #{arguments => #{<<"x-queue-type">> => {utf8, QueueType}}},
{ok, _} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps), {ok, #{leader := _}} = rabbitmq_amqp_client:declare_queue(LinkPair, QName, QProps),
{ok, Sender} = amqp10_client:attach_sender_link( {ok, Sender} = amqp10_client:attach_sender_link(
Session, <<"test-sender">>, Address), Session, <<"test-sender">>, Address),
wait_for_credit(Sender), wait_for_credit(Sender),