Fix crash in consistent hash exchange

Prior to this commit, a crash occurred when a consistent hash exchange
got declared with a `hash-header` argument, but the publishing client
didn't set that header on the message.

This bug is present in RabbitMQ 3.13.0 - 3.13.6.

Fixes https://github.com/rabbitmq/rabbitmq-server/discussions/11671
This commit is contained in:
David Ansari 2024-07-24 11:42:59 +02:00
parent c31aae59d1
commit cdc5b886f8
3 changed files with 71 additions and 17 deletions

View File

@ -84,7 +84,7 @@ ring partitions, and thus queues according to their binding weights.
#### One Binding Per Queue
This exchange type **assumes a single binding between a queue and an exchange**.
Starting with RabbitMQ `3.10.6` and `3.9.21` this will be enforced in the code:
This will be enforced in the code:
when multiple bindings are created, only the first one will actually update the ring.
This limitation makes most semantic sense: the purpose is to achieve
@ -376,7 +376,7 @@ exchange to route based on a named header instead. To do this, declare the
exchange with a string argument called "hash-header" naming the header to
be used.
When a `"hash-header"` is specified, the chosen header **must be provided**.
When a `"hash-header"` is specified, the chosen header should be provided.
If published messages do not contain the header, they will all get
routed to the same **arbitrarily chosen** queue.
@ -579,7 +579,7 @@ declare the exchange with a string argument called ``"hash-property"`` naming th
property to be used.
The `"hash-header"` and `"hash-property"` are mutually exclusive.
When a `"hash-property"` is specified, the chosen property **must be provided**.
When a `"hash-property"` is specified, the chosen property should be provided.
If published messages do not contain the property, they will all get
routed to the same **arbitrarily chosen** queue.

View File

@ -261,8 +261,9 @@ jump_consistent_hash_value(_B0, J0, NumberOfBuckets, SeedState0) ->
value_to_hash(undefined, Msg) ->
mc:routing_keys(Msg);
value_to_hash({header, Header}, Msg0) ->
maps:get(Header, mc:routing_headers(Msg0, [x_headers]));
value_to_hash({header, Header}, Msg) ->
Headers = mc:routing_headers(Msg, [x_headers]),
maps:get(Header, Headers, undefined);
value_to_hash({property, Property}, Msg) ->
case Property of
<<"correlation_id">> ->

View File

@ -40,6 +40,7 @@ routing_tests() ->
[
routing_key_hashing_test,
custom_header_hashing_test,
custom_header_undefined,
message_id_hashing_test,
correlation_id_hashing_test,
timestamp_hashing_test,
@ -121,7 +122,7 @@ end_per_testcase(Testcase, Config) ->
%% N.B. lowering this value below 100K increases the probability
%% of failing the Chi squared test in some environments
-define(DEFAULT_SAMPLE_COUNT, 150000).
-define(DEFAULT_SAMPLE_COUNT, 150_000).
routing_key_hashing_test(Config) ->
ok = test_with_rk(Config, ?RoutingTestQs).
@ -145,6 +146,43 @@ other_routing_test(Config) ->
ok = test_mutually_exclusive_arguments(Config),
ok.
%% Test case for
%% https://github.com/rabbitmq/rabbitmq-server/discussions/11671
%% According to our docs, it's allowed (although not recommended)
%% for the publishing client to omit the header:
%% "If published messages do not contain the header,
%% they will all get routed to the same arbitrarily chosen queue."
custom_header_undefined(Config) ->
Exchange = <<"my exchange">>,
Queue = <<"my queue">>,
Ch = rabbit_ct_client_helpers:open_channel(Config),
#'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}),
#'exchange.declare_ok'{} = amqp_channel:call(
Ch, #'exchange.declare' {
exchange = Exchange,
type = <<"x-consistent-hash">>,
arguments = [{<<"hash-header">>, longstr, <<"hashme">>}]
}),
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = Queue}),
#'queue.bind_ok'{} = amqp_channel:call(
Ch, #'queue.bind'{queue = Queue,
exchange = Exchange,
routing_key = <<"1">>}),
amqp_channel:call(Ch,
#'basic.publish'{exchange = Exchange},
%% We leave the "hashme" header undefined.
#amqp_msg{}),
amqp_channel:wait_for_confirms(Ch, 10),
?assertMatch({#'basic.get_ok'{}, #amqp_msg{}},
amqp_channel:call(Ch, #'basic.get'{queue = Queue})),
rabbit_ct_client_helpers:close_channel(Ch),
clean_up_test_topology(Config, Exchange, [Queue]),
ok.
%% Test that messages originally published with AMQP to a quorum queue
%% can be dead lettered via the consistent hash exchange to a stream.
amqp_dead_letter(Config) ->
@ -280,45 +318,60 @@ wait_for_accepts(N) ->
%% -------------------------------------------------------------------
test_with_rk(Config, Qs) ->
test0(Config, fun (E) ->
test0(Config,
fun (E) ->
#'basic.publish'{exchange = E, routing_key = rnd()}
end,
fun() ->
#amqp_msg{props = #'P_basic'{}, payload = <<>>}
end, [], Qs).
end,
[],
Qs).
test_with_header(Config, Qs) ->
test0(Config, fun (E) ->
test0(Config,
fun (E) ->
#'basic.publish'{exchange = E}
end,
fun() ->
H = [{<<"hashme">>, longstr, rnd()}],
#amqp_msg{props = #'P_basic'{headers = H}, payload = <<>>}
end, [{<<"hash-header">>, longstr, <<"hashme">>}], Qs).
end,
[{<<"hash-header">>, longstr, <<"hashme">>}],
Qs).
test_with_correlation_id(Config, Qs) ->
test0(Config, fun(E) ->
test0(Config,
fun(E) ->
#'basic.publish'{exchange = E}
end,
fun() ->
#amqp_msg{props = #'P_basic'{correlation_id = rnd()}, payload = <<>>}
end, [{<<"hash-property">>, longstr, <<"correlation_id">>}], Qs).
end,
[{<<"hash-property">>, longstr, <<"correlation_id">>}],
Qs).
test_with_message_id(Config, Qs) ->
test0(Config, fun(E) ->
test0(Config,
fun(E) ->
#'basic.publish'{exchange = E}
end,
fun() ->
#amqp_msg{props = #'P_basic'{message_id = rnd()}, payload = <<>>}
end, [{<<"hash-property">>, longstr, <<"message_id">>}], Qs).
end,
[{<<"hash-property">>, longstr, <<"message_id">>}],
Qs).
test_with_timestamp(Config, Qs) ->
test0(Config, fun(E) ->
test0(Config,
fun(E) ->
#'basic.publish'{exchange = E}
end,
fun() ->
#amqp_msg{props = #'P_basic'{timestamp = rnd_int()}, payload = <<>>}
end, [{<<"hash-property">>, longstr, <<"timestamp">>}], Qs).
end,
[{<<"hash-property">>, longstr, <<"timestamp">>}],
Qs).
test_mutually_exclusive_arguments(Config) ->
Chan = rabbit_ct_client_helpers:open_channel(Config, 0),
@ -359,7 +412,7 @@ test0(Config, MakeMethod, MakeMsg, DeclareArgs, Queues) ->
test0(Config, MakeMethod, MakeMsg, DeclareArgs, Queues, ?DEFAULT_SAMPLE_COUNT).
test0(Config, MakeMethod, MakeMsg, DeclareArgs, [Q1, Q2, Q3, Q4] = Queues, IterationCount) ->
Chan = rabbit_ct_client_helpers:open_channel(Config, 0),
Chan = rabbit_ct_client_helpers:open_channel(Config),
#'confirm.select_ok'{} = amqp_channel:call(Chan, #'confirm.select'{}),
CHX = <<"e">>,