Add routing on message properties

Summary:

Add a new hash-property argument setting that allows for message hasing based upon the correlation_id or message_id.

Changes:

- Validate the exchange upon creation to ensure that hash-header and hash-property are not both set at the same time. Additionally validate the value of hash-property when set is one of correlation_id or message_id
- Change the signature of hash/2 for header to match on {header, Header} instead of {longstr, Header}
- Add a new hash/2 implementation that matches on {property, Value} for returning the hashable string from the message properties
- Implement a new hash_on/1 method for selecting the data the message will be routed on
- Implement a new hash_args/1 method for returning the configuration for both hash-header and hash-property
- Add test coverage for message property based routing

This addresses the proposal I outlined in #7
This commit is contained in:
Gavin M. Roy 2015-09-02 12:33:32 -04:00
parent 63d9a783ff
commit a9bcb539b4
3 changed files with 154 additions and 21 deletions

View File

@ -6,11 +6,12 @@ This plugin adds a consistent-hash exchange type to RabbitMQ.
In various scenarios, you may wish to ensure that messages sent to an
exchange are consistently and equally distributed across a number of
different queues based on the routing key of the message (or a
nominated header, see "Routing on a header" below). You could arrange
for this to occur yourself by using a direct or topic exchange,
binding queues to that exchange and then publishing messages to that
exchange that match the various binding keys.
different queues based on the routing key of the message, a nominated
header (see "Routing on a header" below), or a message property (see
"Routing on a message property" below). You could arrange for this to
occur yourself by using a direct or topic exchange, binding queues
to that exchange and then publishing messages to that exchange that
match the various binding keys.
However, arranging things this way can be problematic:
@ -69,7 +70,7 @@ This plugin supports RabbitMQ 3.3.x and later versions.
Here is an example using the Erlang client:
``` erlang
```erlang
-include_lib("amqp_client/include/amqp_client.hrl").
test() ->
@ -137,16 +138,39 @@ exchange to route based on a named header instead. To do this, declare the
exchange with a string argument called "hash-header" naming the header to
be used. For example using the Erlang client as above:
```erlang
amqp_channel:call(
Chan, #'exchange.declare' {
exchange = <<"e">>,
type = <<"x-consistent-hash">>,
arguments = [{<<"hash-header">>, longstr, <<"hash-me">>}]
}).
```
If you specify "hash-header" and then publish messages without the named
header, they will all get routed to the same (arbitrarily-chosen) queue.
## Routing on a message property
In addition to a value in the header property, you can also route on the
``message_id``, ``correlation_id``, or ``timestamp`` message property. To do so,
declare the exchange with a string argument called "hash-property" naming the
property to be used. For example using the Erlang client as above:
```erlang
amqp_channel:call(
Chan, #'exchange.declare' {
exchange = <<"e">>,
type = <<"x-consistent-hash">>,
arguments = [{<<"hash-property">>, longstr, <<"message_id">>}]
}).
```
Note that you can not declare an exchange that routes on both "hash-header" and
"hash-property". If you specify "hash-property" and then publish messages without
a value in the named property, they will all get routed to the same
(arbitrarily-chosen) queue.
## Getting Help
Any comments or feedback welcome, to the

View File

@ -16,6 +16,7 @@
-module(rabbit_exchange_type_consistent_hash).
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("rabbit_common/include/rabbit_framing.hrl").
-behaviour(rabbit_exchange_type).
@ -46,6 +47,7 @@
-define(TABLE, ?MODULE).
-define(PHASH2_RANGE, 134217728). %% 2^27
-define(PROPERTIES, [<<"correlation_id">>, <<"message_id">>, <<"timestamp">>]).
description() ->
[{description, <<"Consistent Hashing Exchange">>}].
@ -67,8 +69,7 @@ route(#exchange { name = Name,
%% end up as relatively deep data structures which cost a lot to
%% continually copy to the process heap. Consequently, such
%% approaches have not been found to be much faster, if at all.
HashOn = rabbit_misc:table_lookup(Args, <<"hash-header">>),
H = erlang:phash2(hash(HashOn, Msg), ?PHASH2_RANGE),
H = erlang:phash2(hash(hash_on(Args), Msg), ?PHASH2_RANGE),
case ets:select(?TABLE, [{#bucket { source_number = {Name, '$2'},
destination = '$1',
_ = '_' },
@ -84,7 +85,23 @@ route(#exchange { name = Name,
Destinations
end.
validate(_X) -> ok.
validate(#exchange { arguments = Args }) ->
case hash_args(Args) of
{undefined, undefined} -> ok;
{undefined, {_Type, Value}} ->
case lists:member(Value, ?PROPERTIES) of
true -> ok;
false ->
rabbit_misc:protocol_error(precondition_failed,
"Unsupported property: ~s",
[Value])
end;
{_, undefined} -> ok;
{_, _} ->
rabbit_misc:protocol_error(precondition_failed,
"hash-header and hash-property are mutually exclusive",
[])
end.
validate_binding(_X, #binding { key = K }) ->
try
@ -168,9 +185,43 @@ find_numbers(Source, N, Acc) ->
hash(undefined, #basic_message { routing_keys = Routes }) ->
Routes;
hash({longstr, Header}, #basic_message { content = Content }) ->
hash({header, Header}, #basic_message { content = Content }) ->
Headers = rabbit_basic:extract_headers(Content),
case Headers of
undefined -> undefined;
_ -> rabbit_misc:table_lookup(Headers, Header)
end;
hash({property, Property}, #basic_message { content = Content }) ->
#content{properties = #'P_basic'{ correlation_id = CorrId,
message_id = MsgId,
timestamp = Timestamp }} =
rabbit_binary_parser:ensure_content_decoded(Content),
case Property of
<<"correlation_id">> -> CorrId;
<<"message_id">> -> MsgId;
<<"timestamp">> ->
case Timestamp of
undefined -> undefined;
_ -> integer_to_binary(Timestamp)
end
end.
hash_args(Args) ->
Header =
case rabbit_misc:table_lookup(Args, <<"hash-header">>) of
undefined -> undefined;
{longstr, V1} -> {header, V1}
end,
Property =
case rabbit_misc:table_lookup(Args, <<"hash-property">>) of
undefined -> undefined;
{longstr, V2} -> {property, V2}
end,
{Header, Property}.
hash_on(Args) ->
case hash_args(Args) of
{undefined, undefined} -> undefined;
{Header, undefined} -> Header;
{undefined, Property} -> Property
end.

View File

@ -32,6 +32,11 @@ t(Qs) ->
ok = test_with_header(Qs),
ok = test_binding_with_negative_routing_key(),
ok = test_binding_with_non_numeric_routing_key(),
ok = test_with_correlation_id(Qs),
ok = test_with_message_id(Qs),
ok = test_with_timestamp(Qs),
ok = test_non_supported_property(),
ok = test_mutually_exclusive_arguments(),
ok.
test_with_rk(Qs) ->
@ -51,8 +56,61 @@ test_with_header(Qs) ->
#amqp_msg{props = #'P_basic'{headers = H}, payload = <<>>}
end, [{<<"hash-header">>, longstr, <<"hashme">>}], Qs).
test_with_correlation_id(Qs) ->
test0(fun() ->
#'basic.publish'{exchange = <<"e">>}
end,
fun() ->
#amqp_msg{props = #'P_basic'{correlation_id = rnd()}, payload = <<>>}
end, [{<<"hash-property">>, longstr, <<"correlation_id">>}], Qs).
test_with_message_id(Qs) ->
test0(fun() ->
#'basic.publish'{exchange = <<"e">>}
end,
fun() ->
#amqp_msg{props = #'P_basic'{message_id = rnd()}, payload = <<>>}
end, [{<<"hash-property">>, longstr, <<"message_id">>}], Qs).
test_with_timestamp(Qs) ->
test0(fun() ->
#'basic.publish'{exchange = <<"e">>}
end,
fun() ->
#amqp_msg{props = #'P_basic'{timestamp = rndint()}, payload = <<>>}
end, [{<<"hash-property">>, longstr, <<"timestamp">>}], Qs).
test_mutually_exclusive_arguments() ->
{ok, Conn} = amqp_connection:start(#amqp_params_network{}),
{ok, Chan} = amqp_connection:open_channel(Conn),
process_flag(trap_exit, true),
Cmd = #'exchange.declare'{
exchange = <<"fail">>,
type = <<"x-consistent-hash">>,
arguments = [{<<"hash-header">>, longstr, <<"foo">>},
{<<"hash-property">>, longstr, <<"bar">>}]
},
?assertExit(_, amqp_channel:call(Chan, Cmd)),
ok.
test_non_supported_property() ->
{ok, Conn} = amqp_connection:start(#amqp_params_network{}),
{ok, Chan} = amqp_connection:open_channel(Conn),
process_flag(trap_exit, true),
Cmd = #'exchange.declare'{
exchange = <<"fail">>,
type = <<"x-consistent-hash">>,
arguments = [{<<"hash-property">>, longstr, <<"app_id">>}]
},
?assertExit(_, amqp_channel:call(Chan, Cmd)),
ok.
rnd() ->
list_to_binary(integer_to_list(random:uniform(1000000))).
list_to_binary(integer_to_list(rndint())).
rndint() ->
random:uniform(1000000).
test0(MakeMethod, MakeMsg, DeclareArgs, [Q1, Q2, Q3, Q4] = Queues) ->
Count = 10000,
@ -69,16 +127,16 @@ test0(MakeMethod, MakeMsg, DeclareArgs, [Q1, Q2, Q3, Q4] = Queues) ->
}),
[#'queue.declare_ok'{} =
amqp_channel:call(Chan, #'queue.declare' {
queue = Q, exclusive = true}) || Q <- Queues],
queue = Q, exclusive = true }) || Q <- Queues],
[#'queue.bind_ok'{} =
amqp_channel:call(Chan, #'queue.bind' {queue = Q,
exchange = <<"e">>,
routing_key = <<"10">>})
exchange = <<"e">>,
routing_key = <<"10">>})
|| Q <- [Q1, Q2]],
[#'queue.bind_ok'{} =
amqp_channel:call(Chan, #'queue.bind' {queue = Q,
exchange = <<"e">>,
routing_key = <<"20">>})
exchange = <<"e">>,
routing_key = <<"20">>})
|| Q <- [Q3, Q4]],
#'tx.select_ok'{} = amqp_channel:call(Chan, #'tx.select'{}),
[amqp_channel:call(Chan,
@ -87,7 +145,7 @@ test0(MakeMethod, MakeMsg, DeclareArgs, [Q1, Q2, Q3, Q4] = Queues) ->
amqp_channel:call(Chan, #'tx.commit'{}),
Counts =
[begin
#'queue.declare_ok'{message_count = M} =
#'queue.declare_ok'{message_count = M} =
amqp_channel:call(Chan, #'queue.declare' {queue = Q,
exclusive = true}),
M
@ -104,14 +162,14 @@ test_binding_with_negative_routing_key() ->
{ok, Conn} = amqp_connection:start(#amqp_params_network{}),
{ok, Chan} = amqp_connection:open_channel(Conn),
Declare1 = #'exchange.declare'{exchange = <<"bind-fail">>,
type = <<"x-consistent-hash">>},
type = <<"x-consistent-hash">>},
#'exchange.declare_ok'{} = amqp_channel:call(Chan, Declare1),
Q = <<"test-queue">>,
Declare2 = #'queue.declare'{queue = Q},
#'queue.declare_ok'{} = amqp_channel:call(Chan, Declare2),
process_flag(trap_exit, true),
Cmd = #'queue.bind'{exchange = <<"bind-fail">>,
routing_key = <<"-1">>},
routing_key = <<"-1">>},
?assertExit(_, amqp_channel:call(Chan, Cmd)),
{ok, Ch2} = amqp_connection:open_channel(Conn),
amqp_channel:call(Ch2, #'queue.delete'{queue = Q}),
@ -121,14 +179,14 @@ test_binding_with_non_numeric_routing_key() ->
{ok, Conn} = amqp_connection:start(#amqp_params_network{}),
{ok, Chan} = amqp_connection:open_channel(Conn),
Declare1 = #'exchange.declare'{exchange = <<"bind-fail">>,
type = <<"x-consistent-hash">>},
type = <<"x-consistent-hash">>},
#'exchange.declare_ok'{} = amqp_channel:call(Chan, Declare1),
Q = <<"test-queue">>,
Declare2 = #'queue.declare'{queue = Q},
#'queue.declare_ok'{} = amqp_channel:call(Chan, Declare2),
process_flag(trap_exit, true),
Cmd = #'queue.bind'{exchange = <<"bind-fail">>,
routing_key = <<"not-a-number">>},
routing_key = <<"not-a-number">>},
?assertExit(_, amqp_channel:call(Chan, Cmd)),
{ok, Ch2} = amqp_connection:open_channel(Conn),
amqp_channel:call(Ch2, #'queue.delete'{queue = Q}),