Merge pull request #12317 from rabbitmq/md/khepri/mqtt-fixes

Handle database timeouts in MQTT queue deletion
This commit is contained in:
Michael Klishin 2024-09-16 19:00:09 -04:00 committed by GitHub
commit 4805e31e37
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
8 changed files with 56 additions and 31 deletions

View File

@ -1523,7 +1523,14 @@ notify_policy_changed(Q) when ?is_amqqueue(Q) ->
consumers(Q) when ?amqqueue_is_classic(Q) ->
QPid = amqqueue:get_pid(Q),
delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]});
try
delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]})
catch
exit:_ ->
%% The queue process exited during the call.
%% Note that `delegate:invoke/2' catches errors but not exits.
[]
end;
consumers(Q) when ?amqqueue_is_quorum(Q) ->
QPid = amqqueue:get_pid(Q),
case ra:local_query(QPid, fun rabbit_fifo:query_consumers/1) of
@ -1619,17 +1626,23 @@ delete_immediately_by_resource(Resources) ->
-spec delete
(amqqueue:amqqueue(), 'false', 'false', rabbit_types:username()) ->
qlen() |
rabbit_types:error(timeout) |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()};
(amqqueue:amqqueue(), 'true' , 'false', rabbit_types:username()) ->
qlen() | rabbit_types:error('in_use') |
qlen() |
rabbit_types:error('in_use') |
rabbit_types:error(timeout) |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()};
(amqqueue:amqqueue(), 'false', 'true', rabbit_types:username()) ->
qlen() | rabbit_types:error('not_empty') |
qlen() |
rabbit_types:error('not_empty') |
rabbit_types:error(timeout) |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()};
(amqqueue:amqqueue(), 'true' , 'true', rabbit_types:username()) ->
qlen() |
rabbit_types:error('in_use') |
rabbit_types:error('not_empty') |
rabbit_types:error(timeout) |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
delete(Q, IfUnused, IfEmpty, ActingUser) ->
rabbit_queue_type:delete(Q, IfUnused, IfEmpty, ActingUser).

View File

@ -171,11 +171,8 @@ delete(Q0, IfUnused, IfEmpty, ActingUser) when ?amqqueue_is_classic(Q0) ->
case delete_crashed_internal(Q, ActingUser) of
ok ->
{ok, 0};
{error, timeout} ->
{error, protocol_error,
"The operation to delete ~ts from the "
"metadata store timed out",
[rabbit_misc:rs(QName)]}
{error, timeout} = Err ->
Err
end
end
end;

View File

@ -383,6 +383,7 @@ declare(Q0, Node) ->
boolean(), rabbit_types:username()) ->
rabbit_types:ok(non_neg_integer()) |
rabbit_types:error(in_use | not_empty) |
rabbit_types:error(timeout) |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
delete(Q, IfUnused, IfEmpty, ActingUser) ->
Mod = amqqueue:get_type(Q),

View File

@ -823,10 +823,8 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
_ = erpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName],
?RPC_TIMEOUT),
{ok, ReadyMsgs};
{error, timeout} ->
{protocol_error, internal_error,
"The operation to delete ~ts from the metadata store "
"timed out", [rabbit_misc:rs(QName)]}
{error, timeout} = Err ->
Err
end;
{error, {no_more_servers_to_try, Errs}} ->
case lists:all(fun({{error, noproc}, _}) -> true;
@ -849,10 +847,8 @@ delete(Q, _IfUnused, _IfEmpty, ActingUser) when ?amqqueue_is_quorum(Q) ->
case delete_queue_data(Q, ActingUser) of
ok ->
{ok, ReadyMsgs};
{error, timeout} ->
{protocol_error, internal_error,
"The operation to delete queue ~ts from the metadata "
"store timed out", [rabbit_misc:rs(QName)]}
{error, timeout} = Err ->
Err
end
end.

View File

@ -189,8 +189,12 @@ delete_stream(Q, ActingUser)
#{name := StreamId} = amqqueue:get_type_state(Q),
case process_command({delete_stream, StreamId, #{}}) of
{ok, ok, _} ->
_ = rabbit_amqqueue:internal_delete(Q, ActingUser),
{ok, {ok, 0}};
case rabbit_amqqueue:internal_delete(Q, ActingUser) of
ok ->
{ok, {ok, 0}};
{error, timeout} = Err ->
Err
end;
Err ->
Err
end.

View File

@ -214,11 +214,14 @@ create_stream(Q0) ->
-spec delete(amqqueue:amqqueue(), boolean(),
boolean(), rabbit_types:username()) ->
rabbit_types:ok(non_neg_integer()) |
rabbit_types:error(in_use | not_empty).
rabbit_types:error(timeout) |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
delete(Q, _IfUnused, _IfEmpty, ActingUser) ->
case rabbit_stream_coordinator:delete_stream(Q, ActingUser) of
{ok, Reply} ->
Reply;
{error, timeout} = Err ->
Err;
Error ->
{protocol_error, internal_error, "Cannot delete ~ts on node '~ts': ~255p ",
[rabbit_misc:rs(amqqueue:get_name(Q)), node(), Error]}

View File

@ -769,7 +769,9 @@ handle_clean_start(_, QoS, State = #state{cfg = #cfg{clean_start = true}}) ->
ok ->
{ok, SessPresent, State};
{error, access_refused} ->
{error, ?RC_NOT_AUTHORIZED}
{error, ?RC_NOT_AUTHORIZED};
{error, _Reason} ->
{error, ?RC_IMPLEMENTATION_SPECIFIC_ERROR}
end
end;
handle_clean_start(SessPresent, QoS,
@ -991,7 +993,8 @@ clear_will_msg(#state{cfg = #cfg{vhost = Vhost,
QName = #resource{virtual_host = Vhost, kind = queue, name = QNameBin},
case delete_queue(QName, State) of
ok -> ok;
{error, access_refused} -> {error, ?RC_NOT_AUTHORIZED}
{error, access_refused} -> {error, ?RC_NOT_AUTHORIZED};
{error, _Reason} -> {error, ?RC_IMPLEMENTATION_SPECIFIC_ERROR}
end.
make_will_msg(#mqtt_packet_connect{will_flag = false}) ->
@ -1323,8 +1326,10 @@ ensure_queue(QoS, State) ->
case delete_queue(QName, State) of
ok ->
create_queue(QoS, State);
{error, access_refused} = E ->
E
{error, _} = Err ->
Err;
{protocol_error, _, _, _} = Err ->
{error, Err}
end;
{error, not_found} ->
create_queue(QoS, State)
@ -1829,7 +1834,10 @@ maybe_delete_mqtt_qos0_queue(_) ->
ok.
-spec delete_queue(rabbit_amqqueue:name(), state()) ->
ok | {error, access_refused}.
ok |
{error, access_refused} |
{error, timeout} |
{protocol_error, Type :: atom(), Reason :: string(), Args :: term()}.
delete_queue(QName,
#state{auth_state = #auth_state{
user = User = #user{username = Username},
@ -1841,8 +1849,12 @@ delete_queue(QName,
fun (Q) ->
case check_resource_access(User, QName, configure, AuthzCtx) of
ok ->
{ok, _N} = rabbit_queue_type:delete(Q, false, false, Username),
ok;
case rabbit_queue_type:delete(Q, false, false, Username) of
{ok, _} ->
ok;
Err ->
Err
end;
Err ->
Err
end

View File

@ -109,17 +109,16 @@ declare(Q0, _Node) ->
boolean(),
boolean(),
rabbit_types:username()) ->
rabbit_types:ok(non_neg_integer()).
rabbit_types:ok(non_neg_integer()) |
rabbit_types:error(timeout).
delete(Q, _IfUnused, _IfEmpty, ActingUser) ->
QName = amqqueue:get_name(Q),
log_delete(QName, amqqueue:get_exclusive_owner(Q)),
case rabbit_amqqueue:internal_delete(Q, ActingUser) of
ok ->
{ok, 0};
{error, timeout} ->
{protocol_error, internal_error,
"The operation to delete ~ts from the metadata store timed "
"out", [rabbit_misc:rs(QName)]}
{error, timeout} = Err ->
Err
end.
-spec deliver([{amqqueue:amqqueue(), stateless}],