Add queue type setting for exchange federation

Remove x-ha-policy (ignored since RabbitMQ 3.0)
This commit is contained in:
Michal Kuratczyk 2024-02-29 12:31:40 +01:00
parent 15688c3f2d
commit 2e8ff3aaee
No known key found for this signature in database
8 changed files with 67 additions and 16 deletions

View File

@ -16,7 +16,7 @@
message_ttl,
trust_user_id,
ack_mode,
ha_policy,
queue_type,
name,
bind_nowait,
resource_cleanup_mode,
@ -45,4 +45,4 @@
-define(FEDERATION_GUIDE_URL, <<"https://rabbitmq.com/federation.html">>).
-define(FEDERATION_PG_SCOPE, rabbitmq_federation_pg_scope).
-define(FEDERATION_PG_SCOPE, rabbitmq_federation_pg_scope).

View File

@ -504,15 +504,16 @@ consume_from_upstream_queue(
#upstream{prefetch_count = Prefetch,
expires = Expiry,
message_ttl = TTL,
ha_policy = HA} = Upstream,
queue_type = QueueType} = Upstream,
#upstream_params{x_or_q = X,
params = Params} = UParams,
Q = upstream_queue_name(name(X), vhost(Params), DownXName),
Args = [A || {_K, _T, V} = A
<- [{<<"x-expires">>, long, Expiry},
{<<"x-message-ttl">>, long, TTL},
{<<"x-ha-policy">>, longstr, HA},
{<<"x-internal-purpose">>, longstr, <<"federation">>}],
{<<"x-internal-purpose">>, longstr, <<"federation">>},
{<<"x-queue-type">>, longstr, atom_to_binary(QueueType)}
],
V =/= none],
amqp_channel:call(Ch, #'queue.declare'{queue = Q,
durable = true,

View File

@ -89,7 +89,8 @@ shared_validation() ->
['no-ack', 'on-publish', 'on-confirm']), optional},
{<<"resource-cleanup-mode">>, rabbit_parameter_validation:enum(
['default', 'never']), optional},
{<<"ha-policy">>, fun rabbit_parameter_validation:binary/2, optional},
{<<"queue-type">>, rabbit_parameter_validation:enum(
['classic', 'quorum']), optional},
{<<"bind-nowait">>, fun rabbit_parameter_validation:boolean/2, optional},
{<<"channel-use-mode">>, rabbit_parameter_validation:enum(
['multiple', 'single']), optional}].

View File

@ -136,7 +136,7 @@ from_upstream_or_set(US, Name, U, XorQ) ->
message_ttl = bget('message-ttl', US, U, none),
trust_user_id = bget('trust-user-id', US, U, false),
ack_mode = to_atom(bget('ack-mode', US, U, <<"on-confirm">>)),
ha_policy = bget('ha-policy', US, U, none),
queue_type = to_atom(bget('queue-type', US, U, <<"classic">>)),
name = Name,
bind_nowait = bget('bind-nowait', US, U, false),
resource_cleanup_mode = to_atom(bget('resource-cleanup-mode', US, U, <<"default">>)),

View File

@ -50,6 +50,7 @@ groups() ->
essential() ->
[
single_upstream,
single_upstream_quorum,
multiple_upstreams,
multiple_upstreams_pattern,
single_upstream_multiple_uris,
@ -163,9 +164,46 @@ single_upstream(Config) ->
await_binding(Config, 0, UpX, RK),
publish_expect(Ch, UpX, RK, Q, <<"single_upstream payload">>),
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
assert_federation_internal_queue_type(Config, Server, rabbit_classic_queue),
rabbit_ct_client_helpers:close_channel(Ch),
clean_up_federation_related_bits(Config).
single_upstream_quorum(Config) ->
FedX = <<"single_upstream_quorum.federated">>,
UpX = <<"single_upstream_quorum.upstream.x">>,
rabbit_ct_broker_helpers:set_parameter(
Config, 0, <<"federation-upstream">>, <<"localhost">>,
[
{<<"uri">>, rabbit_ct_broker_helpers:node_uri(Config, 0)},
{<<"exchange">>, UpX},
{<<"queue-type">>, <<"quorum">>}
]),
rabbit_ct_broker_helpers:set_policy(
Config, 0,
<<"fed.x">>, <<"^single_upstream_quorum.federated">>, <<"exchanges">>,
[
{<<"federation-upstream">>, <<"localhost">>}
]),
Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
Xs = [
exchange_declare_method(FedX)
],
declare_exchanges(Ch, Xs),
RK = <<"key">>,
Q = declare_and_bind_queue(Ch, FedX, RK),
await_binding(Config, 0, UpX, RK),
publish_expect(Ch, UpX, RK, Q, <<"single_upstream_quorum payload">>),
Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
assert_federation_internal_queue_type(Config, Server, rabbit_quorum_queue),
rabbit_ct_client_helpers:close_channel(Ch),
clean_up_federation_related_bits(Config).
multiple_upstreams(Config) ->
FedX = <<"multiple_upstreams.federated">>,
@ -870,3 +908,14 @@ await_credentials_obfuscation_seeding_on_two_nodes(Config) ->
end),
timer:sleep(1000).
assert_federation_internal_queue_type(Config, Server, Expected) ->
Qs = all_queues_on(Config, Server),
FedQs = lists:filter(
fun(Q) ->
lists:member(
{<<"x-internal-purpose">>, longstr, <<"federation">>}, amqqueue:get_arguments(Q))
end,
Qs),
FedQTypes = lists:map(fun(Q) -> amqqueue:get_type(Q) end, FedQs),
?assertEqual([Expected], lists:uniq(FedQTypes)).

View File

@ -75,8 +75,8 @@ HELP['federation-expires'] =
HELP['federation-ttl'] =
'Time in milliseconds that undelivered messages should be held upstream when there is a network outage or backlog. Leave this blank to mean "forever".';
HELP['ha-policy'] =
'Determines the "x-ha-policy" argument for the upstream queue for a federated exchange. Default is "none", meaning the queue is not HA.';
HELP['queue-type'] =
'Defines the queue type for the upstream queue for a federated exchange. Default is "classic". Set to "quorum" for high availability.';
HELP['queue'] =
'The name of the upstream queue. Default is to use the same name as the federated queue.';

View File

@ -56,8 +56,8 @@
</tr>
<tr>
<th>HA Policy</th>
<td><%= fmt_string(upstream.value['ha-policy']) %></td>
<th>Queue Type</th>
<td><%= fmt_string(upstream.value['queue-type']) %></td>
</tr>
<tr>

View File

@ -19,7 +19,7 @@
<th>Max Hops</th>
<th>Expiry</th>
<th>Message TTL</th>
<th>HA Policy</th>
<th>Queue Type</th>
<th>Queue</th>
<th>Consumer tag</th>
</tr>
@ -43,7 +43,7 @@
<td class="r"><%= upstream.value['max-hops'] %></td>
<td class="r"><%= fmt_time(upstream.value.expires, 'ms') %></td>
<td class="r"><%= fmt_time(upstream.value['message-ttl'], 'ms') %></td>
<td class="r"><%= fmt_string(upstream.value['ha-policy']) %></td>
<td class="r"><%= fmt_string(upstream.value['queue-type']) %></td>
<td class="r"><%= fmt_string(upstream.value['queue']) %></td>
<td class="r"><%= fmt_string(upstream.value['consumer-tag']) %></td>
</tr>
@ -195,11 +195,11 @@
<tr>
<th>
<label>
HA Policy:
<span class="help" id="ha-policy"></span>
Queue Type:
<span class="help" id="queue-type"></span>
</label>
</th>
<td><input type="text" name="ha-policy"/></td>
<td><input type="text" name="queue-type"/></td>
</tr>
</tr>