Store hash ring state in a single table

This implementation is significantly simpler and doesn't
perform nearly as many Mnesia operations.

Pair: @dcorbacho.

References #37, #38.

[#159822323]
This commit is contained in:
Michael Klishin 2018-08-28 16:40:38 +03:00
parent e5e9316b9e
commit a2d4f0bd87
1 changed files with 101 additions and 142 deletions

View File

@ -27,25 +27,12 @@
-export([init/0]).
-export([info/1, info/2]).
-record(bucket, {
%% a {resource, bucket} pair
%% where bucket is a non-negative integer
id,
%% a resource
queue
}).
-record(bucket_count, {
exchange,
count
}).
-record(binding_buckets, {
%% an {exchange, queue} pair because we
%% assume that there's only one binding between
%% a consistent hash exchange and a queue
id,
bucket_numbers = []
-record(chx_hash_ring, {
%% a resource
exchange,
%% a map of bucket => queue | exchange
bucket_map,
next_bucket_number
}).
-rabbit_boot_step(
@ -66,17 +53,9 @@
{enables, external_infrastructure}]}).
%% This data model allows for efficient routing and exchange deletion
%% but not efficient binding management. This is a future area of improvement.
%% A couple of alternatives were considered, e.g. storing the entire ring state
%% in a single map. Without an additional structure such as a balanced tree
%% ring updates would be even less efficient (but easier to follow).
%% but less efficient (linear) binding management.
%% maps buckets to queues
-define(BUCKET_TABLE, rabbit_exchange_type_consistent_hash_bucket_queue).
%% maps exchange to total the number of buckets
-define(BUCKET_COUNT_TABLE, rabbit_exchange_type_consistent_hash_bucket_count).
%% maps {exchange, queue} pairs to a list of buckets
-define(BINDING_BUCKET_TABLE, rabbit_exchange_type_consistent_hash_binding_bucket).
-define(HASH_RING_STATE_TABLE, rabbit_exchange_type_consistent_hash_ring_state).
-define(PROPERTIES, [<<"correlation_id">>, <<"message_id">>, <<"timestamp">>]).
@ -91,21 +70,26 @@ description() ->
serialise_events() -> false.
route(#exchange { name = Name,
arguments = Args },
#delivery { message = Msg }) ->
case ets:lookup(?BUCKET_COUNT_TABLE, Name) of
route(#exchange {name = Name,
arguments = Args},
#delivery {message = Msg}) ->
case ets:lookup(?HASH_RING_STATE_TABLE, Name) of
[] ->
[];
[#bucket_count{count = N}] ->
K = value_to_hash(hash_on(Args), Msg),
SelectedBucket = jump_consistent_hash(K, N),
case mnesia:dirty_read({?BUCKET_TABLE, {Name, SelectedBucket}}) of
[Bucket] -> [Bucket#bucket.queue];
[] -> rabbit_log:warning("Bucket ~p not found", [SelectedBucket]),
[]
end
[#chx_hash_ring{bucket_map = BM}] ->
case maps:size(BM) of
0 -> [];
N ->
K = value_to_hash(hash_on(Args), Msg),
SelectedBucket = jump_consistent_hash(K, N),
case maps:get(SelectedBucket, BM, undefined) of
undefined ->
rabbit_log:warning("Bucket ~p not found", [SelectedBucket]),
[];
Queue -> [Queue]
end
end
end.
validate(#exchange { arguments = Args }) ->
@ -139,63 +123,64 @@ validate_binding(_X, #binding { key = K }) ->
{error, {binding_invalid, "The binding key must be an integer: ~p", [K]}}
end.
create(_Tx, _X) -> ok.
maybe_initialise_hash_ring_state(transaction, X) ->
case mnesia:read(?HASH_RING_STATE_TABLE, X) of
[_] -> ok;
[] ->
mnesia:write_lock_table(?HASH_RING_STATE_TABLE),
ok = mnesia:write(?HASH_RING_STATE_TABLE, #chx_hash_ring{
exchange = X,
next_bucket_number = 0,
bucket_map = #{}}, write)
end;
maybe_initialise_hash_ring_state(_, X) ->
rabbit_misc:execute_mnesia_transaction(
fun() -> maybe_initialise_hash_ring_state(transaction, X) end).
create(transaction, X) ->
maybe_initialise_hash_ring_state(transaction, X);
create(Tx, X) ->
maybe_initialise_hash_ring_state(Tx, X).
delete(transaction, #exchange{name = Name}, _Bs) ->
ok = mnesia:write_lock_table(?BUCKET_TABLE),
ok = mnesia:write_lock_table(?BUCKET_COUNT_TABLE),
mnesia:write_lock_table(?HASH_RING_STATE_TABLE),
Numbers = mnesia:select(?BUCKET_TABLE, [{
#bucket{id = {Name, '$1'}, _ = '_'},
[],
['$1']
}]),
[mnesia:delete({?BUCKET_TABLE, {Name, N}})
|| N <- Numbers],
Queues = mnesia:select(?BINDING_BUCKET_TABLE,
[{
#binding_buckets{id = {Name, '$1'}, _ = '_'},
[],
['$1']
}]),
[mnesia:delete({?BINDING_BUCKET_TABLE, {Name, Q}})
|| Q <- Queues],
mnesia:delete({?BUCKET_COUNT_TABLE, Name}),
ok;
ok = mnesia:delete({?HASH_RING_STATE_TABLE, Name});
delete(_Tx, _X, _Bs) ->
ok.
policy_changed(_X1, _X2) -> ok.
add_binding(transaction, _X,
#binding{source = S, destination = D, key = K}) ->
add_binding(transaction, X,
B = #binding{source = S, destination = D, key = K}) ->
Weight = rabbit_data_coercion:to_integer(K),
mnesia:write_lock_table(?BUCKET_TABLE),
mnesia:write_lock_table(?BUCKET_COUNT_TABLE),
mnesia:write_lock_table(?HASH_RING_STATE_TABLE),
LastBucketNum = bucket_count_of(S),
NewBucketCount = LastBucketNum + Weight,
case mnesia:read(?HASH_RING_STATE_TABLE, S) of
[State0 = #chx_hash_ring{bucket_map = BM0,
next_bucket_number = NexN0}] ->
NextN = NexN0 + Weight,
%% hi/lo bucket counters are 0-based but weight is 1-based
Range = lists:seq(NexN0, (NextN - 1)),
BM = lists:foldl(fun(Key, Acc) ->
maps:put(Key, D, Acc)
end, BM0, Range),
State = State0#chx_hash_ring{bucket_map = BM,
next_bucket_number = NextN},
Numbers = lists:seq(LastBucketNum, (NewBucketCount - 1)),
Buckets = [#bucket{id = {S, I}, queue = D} || I <- Numbers],
[ok = mnesia:write(?BUCKET_TABLE, B, write) || B <- Buckets],
mnesia:write(?BINDING_BUCKET_TABLE, #binding_buckets{id = {S, D},
bucket_numbers = Numbers}, write),
mnesia:write(?BUCKET_COUNT_TABLE, #bucket_count{exchange = S,
count = NewBucketCount}, write),
ok;
ok = mnesia:write(?HASH_RING_STATE_TABLE, State, write),
ok;
[] ->
maybe_initialise_hash_ring_state(transaction, S),
add_binding(transaction, X, B)
end;
add_binding(none, _X, _B) ->
ok.
remove_bindings(transaction, _X, Bindings) ->
mnesia:write_lock_table(?BUCKET_TABLE),
mnesia:write_lock_table(?BUCKET_COUNT_TABLE),
mnesia:write_lock_table(?HASH_RING_STATE_TABLE),
[remove_binding(B) || B <- Bindings],
@ -203,78 +188,52 @@ remove_bindings(transaction, _X, Bindings) ->
remove_bindings(none, _X, _Bs) ->
ok.
remove_binding(#binding{source = S, destination = D, key = K}) ->
Weight = rabbit_data_coercion:to_integer(K),
remove_binding(#binding{source = S, destination = D, key = RK}) ->
Weight = rabbit_data_coercion:to_integer(RK),
[#binding_buckets{bucket_numbers = Numbers}] = mnesia:read(?BINDING_BUCKET_TABLE, {S, D}),
LastNum = lists:last(Numbers),
mnesia:write_lock_table(?HASH_RING_STATE_TABLE),
%% Delete all buckets for this {exchange, queue} pair
[ok = mnesia:delete(?BUCKET_TABLE, {S, N}, write) || N <- Numbers],
case mnesia:read(?HASH_RING_STATE_TABLE, S) of
[State0 = #chx_hash_ring{bucket_map = BM0,
next_bucket_number = NexN0}] ->
%% Buckets with lower numbers stay as is; buckets that
%% belong to this binding are removed; buckets with
%% greater numbers are updated (their numbers are adjusted downwards by weight)
BucketsOfThisBinding = maps:filter(fun (_K, V) -> V =:= D end, BM0),
LastBucket = lists:last(maps:keys(BucketsOfThisBinding)),
BucketsDownTheRing = maps:filter(fun (K, _) -> K > LastBucket end, BM0),
%% Buckets with lower numbers stay as is; buckets that
%% belong to this binding are removed; buckets with
%% greater numbers are updated (their numbers are adjusted downwards by weight)
BucketsToUpdate = mnesia:select(?BUCKET_TABLE, [{
#bucket{id = {S, '$1'}, _ = '_'},
[
{'>', '$1', LastNum}
],
['$_']
}]),
QueuesWithUpdatedBuckets = lists:usort([Q || #bucket{queue = Q} <- BucketsToUpdate]),
[ok = mnesia:delete(?BUCKET_TABLE, Id, write) || #bucket{id = Id} <- BucketsToUpdate],
%% hash ring state without the buckets of this binding
BM1 = maps:fold(fun(K, _, Acc) -> maps:remove(K, Acc) end, BM0, BucketsOfThisBinding),
%% final state with "down the ring" buckets updated
BM2 = maps:fold(fun(K0, V, Acc) ->
M = maps:remove(K0, Acc),
maps:put(K0 - Weight, V, M)
end, BM1, BucketsDownTheRing),
UpdatedBuckets = [B#bucket{id = {X, N - Weight}} || B = #bucket{id = {X, N}} <- BucketsToUpdate],
[ok = mnesia:write(?BUCKET_TABLE, B, write) || B <- UpdatedBuckets],
NextN = NexN0 - Weight,
State = State0#chx_hash_ring{bucket_map = BM2,
next_bucket_number = NextN},
%% There will be no buckets for this {exchange, queue} pair to track
ok = mnesia:delete(?BINDING_BUCKET_TABLE, {S, D}, write),
ok = mnesia:write(?HASH_RING_STATE_TABLE, State, write),
%% Update the counter
TotalBucketsForX = bucket_count_of(S),
mnesia:write(?BUCKET_COUNT_TABLE, #bucket_count{exchange = S,
count = TotalBucketsForX - Weight}, write),
%% Update bucket numbers
[begin
case mnesia:read(?BINDING_BUCKET_TABLE, {S, Q}) of
[] -> ok;
[Val = #binding_buckets{bucket_numbers = BNs}] ->
NewBNs = [N - Weight || N <- BNs],
ok = mnesia:write(?BINDING_BUCKET_TABLE, Val#binding_buckets{bucket_numbers = NewBNs}, write)
end
end || Q <- QueuesWithUpdatedBuckets],
ok = mnesia:delete(?BINDING_BUCKET_TABLE, {S, D}, write),
ok.
ok;
[] ->
rabbit_log:warning("Can't remove binding: hash ring state for exchange ~s wasn't found",
[rabbit_misc:rs(S)]),
ok
end.
assert_args_equivalence(X, Args) ->
rabbit_exchange:assert_args_equivalence(X, Args).
bucket_count_of(X) ->
case ets:lookup(?BUCKET_COUNT_TABLE, X) of
[] -> 0;
[#bucket_count{count = N}] -> N
end.
init() ->
mnesia:create_table(?BUCKET_TABLE, [{record_name, bucket},
{attributes, record_info(fields, bucket)},
{type, ordered_set}]),
mnesia:create_table(?BUCKET_COUNT_TABLE, [{record_name, bucket_count},
{attributes, record_info(fields, bucket_count)},
{type, ordered_set}]),
mnesia:create_table(?BINDING_BUCKET_TABLE, [{record_name, binding_buckets},
{attributes, record_info(fields, binding_buckets)},
{type, ordered_set}]),
mnesia:add_table_copy(?BUCKET_TABLE, node(), ram_copies),
mnesia:add_table_copy(?BUCKET_COUNT_TABLE, node(), ram_copies),
mnesia:add_table_copy(?BINDING_BUCKET_TABLE, node(), ram_copies),
mnesia:wait_for_tables([?BUCKET_TABLE], 30000),
mnesia:create_table(?HASH_RING_STATE_TABLE, [{record_name, chx_hash_ring},
{attributes, record_info(fields, chx_hash_ring)},
{type, ordered_set}]),
mnesia:add_table_copy(?HASH_RING_STATE_TABLE, node(), ram_copies),
mnesia:wait_for_tables([?HASH_RING_STATE_TABLE], 30000),
ok.
%%