test fixes

This commit is contained in:
kjnilsson 2020-07-10 14:25:17 +01:00
parent a5469e186c
commit ce230316d4
3 changed files with 22 additions and 3 deletions

View File

@ -161,7 +161,10 @@ enqueue(Correlation, Msg,
State0#state{queue_status = go};
Err ->
exit(Err)
end
end;
{badrpc, nodedown} ->
rabbit_log:info("rabbit_fifo_client: badrpc for node ~w", [Node]),
State0#state{queue_status = go}
end,
enqueue(Correlation, Msg, State);
enqueue(_Correlation, _Msg,
@ -685,8 +688,10 @@ resend(OldSeq, #state{pending = Pending0, leader = Leader} = State) ->
error ->
State
end.
resend_all_pending(#state{pending = Pend} = State) ->
Seqs = lists:sort(maps:keys(Pend)),
rabbit_log:info("rabbit_fifo_client resend all pending ~w", [Seqs]),
lists:foldl(fun resend/2, State, Seqs).
handle_delivery(From, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0,
@ -762,7 +767,7 @@ get_missing_deliveries(Leader, From, To, ConsumerTag) ->
end.
pick_server(#state{leader = undefined,
cfg = #cfg{servers = [N | _]}}) ->
cfg = #cfg{servers = [N | _]}}) ->
%% TODO: pick random rather that first?
N;
pick_server(#state{leader = Leader}) ->

View File

@ -102,6 +102,7 @@ handle_event({ra_event, From, Evt}, QState) ->
{new | existing, amqqueue:amqqueue()} | rabbit_types:channel_exit().
declare(Q) when ?amqqueue_is_quorum(Q) ->
rabbit_log:info("quorum_queue declaring ~w", [Q]),
QName = amqqueue:get_name(Q),
Durable = amqqueue:is_durable(Q),
AutoDelete = amqqueue:is_auto_delete(Q),
@ -123,6 +124,7 @@ declare(Q) when ?amqqueue_is_quorum(Q) ->
TickTimeout = application:get_env(rabbit, quorum_tick_interval, ?TICK_TIMEOUT),
RaConfs = [make_ra_conf(NewQ, ServerId, TickTimeout)
|| ServerId <- members(NewQ)],
rabbit_log:info("quorum_queue starting cluster ~w", [RaConfs]),
case ra:start_cluster(RaConfs) of
{ok, _, _} ->
%% TODO: handle error - what should be done if the
@ -1359,6 +1361,17 @@ check_invalid_arguments(QueueName, Args) ->
"invalid arg '~s' for ~s",
[Key, rabbit_misc:rs(QueueName)])
end || Key <- Keys],
case rabbit_misc:table_lookup(Args, <<"x-overflow">>) of
undefined -> ok;
{_, <<"reject-publish-dlx">>} ->
rabbit_misc:protocol_error(
precondition_failed,
"invalid arg 'x-overflow' with value 'reject-publish-dlx' for ~s",
[rabbit_misc:rs(QueueName)]);
_ ->
ok
end,
ok.
check_auto_delete(Q) when ?amqqueue_is_auto_delete(Q) ->

View File

@ -346,7 +346,7 @@ declare_invalid_args(Config) ->
declare(rabbit_ct_client_helpers:open_channel(Config, Server),
LQ, [{<<"x-queue-type">>, longstr, <<"quorum">>},
{<<"x-overflow">>, longstr, XOverflow}]))
|| XOverflow <- [<<"reject-publish">>, <<"reject-publish-dlx">>]],
|| XOverflow <- [<<"reject-publish-dlx">>]],
?assertExit(
{{shutdown, {server_initiated_close, 406, _}}, _},
@ -1038,6 +1038,7 @@ publishing_to_unavailable_queue(Config) ->
ok = rabbit_ct_broker_helpers:stop_node(Config, Server1),
ok = rabbit_ct_broker_helpers:stop_node(Config, Server2),
ct:pal("opening channel to ~w", [Server]),
Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
amqp_channel:register_confirm_handler(Ch, self()),