removes configuration via parameters

This commit is contained in:
Alvaro Videla 2014-03-12 15:18:42 +01:00
parent ec1259ad6f
commit 73fc1a6813
2 changed files with 20 additions and 95 deletions

View File

@ -1,11 +1,9 @@
-module(rabbit_sharding_parameters).
-behaviour(rabbit_runtime_parameter).
-behaviour(rabbit_policy_validator).
-include_lib("rabbit_common/include/rabbit.hrl").
-export([validate/4, notify/4, notify_clear/3]).
-export([register/0, validate_policy/1]).
-rabbit_boot_step({?MODULE,
@ -16,69 +14,9 @@
register() ->
[rabbit_registry:register(Class, Name, ?MODULE) ||
{Class, Name} <- [{runtime_parameter, <<"sharding">>},
{runtime_parameter, <<"sharding-definition">>},
{policy_validator, <<"sharding-definition">>}]],
ok.
validate(_VHost, <<"sharding">>, <<"shards-per-node">>, Term) ->
validate_shards_per_node(<<"shards-per-node">>, Term);
validate(_VHost, <<"sharding">>, <<"routing-key">>, Term) ->
rabbit_parameter_validation:binary(<<"routing-key">>, Term);
validate(_VHost, <<"sharding-definition">>, Name, Term) ->
rabbit_parameter_validation:proplist(
Name,
[{<<"sharded">>, fun rabbit_parameter_validation:boolean/2, mandatory},
{<<"shards-per-node">>, fun validate_shards_per_node/2, optional},
{<<"routing-key">>, fun rabbit_parameter_validation:binary/2, optional}],
Term);
validate(_VHost, _Component, Name, _Term) ->
{error, "name not recognised: ~p", [Name]}.
%% If the user wants to reduce the shards number, we can't
%% delete queues, but when adding new nodes, those nodes will
%% have the new parameter.
%% perhaps queues names above the "shards-per-node" value should
%% be unbinded from the exchange.
notify(VHost, <<"sharding">>, <<"shards-per-node">>, _Term) ->
rabbit_sharding_shard:update_shards(VHost, shards_per_node),
ok;
notify(VHost, <<"sharding">>, <<"routing-key">>, _Term) ->
rabbit_sharding_shard:update_shards(VHost, all),
ok;
notify(_VHost, <<"sharding">>, _Name, _Term) ->
ok;
%% Maybe increase shard number by declaring new queues
%% in case shards-per-node increased.
%% We can't delete extra queues because the user might have messages on them.
%% We just ensure that there are SPN number of queues.
notify(VHost, <<"sharding-definition">>, Name, _Term) ->
rabbit_sharding_shard:update_named_shard(VHost, Name),
ok.
notify_clear(VHost, <<"sharding">>, <<"shards-per-node">>) ->
rabbit_sharding_shard:update_shards(VHost, all),
ok;
notify_clear(VHost, <<"sharding">>, <<"routing-key">>) ->
rabbit_sharding_shard:update_shards(VHost, all),
ok;
%% A shard definition is gone. We can't remove queues so
%% we resort to defaults when declaring queues or while
%% intercepting channel methods.
%% 1) don't care about connection param changes. They will be
%% used automatically next time we need to declare a queue.
%% 2) we need to bind the queues using the new routing key
%% and unbind them from the old one.
notify_clear(VHost, <<"sharding-definition">>, Name) ->
rabbit_sharding_shard:update_named_shard(VHost, Name),
{Class, Name} <- [{policy_validator, <<"sharded">>},
{policy_validator, <<"shards-per-node">>},
{policy_validator, <<"routing-key">>}]],
ok.
validate_shards_per_node(Name, Term) when is_number(Term) ->
@ -93,8 +31,10 @@ validate_shards_per_node(Name, Term) ->
%%----------------------------------------------------------------------------
validate_policy([{<<"sharding-definition">>, Value}])
when is_binary(Value) ->
ok;
validate_policy([{<<"sharding-definition">>, Value}]) ->
{error, "~p is not a valid shard name", [Value]}.
validate_policy(KeyList) ->
rabbit_parameter_validation:proplist(
<<"sharding policy definition">>,
[{<<"sharded">>, fun rabbit_parameter_validation:boolean/2, mandatory},
{<<"shards-per-node">>, fun validate_shards_per_node/2, mandatory},
{<<"routing-key">>, fun rabbit_parameter_validation:binary/2, mandatory}],
KeyList).

View File

@ -1,7 +1,7 @@
-module(rabbit_sharding_util).
-export([shard/1, sharded_exchanges/1]).
-export([get_policy/1, shards_per_node/1, routing_key/1]).
-export([get_policy/2, shards_per_node/1, routing_key/1]).
-export([exchange_bin/1, make_queue_name/3]).
-export([a2b/1, rpc_call/2]).
@ -16,20 +16,22 @@ shard(X = #exchange{type = 'x-random'}) -> shard0(X);
shard(_X) -> false.
shard0(X) ->
get_parameter_value(<<"sharding-definition">>, <<"sharded">>, X, false).
case get_policy(<<"sharded">>, X) of
true -> true;
_ -> false
end.
sharded_exchanges(VHost) ->
[X || X <- find_exchanges(VHost), shard(X)].
get_policy(X) ->
rabbit_policy:get(<<"sharding-definition">>, X).
shards_per_node(X) ->
get_parameter(<<"shards-per-node">>, X, ?DEFAULT_SHARDS_NUM).
get_policy(<<"shards-per-node">>, X).
%% Move routing key to sharding-definition
routing_key(X) ->
get_parameter(<<"routing-key">>, X, ?DEFAULT_RK).
get_policy(<<"shards-per-node">>, X).
get_policy(Key, X) ->
rabbit_policy:get(Key, X).
exchange_bin(#resource{name = XBin}) -> XBin.
@ -46,23 +48,6 @@ a2b(A) -> list_to_binary(atom_to_list(A)).
%%----------------------------------------------------------------------------
get_parameter(Parameter, X, Default) ->
Default2 = rabbit_runtime_parameters:value(
vhost(X), <<"sharding">>, Parameter, Default),
get_parameter_value(<<"sharding-definition">>, Parameter,
X, Default2).
get_parameter_value(Comp, Param, X, Default) ->
case get_policy(X) of
undefined -> Default;
Name ->
case rabbit_runtime_parameters:value(
vhost(X), Comp, Name) of
not_found -> Default;
Value -> pget(Param, Value, Default)
end
end.
find_exchanges(VHost) ->
rabbit_exchange:list(VHost).