Merge pull request #41 from rabbitmq/rabbitmq-consistent-hash-exchange-40
Correctly determine bucket range boundaries when updating hash ring state
This commit is contained in:
commit
370fc599f5
|
|
@ -195,11 +195,11 @@ remove_binding(#binding{source = S, destination = D, key = RK}) ->
|
|||
case maps:size(BucketsOfThisBinding) of
|
||||
0 -> ok;
|
||||
N when N >= 1 ->
|
||||
KeysOfThisBinding = maps:keys(BucketsOfThisBinding),
|
||||
KeysOfThisBinding = lists:usort(maps:keys(BucketsOfThisBinding)),
|
||||
LastBucket = lists:last(KeysOfThisBinding),
|
||||
FirstBucket = hd(KeysOfThisBinding),
|
||||
BucketsDownTheRing = maps:filter(fun (K, _) -> K > LastBucket end, BM0),
|
||||
UnchangedBuckets = maps:filter(fun (K, _) -> K < FirstBucket end, BM0),
|
||||
UnchangedBuckets = maps:filter(fun (K, _) -> K < FirstBucket end, BM0),
|
||||
|
||||
%% final state with "down the ring" buckets updated
|
||||
NewBucketsDownTheRing = maps:fold(
|
||||
|
|
|
|||
|
|
@ -43,6 +43,11 @@ groups() ->
|
|||
test_hash_ring_updates_when_queue_is_deleted,
|
||||
test_hash_ring_updates_when_multiple_queues_are_deleted,
|
||||
test_hash_ring_updates_when_exclusive_queues_are_deleted_due_to_connection_closure,
|
||||
test_hash_ring_updates_when_exclusive_queues_are_deleted_due_to_connection_closure_case2,
|
||||
test_hash_ring_updates_when_exclusive_queues_are_deleted_due_to_connection_closure_case3,
|
||||
test_hash_ring_updates_when_exclusive_queues_are_deleted_due_to_connection_closure_case4,
|
||||
test_hash_ring_updates_when_exclusive_queues_are_deleted_due_to_connection_closure_case5,
|
||||
test_hash_ring_updates_when_exclusive_queues_are_deleted_due_to_connection_closure_case6,
|
||||
test_hash_ring_updates_when_exchange_is_deleted,
|
||||
test_hash_ring_updates_when_queue_is_unbound
|
||||
]}
|
||||
|
|
@ -401,6 +406,63 @@ test_hash_ring_updates_when_exclusive_queues_are_deleted_due_to_connection_closu
|
|||
clean_up_test_topology(Config, X, []),
|
||||
ok.
|
||||
|
||||
%% rabbitmq/rabbitmq-consistent-has-exchange#40, uses higher weights
|
||||
test_hash_ring_updates_when_exclusive_queues_are_deleted_due_to_connection_closure_case2(Config) ->
|
||||
test_hash_ring_updates_when_exclusive_queues_are_deleted_due_to_connection_closure_case(Config, ?FUNCTION_NAME, 50).
|
||||
|
||||
%% rabbitmq/rabbitmq-consistent-has-exchange#40
|
||||
test_hash_ring_updates_when_exclusive_queues_are_deleted_due_to_connection_closure_case3(Config) ->
|
||||
test_hash_ring_updates_when_exclusive_queues_are_deleted_due_to_connection_closure_case(Config, ?FUNCTION_NAME, 34).
|
||||
|
||||
%% rabbitmq/rabbitmq-consistent-has-exchange#40
|
||||
test_hash_ring_updates_when_exclusive_queues_are_deleted_due_to_connection_closure_case4(Config) ->
|
||||
test_hash_ring_updates_when_exclusive_queues_are_deleted_due_to_connection_closure_case(Config, ?FUNCTION_NAME, 100).
|
||||
|
||||
%% rabbitmq/rabbitmq-consistent-has-exchange#40
|
||||
test_hash_ring_updates_when_exclusive_queues_are_deleted_due_to_connection_closure_case5(Config) ->
|
||||
test_hash_ring_updates_when_exclusive_queues_are_deleted_due_to_connection_closure_case(Config, ?FUNCTION_NAME, 268).
|
||||
|
||||
%% rabbitmq/rabbitmq-consistent-has-exchange#40
|
||||
test_hash_ring_updates_when_exclusive_queues_are_deleted_due_to_connection_closure_case6(Config) ->
|
||||
test_hash_ring_updates_when_exclusive_queues_are_deleted_due_to_connection_closure_case(Config, ?FUNCTION_NAME, 1937).
|
||||
|
||||
test_hash_ring_updates_when_exclusive_queues_are_deleted_due_to_connection_closure_case(Config, XAsList, Key) ->
|
||||
Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config),
|
||||
{ok, Chan} = amqp_connection:open_channel(Conn),
|
||||
|
||||
X = atom_to_binary(XAsList, utf8),
|
||||
amqp_channel:call(Chan, #'exchange.delete' {exchange = X}),
|
||||
|
||||
Declare = #'exchange.declare'{exchange = X,
|
||||
type = <<"x-consistent-hash">>},
|
||||
#'exchange.declare_ok'{} = amqp_channel:call(Chan, Declare),
|
||||
|
||||
NumQueues = 6,
|
||||
Queues = [begin
|
||||
#'queue.declare_ok'{queue = Q} =
|
||||
amqp_channel:call(Chan, #'queue.declare' {exclusive = true }),
|
||||
Q
|
||||
end || _ <- lists:seq(1, NumQueues)],
|
||||
[#'queue.bind_ok'{} =
|
||||
amqp_channel:call(Chan, #'queue.bind' {queue = Q,
|
||||
exchange = X,
|
||||
routing_key = integer_to_binary(Key)})
|
||||
|| Q <- Queues],
|
||||
|
||||
ct:pal("all hash ring rows: ~p", [hash_ring_rows(Config)]),
|
||||
|
||||
%% NumQueues x 'Key' buckets per binding
|
||||
?assertEqual(NumQueues * Key, count_buckets_of_exchange(Config, X)),
|
||||
assert_ring_consistency(Config, X),
|
||||
ok = amqp_connection:close(Conn),
|
||||
timer:sleep(1000),
|
||||
|
||||
ct:pal("all hash ring rows after connection closure (~p): ~p", [XAsList, hash_ring_rows(Config)]),
|
||||
|
||||
?assertEqual(0, count_buckets_of_exchange(Config, X)),
|
||||
clean_up_test_topology(Config, X, []),
|
||||
ok.
|
||||
|
||||
test_hash_ring_updates_when_exchange_is_deleted(Config) ->
|
||||
Chan = rabbit_ct_client_helpers:open_channel(Config, 0),
|
||||
|
||||
|
|
@ -485,7 +547,7 @@ hash_ring_rows(Config) ->
|
|||
|
||||
assert_ring_consistency(Config, X) ->
|
||||
[#chx_hash_ring{bucket_map = M}] = hash_ring_state(Config, X),
|
||||
Buckets = maps:keys(M),
|
||||
Buckets = lists:usort(maps:keys(M)),
|
||||
Hi = lists:last(Buckets),
|
||||
|
||||
%% bucket numbers form a sequence without gaps or duplicates
|
||||
|
|
|
|||
Loading…
Reference in New Issue