Ra 2.16.5 - bug fixes and minor improvements
Ra improvements: * Don't allow a non-voter to start elections * Register with ra directory before initialising ra server. * Trigger tick_timeout immediately after entering leader state. * Set a configurable segment max size This commit also includes a change to turn the quorum queue become leader callback to become a noop and instead rely on the more promptly tick_handler to handle the meta data store update after a leader election. This more prompt tick update means there should be a much shorter gap between the queue metrics being deleted from the old leader node to them being available again on the new node resulting in smoother message count metrics. Fix test that relied on waiting on too simplistic a property before asserting.
This commit is contained in:
parent
44657cd393
commit
4fe96dfd27
|
@ -425,11 +425,10 @@ local_or_remote_handler(ChPid, Module, Function, Args) ->
|
||||||
erpc:cast(Node, Module, Function, Args)
|
erpc:cast(Node, Module, Function, Args)
|
||||||
end.
|
end.
|
||||||
|
|
||||||
become_leader(QName, Name) ->
|
become_leader(_QName, _Name) ->
|
||||||
%% as this function is called synchronously when a ra node becomes leader
|
%% noop now as we instead rely on the promt tick_timeout + repair to update
|
||||||
%% we need to ensure there is no chance of blocking as else the ra node
|
%% the meta data store after a leader change
|
||||||
%% may not be able to establish its leadership
|
ok.
|
||||||
spawn(fun () -> become_leader0(QName, Name) end).
|
|
||||||
|
|
||||||
become_leader0(QName, Name) ->
|
become_leader0(QName, Name) ->
|
||||||
Fun = fun (Q1) ->
|
Fun = fun (Q1) ->
|
||||||
|
@ -580,7 +579,6 @@ handle_tick(QName,
|
||||||
Nodes) ->
|
Nodes) ->
|
||||||
%% this makes calls to remote processes so cannot be run inside the
|
%% this makes calls to remote processes so cannot be run inside the
|
||||||
%% ra server
|
%% ra server
|
||||||
Self = self(),
|
|
||||||
spawn(
|
spawn(
|
||||||
fun() ->
|
fun() ->
|
||||||
try
|
try
|
||||||
|
@ -638,7 +636,7 @@ handle_tick(QName,
|
||||||
end}
|
end}
|
||||||
| Infos0],
|
| Infos0],
|
||||||
rabbit_core_metrics:queue_stats(QName, Infos),
|
rabbit_core_metrics:queue_stats(QName, Infos),
|
||||||
ok = repair_leader_record(Q, Self),
|
ok = repair_leader_record(Q, Name),
|
||||||
case repair_amqqueue_nodes(Q) of
|
case repair_amqqueue_nodes(Q) of
|
||||||
ok ->
|
ok ->
|
||||||
ok;
|
ok;
|
||||||
|
@ -675,7 +673,7 @@ handle_tick(QName, Config, _Nodes) ->
|
||||||
rabbit_log:debug("~ts: handle tick received unexpected config format ~tp",
|
rabbit_log:debug("~ts: handle tick received unexpected config format ~tp",
|
||||||
[rabbit_misc:rs(QName), Config]).
|
[rabbit_misc:rs(QName), Config]).
|
||||||
|
|
||||||
repair_leader_record(Q, Self) ->
|
repair_leader_record(Q, Name) ->
|
||||||
Node = node(),
|
Node = node(),
|
||||||
case amqqueue:get_pid(Q) of
|
case amqqueue:get_pid(Q) of
|
||||||
{_, Node} ->
|
{_, Node} ->
|
||||||
|
@ -683,9 +681,8 @@ repair_leader_record(Q, Self) ->
|
||||||
ok;
|
ok;
|
||||||
_ ->
|
_ ->
|
||||||
QName = amqqueue:get_name(Q),
|
QName = amqqueue:get_name(Q),
|
||||||
rabbit_log:debug("~ts: repairing leader record",
|
rabbit_log:debug("~ts: updating leader record to current node ~b",
|
||||||
[rabbit_misc:rs(QName)]),
|
[rabbit_misc:rs(QName), Node]),
|
||||||
{_, Name} = erlang:process_info(Self, registered_name),
|
|
||||||
ok = become_leader0(QName, Name),
|
ok = become_leader0(QName, Name),
|
||||||
ok
|
ok
|
||||||
end,
|
end,
|
||||||
|
|
|
@ -482,53 +482,50 @@ queues_enable_totals_test(Config) ->
|
||||||
Publish(<<"foo">>),
|
Publish(<<"foo">>),
|
||||||
|
|
||||||
Fun = fun() ->
|
Fun = fun() ->
|
||||||
length(rabbit_ct_broker_helpers:rpc(Config, 0, ets, tab2list,
|
Queues = http_get(Config, "/queues/%2F"),
|
||||||
[queue_coarse_metrics])) == 2
|
Queue = http_get(Config, "/queues/%2F/foo"),
|
||||||
|
|
||||||
|
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
|
||||||
|
NodeBin = atom_to_binary(Node, utf8),
|
||||||
|
assert_list([#{name => <<"baz">>,
|
||||||
|
vhost => <<"/">>,
|
||||||
|
durable => true,
|
||||||
|
auto_delete => false,
|
||||||
|
exclusive => false,
|
||||||
|
arguments => #{'x-queue-type' => <<"classic">>},
|
||||||
|
node => NodeBin,
|
||||||
|
messages => 1,
|
||||||
|
messages_ready => 1,
|
||||||
|
messages_unacknowledged => 0},
|
||||||
|
#{name => <<"foo">>,
|
||||||
|
vhost => <<"/">>,
|
||||||
|
durable => true,
|
||||||
|
auto_delete => false,
|
||||||
|
exclusive => null,
|
||||||
|
arguments => #{'x-queue-type' => <<"quorum">>},
|
||||||
|
leader => NodeBin,
|
||||||
|
messages => 2,
|
||||||
|
messages_ready => 2,
|
||||||
|
messages_unacknowledged => 0,
|
||||||
|
members => [NodeBin]}], Queues),
|
||||||
|
assert_item(#{name => <<"foo">>,
|
||||||
|
vhost => <<"/">>,
|
||||||
|
durable => true,
|
||||||
|
auto_delete => false,
|
||||||
|
exclusive => null,
|
||||||
|
arguments => #{'x-queue-type' => <<"quorum">>},
|
||||||
|
leader => NodeBin,
|
||||||
|
messages => 2,
|
||||||
|
messages_ready => 2,
|
||||||
|
messages_unacknowledged => 0,
|
||||||
|
members => [NodeBin]}, Queue),
|
||||||
|
|
||||||
|
?assert(not maps:is_key(message_stats, Queue)),
|
||||||
|
?assert(not maps:is_key(messages_details, Queue)),
|
||||||
|
?assert(not maps:is_key(reductions_details, Queue)),
|
||||||
|
true
|
||||||
end,
|
end,
|
||||||
await_condition(Fun),
|
await_condition(Fun),
|
||||||
|
|
||||||
Queues = http_get(Config, "/queues/%2F"),
|
|
||||||
Queue = http_get(Config, "/queues/%2F/foo"),
|
|
||||||
|
|
||||||
Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
|
|
||||||
NodeBin = atom_to_binary(Node, utf8),
|
|
||||||
assert_list([#{name => <<"baz">>,
|
|
||||||
vhost => <<"/">>,
|
|
||||||
durable => true,
|
|
||||||
auto_delete => false,
|
|
||||||
exclusive => false,
|
|
||||||
arguments => #{'x-queue-type' => <<"classic">>},
|
|
||||||
node => NodeBin,
|
|
||||||
messages => 1,
|
|
||||||
messages_ready => 1,
|
|
||||||
messages_unacknowledged => 0},
|
|
||||||
#{name => <<"foo">>,
|
|
||||||
vhost => <<"/">>,
|
|
||||||
durable => true,
|
|
||||||
auto_delete => false,
|
|
||||||
exclusive => null,
|
|
||||||
arguments => #{'x-queue-type' => <<"quorum">>},
|
|
||||||
leader => NodeBin,
|
|
||||||
messages => 2,
|
|
||||||
messages_ready => 2,
|
|
||||||
messages_unacknowledged => 0,
|
|
||||||
members => [NodeBin]}], Queues),
|
|
||||||
assert_item(#{name => <<"foo">>,
|
|
||||||
vhost => <<"/">>,
|
|
||||||
durable => true,
|
|
||||||
auto_delete => false,
|
|
||||||
exclusive => null,
|
|
||||||
arguments => #{'x-queue-type' => <<"quorum">>},
|
|
||||||
leader => NodeBin,
|
|
||||||
messages => 2,
|
|
||||||
messages_ready => 2,
|
|
||||||
messages_unacknowledged => 0,
|
|
||||||
members => [NodeBin]}, Queue),
|
|
||||||
|
|
||||||
?assert(not maps:is_key(message_stats, Queue)),
|
|
||||||
?assert(not maps:is_key(messages_details, Queue)),
|
|
||||||
?assert(not maps:is_key(reductions_details, Queue)),
|
|
||||||
|
|
||||||
http_delete(Config, "/queues/%2F/foo", {group, '2xx'}),
|
http_delete(Config, "/queues/%2F/foo", {group, '2xx'}),
|
||||||
http_delete(Config, "/queues/%2F/baz", {group, '2xx'}),
|
http_delete(Config, "/queues/%2F/baz", {group, '2xx'}),
|
||||||
close_connection(Conn),
|
close_connection(Conn),
|
||||||
|
|
|
@ -51,7 +51,7 @@ dep_khepri_mnesia_migration = hex 0.7.1
|
||||||
dep_meck = hex 1.0.0
|
dep_meck = hex 1.0.0
|
||||||
dep_osiris = git https://github.com/rabbitmq/osiris v1.8.6
|
dep_osiris = git https://github.com/rabbitmq/osiris v1.8.6
|
||||||
dep_prometheus = hex 4.11.0
|
dep_prometheus = hex 4.11.0
|
||||||
dep_ra = hex 2.16.3
|
dep_ra = hex 2.16.5
|
||||||
dep_ranch = hex 2.2.0
|
dep_ranch = hex 2.2.0
|
||||||
dep_recon = hex 2.5.6
|
dep_recon = hex 2.5.6
|
||||||
dep_redbug = hex 2.0.7
|
dep_redbug = hex 2.0.7
|
||||||
|
|
Loading…
Reference in New Issue