Add QQ periodic policy repair
This commit is contained in:
parent
3c2134cbeb
commit
f9179d1090
|
|
@ -853,6 +853,7 @@ overview(#?STATE{consumers = Cons,
|
|||
Conf = #{name => Cfg#cfg.name,
|
||||
resource => Cfg#cfg.resource,
|
||||
dead_lettering_enabled => undefined =/= Cfg#cfg.dead_letter_handler,
|
||||
overflow_strategy => Cfg#cfg.overflow_strategy,
|
||||
max_length => Cfg#cfg.max_length,
|
||||
max_bytes => Cfg#cfg.max_bytes,
|
||||
consumer_strategy => Cfg#cfg.consumer_strategy,
|
||||
|
|
|
|||
|
|
@ -624,7 +624,8 @@ handle_tick(QName,
|
|||
ok;
|
||||
_ ->
|
||||
ok
|
||||
end
|
||||
end,
|
||||
maybe_apply_policies(Q, Overview)
|
||||
catch
|
||||
_:Err ->
|
||||
rabbit_log:debug("~ts: handle tick failed with ~p",
|
||||
|
|
@ -708,6 +709,44 @@ system_recover(quorum_queues) ->
|
|||
ok
|
||||
end.
|
||||
|
||||
maybe_apply_policies(Q, Overview) ->
|
||||
rabbit_log:debug("Maybe applying policies to ~p", [amqqueue:get_name(Q)]),
|
||||
EffectiveDefinition = rabbit_policy:effective_definition(Q),
|
||||
#{
|
||||
config := #{
|
||||
overflow_strategy := OverflowStrategy,
|
||||
max_length := MaxLength,
|
||||
max_bytes := MaxBytes,
|
||||
delivery_limit := DeliverLimit,
|
||||
expires := Expires,
|
||||
msg_ttl := MsgTTL
|
||||
}
|
||||
} = Overview,
|
||||
Checks = [
|
||||
{<<"max-length">>, MaxLength},
|
||||
{<<"max-length-bytes">>, MaxBytes},
|
||||
{<<"delivery-limit">>, DeliverLimit},
|
||||
{<<"expires">>, Expires},
|
||||
{<<"message-ttl">>, MsgTTL},
|
||||
{<<"overflow">>, OverflowStrategy}
|
||||
],
|
||||
ShouldUpdate = lists:any(
|
||||
fun({Key, Val}) ->
|
||||
case proplists:get_value(Key, EffectiveDefinition) of
|
||||
undefined -> false;
|
||||
V -> V =/= Val
|
||||
end
|
||||
end,
|
||||
Checks
|
||||
),
|
||||
case ShouldUpdate of
|
||||
true ->
|
||||
rabbit_log:debug("Re-applying policies to ~p", [amqqueue:get_name(Q)]),
|
||||
policy_changed(Q),
|
||||
ok;
|
||||
false -> ok
|
||||
end.
|
||||
|
||||
-spec recover(binary(), [amqqueue:amqqueue()]) ->
|
||||
{[amqqueue:amqqueue()], [amqqueue:amqqueue()]}.
|
||||
recover(_Vhost, Queues) ->
|
||||
|
|
@ -2064,3 +2103,4 @@ file_handle_other_reservation() ->
|
|||
|
||||
file_handle_release_reservation() ->
|
||||
ok.
|
||||
|
||||
|
|
|
|||
Loading…
Reference in New Issue