Merge pull request #11980 from rabbitmq/md/khepri-minority-errors/queue-declaration
This commit is contained in:
commit
140abd871a
|
|
@ -252,22 +252,30 @@ get_queue_type(Args, DefaultQueueType) ->
|
|||
rabbit_queue_type:discover(V)
|
||||
end.
|
||||
|
||||
-spec internal_declare(amqqueue:amqqueue(), boolean()) ->
|
||||
{created | existing, amqqueue:amqqueue()} | queue_absent().
|
||||
-spec internal_declare(Queue, Recover) -> Ret when
|
||||
Queue :: amqqueue:amqqueue(),
|
||||
Recover :: boolean(),
|
||||
Ret :: {created | existing, amqqueue:amqqueue()} |
|
||||
queue_absent() |
|
||||
rabbit_khepri:timeout_error().
|
||||
|
||||
internal_declare(Q, Recover) ->
|
||||
do_internal_declare(Q, Recover).
|
||||
|
||||
do_internal_declare(Q0, true) ->
|
||||
%% TODO Why do we return the old state instead of the actual one?
|
||||
%% I'm leaving it like it was before the khepri refactor, because
|
||||
%% rabbit_amqqueue_process:init_it2 compares the result of this declare to decide
|
||||
%% if continue or stop. If we return the actual one, it fails and the queue stops
|
||||
%% silently during init.
|
||||
%% Maybe we should review this bit of code at some point.
|
||||
Q = amqqueue:set_state(Q0, live),
|
||||
ok = store_queue(Q),
|
||||
{created, Q0};
|
||||
case store_queue(Q) of
|
||||
ok ->
|
||||
%% TODO Why do we return the old state instead of the actual one?
|
||||
%% I'm leaving it like it was before the khepri refactor, because
|
||||
%% rabbit_amqqueue_process:init_it2 compares the result of this
|
||||
%% declare to decide if continue or stop. If we return the actual
|
||||
%% one, it fails and the queue stops silently during init.
|
||||
%% Maybe we should review this bit of code at some point.
|
||||
{created, Q0};
|
||||
{error, timeout} = Err ->
|
||||
Err
|
||||
end;
|
||||
do_internal_declare(Q0, false) ->
|
||||
Q = rabbit_policy:set(amqqueue:set_state(Q0, live)),
|
||||
Queue = rabbit_queue_decorator:set(Q),
|
||||
|
|
@ -280,12 +288,18 @@ do_internal_declare(Q0, false) ->
|
|||
update(Name, Fun) ->
|
||||
rabbit_db_queue:update(Name, Fun).
|
||||
|
||||
%% only really used for quorum queues to ensure the rabbit_queue record
|
||||
-spec ensure_rabbit_queue_record_is_initialized(Queue) -> Ret when
|
||||
Queue :: amqqueue:amqqueue(),
|
||||
Ret :: ok | {error, timeout}.
|
||||
|
||||
%% only really used for stream queues to ensure the rabbit_queue record
|
||||
%% is initialised
|
||||
ensure_rabbit_queue_record_is_initialized(Q) ->
|
||||
store_queue(Q).
|
||||
|
||||
-spec store_queue(amqqueue:amqqueue()) -> 'ok'.
|
||||
-spec store_queue(Queue) -> Ret when
|
||||
Queue :: amqqueue:amqqueue(),
|
||||
Ret :: ok | {error, timeout}.
|
||||
|
||||
store_queue(Q0) ->
|
||||
Q = rabbit_queue_decorator:set(Q0),
|
||||
|
|
@ -325,12 +339,10 @@ is_server_named_allowed(Args) ->
|
|||
Type = get_queue_type(Args),
|
||||
rabbit_queue_type:is_server_named_allowed(Type).
|
||||
|
||||
-spec lookup
|
||||
(name()) ->
|
||||
rabbit_types:ok(amqqueue:amqqueue()) |
|
||||
rabbit_types:error('not_found');
|
||||
([name()]) ->
|
||||
[amqqueue:amqqueue()].
|
||||
-spec lookup(QueueName) -> Ret when
|
||||
QueueName :: name(),
|
||||
Ret :: rabbit_types:ok(amqqueue:amqqueue())
|
||||
| rabbit_types:error('not_found').
|
||||
|
||||
lookup(Name) when is_record(Name, resource) ->
|
||||
rabbit_db_queue:get(Name).
|
||||
|
|
|
|||
|
|
@ -226,6 +226,12 @@ init_it2(Recover, From, State = #q{q = Q,
|
|||
false ->
|
||||
{stop, normal, {existing, Q1}, State}
|
||||
end;
|
||||
{error, timeout} ->
|
||||
Reason = {protocol_error, internal_error,
|
||||
"Could not declare ~ts on node '~ts' because the "
|
||||
"metadata store operation timed out",
|
||||
[rabbit_misc:rs(amqqueue:get_name(Q)), node()]},
|
||||
{stop, normal, Reason, State};
|
||||
Err ->
|
||||
{stop, normal, Err, State}
|
||||
end.
|
||||
|
|
@ -311,7 +317,7 @@ terminate(normal, State) -> %% delete case
|
|||
terminate(_Reason, State = #q{q = Q}) ->
|
||||
terminate_shutdown(fun (BQS) ->
|
||||
Q2 = amqqueue:set_state(Q, crashed),
|
||||
rabbit_amqqueue:store_queue(Q2),
|
||||
_ = rabbit_amqqueue:store_queue(Q2),
|
||||
BQS
|
||||
end, State).
|
||||
|
||||
|
|
|
|||
|
|
@ -875,7 +875,10 @@ get_all_by_type_and_node_in_khepri(VHostName, Type, Node) ->
|
|||
|
||||
-spec create_or_get(Queue) -> Ret when
|
||||
Queue :: amqqueue:amqqueue(),
|
||||
Ret :: {created, Queue} | {existing, Queue} | {absent, Queue, nodedown}.
|
||||
Ret :: {created, Queue} |
|
||||
{existing, Queue} |
|
||||
{absent, Queue, nodedown} |
|
||||
rabbit_khepri:timeout_error().
|
||||
%% @doc Writes a queue record if it doesn't exist already or returns the existing one
|
||||
%%
|
||||
%% @returns the existing record if there is one in the database already, or the newly
|
||||
|
|
@ -924,8 +927,9 @@ create_or_get_in_khepri(Q) ->
|
|||
%% set().
|
||||
%% -------------------------------------------------------------------
|
||||
|
||||
-spec set(Queue) -> ok when
|
||||
Queue :: amqqueue:amqqueue().
|
||||
-spec set(Queue) -> Ret when
|
||||
Queue :: amqqueue:amqqueue(),
|
||||
Ret :: ok | rabbit_khepri:timeout_error().
|
||||
%% @doc Writes a queue record. If the queue is durable, it writes both instances:
|
||||
%% durable and transient. For the durable one, it resets decorators.
|
||||
%% The transient one is left as it is.
|
||||
|
|
|
|||
|
|
@ -295,7 +295,12 @@ start_cluster(Q) ->
|
|||
declare_queue_error(Error, NewQ, LeaderNode, ActingUser)
|
||||
end;
|
||||
{existing, _} = Ex ->
|
||||
Ex
|
||||
Ex;
|
||||
{error, timeout} ->
|
||||
{protocol_error, internal_error,
|
||||
"Could not declare quorum ~ts on node '~ts' because the metadata "
|
||||
"store operation timed out",
|
||||
[rabbit_misc:rs(QName), node()]}
|
||||
end.
|
||||
|
||||
declare_queue_error(Error, Queue, Leader, ActingUser) ->
|
||||
|
|
|
|||
|
|
@ -1231,7 +1231,7 @@ phase_update_mnesia(StreamId, Args, #{reference := QName,
|
|||
#{name := S} when S == StreamId ->
|
||||
rabbit_log:debug("~ts: initializing queue record for stream id ~ts",
|
||||
[?MODULE, StreamId]),
|
||||
_ = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Fun(Q)),
|
||||
ok = rabbit_amqqueue:ensure_rabbit_queue_record_is_initialized(Fun(Q)),
|
||||
ok;
|
||||
_ ->
|
||||
ok
|
||||
|
|
|
|||
|
|
@ -177,25 +177,37 @@ create_stream(Q0) ->
|
|||
case rabbit_stream_coordinator:new_stream(Q, Leader) of
|
||||
{ok, {ok, LeaderPid}, _} ->
|
||||
%% update record with leader pid
|
||||
set_leader_pid(LeaderPid, amqqueue:get_name(Q)),
|
||||
rabbit_event:notify(queue_created,
|
||||
[{name, QName},
|
||||
{durable, true},
|
||||
{auto_delete, false},
|
||||
{arguments, Arguments},
|
||||
{type, amqqueue:get_type(Q1)},
|
||||
{user_who_performed_action,
|
||||
ActingUser}]),
|
||||
{new, Q};
|
||||
case set_leader_pid(LeaderPid, amqqueue:get_name(Q)) of
|
||||
ok ->
|
||||
rabbit_event:notify(queue_created,
|
||||
[{name, QName},
|
||||
{durable, true},
|
||||
{auto_delete, false},
|
||||
{arguments, Arguments},
|
||||
{type, amqqueue:get_type(Q1)},
|
||||
{user_who_performed_action,
|
||||
ActingUser}]),
|
||||
{new, Q};
|
||||
{error, timeout} ->
|
||||
{protocol_error, internal_error,
|
||||
"Could not set leader PID for ~ts on node '~ts' "
|
||||
"because the metadata store operation timed out",
|
||||
[rabbit_misc:rs(QName), node()]}
|
||||
end;
|
||||
Error ->
|
||||
_ = rabbit_amqqueue:internal_delete(Q, ActingUser),
|
||||
{protocol_error, internal_error, "Cannot declare a queue '~ts' on node '~ts': ~255p",
|
||||
{protocol_error, internal_error, "Cannot declare ~ts on node '~ts': ~255p",
|
||||
[rabbit_misc:rs(QName), node(), Error]}
|
||||
end;
|
||||
{existing, Q} ->
|
||||
{existing, Q};
|
||||
{absent, Q, Reason} ->
|
||||
{absent, Q, Reason}
|
||||
{absent, Q, Reason};
|
||||
{error, timeout} ->
|
||||
{protocol_error, internal_error,
|
||||
"Could not declare ~ts on node '~ts' because the metadata store "
|
||||
"operation timed out",
|
||||
[rabbit_misc:rs(QName), node()]}
|
||||
end.
|
||||
|
||||
-spec delete(amqqueue:amqqueue(), boolean(),
|
||||
|
|
@ -1291,6 +1303,11 @@ resend_all(#stream_client{leader = LeaderPid,
|
|||
end || {Seq, Msg} <- Msgs],
|
||||
State.
|
||||
|
||||
-spec set_leader_pid(Pid, QName) -> Ret when
|
||||
Pid :: pid(),
|
||||
QName :: rabbit_amqqueue:name(),
|
||||
Ret :: ok | {error, timeout}.
|
||||
|
||||
set_leader_pid(Pid, QName) ->
|
||||
%% TODO this should probably be a single khepri transaction for better performance.
|
||||
Fun = fun (Q) ->
|
||||
|
|
|
|||
|
|
@ -70,8 +70,10 @@ is_stateful() ->
|
|||
|
||||
-spec declare(amqqueue:amqqueue(), node()) ->
|
||||
{'new' | 'existing' | 'owner_died', amqqueue:amqqueue()} |
|
||||
{'absent', amqqueue:amqqueue(), rabbit_amqqueue:absent_reason()}.
|
||||
{'absent', amqqueue:amqqueue(), rabbit_amqqueue:absent_reason()} |
|
||||
{protocol_error, internal_error, string(), [string()]}.
|
||||
declare(Q0, _Node) ->
|
||||
QName = amqqueue:get_name(Q0),
|
||||
Q1 = case amqqueue:get_pid(Q0) of
|
||||
none ->
|
||||
%% declaring process becomes the queue
|
||||
|
|
@ -86,7 +88,7 @@ declare(Q0, _Node) ->
|
|||
Opts = amqqueue:get_options(Q),
|
||||
ActingUser = maps:get(user, Opts, ?UNKNOWN_USER),
|
||||
rabbit_event:notify(queue_created,
|
||||
[{name, amqqueue:get_name(Q)},
|
||||
[{name, QName},
|
||||
{durable, true},
|
||||
{auto_delete, false},
|
||||
{exclusive, true},
|
||||
|
|
@ -94,6 +96,11 @@ declare(Q0, _Node) ->
|
|||
{arguments, amqqueue:get_arguments(Q)},
|
||||
{user_who_performed_action, ActingUser}]),
|
||||
{new, Q};
|
||||
{error, timeout} ->
|
||||
{protocol_error, internal_error,
|
||||
"Could not declare ~ts because the metadata store operation "
|
||||
"timed out",
|
||||
[rabbit_misc:rs(QName)]};
|
||||
Other ->
|
||||
Other
|
||||
end.
|
||||
|
|
|
|||
Loading…
Reference in New Issue