Shovel management: add local shovels

This commit is contained in:
Diana Parra Corbacho 2025-07-30 15:02:45 +02:00
parent bb5e1d95d9
commit 8c9f79fb36
3 changed files with 140 additions and 3 deletions

View File

@ -42,17 +42,27 @@ dispatcher_add(function(sammy) {
//remove fields not required by the selected protocol
if (this.params['src-protocol'] == 'amqp10') {
remove_params_with(this, 'amqp091-src');
remove_params_with(this, 'local-src');
} else if (this.params['src-protocol'] == 'amqp091') {
remove_params_with(this, 'amqp10-src');
remove_params_with(this, 'local-src');
} else {
remove_params_with(this, 'amqp10-src');
remove_params_with(this, 'amqp091-src');
}
if (this.params['dest-protocol'] == 'amqp10') {
remove_params_with(this, 'amqp091-dest');
remove_params_with(this, 'local-dest');
} else if (this.params['dest-protocol'] == 'amqp091'){
remove_params_with(this, 'amqp10-dest');
remove_params_with(this, 'local-dest');
} else {
remove_params_with(this, 'amqp091-dest');
remove_params_with(this, 'amqp10-dest');
}
var trimProtoPrefix = function (x) {
if(x.startsWith('amqp10-') || x.startsWith('amqp091-')) {
if(x.startsWith('amqp10-') || x.startsWith('amqp091-') || x.startsWith('local-')) {
return x.substr(x.indexOf('-') + 1, x.length);
}
return x;

View File

@ -95,6 +95,7 @@
<select name="src-protocol-selector" class="controls-appearance">
<option value="amqp091-src">AMQP 0.9.1</option>
<option value="amqp10-src">AMQP 1.0</option>
<option value="local-src">Local</option>
</select>
<div id="amqp10-src-div" style="display: none;">
<table class="subform">
@ -206,6 +207,55 @@
</tr>
</table>
</div>
<div id="local-src-div" style="display: none;">
<table class="subform">
<tr>
<td>
<label>
URI:
<span class="help" id="shovel-uri"></span>
</label>
</td>
<td>
<select name="queue-or-exchange" class="controls-appearance">
<option value="local-src-queue">Queue:</option>
<option value="local-src-exchange">Exchange:</option>
</select>
<span class="help" id="shovel-queue-exchange"></span>
</td>
</tr>
<tr>
<td>
<input type="text" name="local-src-uri" value="amqp://"/>
<span class="mand">*</span>
</td>
<td>
<div id="local-src-queue-div">
<input type="text" name="local-src-queue"/>
</div>
<div id="local-src-exchange-div" style="display: none;">
<input type="text" name="local-src-exchange"/>
Routing key: <input type="text" name="local-src-exchange-key"/>
</div>
</td>
</tr>
<tr>
<td>
<label>
Auto-delete
<span class="help" id="shovel-local-auto-delete"></span>
</label>
</td>
<td>
<select name="local-src-delete-after">
<option value="never">Never</option>
<option value="queue-length">After initial length transferred</option>
<option value="number">After num messages</option>
</select>
</td>
</tr>
</table>
</div>
</td>
</tr>
<tr>
@ -214,6 +264,7 @@
<select name="dest-protocol-selector" class="controls-appearance">
<option value="amqp091-dest">AMQP 0.9.1</option>
<option value="amqp10-dest">AMQP 1.0</option>
<option value="local-dest">Local</option>
</select>
<div id="amqp10-dest-div" style="display: none;">
<table class="subform">
@ -279,7 +330,7 @@
</div>
<div id="dest-exchange-div" style="display: none;">
<input type="text" name="amqp091-dest-exchange"/>
Routing key: <input type="text" name="dest-exchange-key"/>
Routing key: <input type="text" name="amqp091-dest-exchange-key"/>
</div>
</td>
</tr>
@ -299,6 +350,51 @@
</tr>
</table>
</div>
<div id="local-dest-div" style="display: none;">
<table class="subform">
<tr>
<td>
<label>
URI
<span class="help" id="shovel-uri"></span>
</label>
</td>
<td>
<select name="queue-or-exchange" class="narrow controls-appearance">
<option value="local-dest-queue">Queue:</option>
<option value="local-dest-exchange">Exchange:</option>
</select>
<span class="help" id="shovel-queue-exchange"></span>
</td>
</tr>
<tr>
<td><input type="text" name="local-dest-uri" value="amqp://"/><span class="mand">*</span></td>
<td>
<div id="local-dest-queue-div">
<input type="text" name="local-dest-queue"/>
</div>
<div id="local-dest-exchange-div" style="display: none;">
<input type="text" name="local-dest-exchange"/>
Routing key: <input type="text" name="local-dest-exchange-key"/>
</div>
</td>
</tr>
<tr>
<td>
<label>
Add forwarding headers:
<span class="help" id="shovel-forward-headers"></span>
</label>
</td>
<td>
<select name="local-dest-add-forward-headers">
<option value="false">No</option>
<option value="true">Yes</option>
</select>
</td>
</tr>
</table>
</div>
</td>
</tr>
<tr>

View File

@ -32,6 +32,7 @@ groups() ->
start_and_get_a_dynamic_amqp091_shovel_with_publish_properties,
start_and_get_a_dynamic_amqp091_shovel_with_missing_publish_properties,
start_and_get_a_dynamic_amqp091_shovel_with_empty_publish_properties,
start_and_get_a_dynamic_local_shovel,
create_and_delete_a_dynamic_shovel_that_successfully_connects,
create_and_delete_a_dynamic_shovel_that_fails_to_connect
]},
@ -212,6 +213,20 @@ start_and_get_a_dynamic_amqp091_shovel_with_empty_publish_properties(Config) ->
ok.
start_and_get_a_dynamic_local_shovel(Config) ->
remove_all_dynamic_shovels(Config, <<"/">>),
Name = rabbit_data_coercion:to_binary(?FUNCTION_NAME),
ID = {<<"/">>, Name},
await_shovel_removed(Config, ID),
declare_local_shovel(Config, Name),
await_shovel_startup(Config, ID),
Sh = get_shovel(Config, Name),
?assertEqual(Name, maps:get(name, Sh)),
delete_shovel(Config, Name),
ok.
start_static_shovels(Config) ->
http_put(Config, "/users/admin",
#{password => <<"admin">>, tags => <<"administrator">>}, ?CREATED),
@ -455,6 +470,22 @@ declare_amqp091_shovel_with_publish_properties(Config, Name, Props) ->
}
}, ?CREATED).
declare_local_shovel(Config, Name) ->
Port = integer_to_binary(
rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp)),
http_put(Config, io_lib:format("/parameters/shovel/%2f/~ts", [Name]),
#{
value => #{
<<"src-protocol">> => <<"local">>,
<<"src-uri">> => <<"amqp://localhost:", Port/binary>>,
<<"src-queue">> => <<"local.src.test">>,
<<"src-delete-after">> => <<"never">>,
<<"dest-protocol">> => <<"local">>,
<<"dest-uri">> => <<"amqp://localhost:", Port/binary>>,
<<"dest-queue">> => <<"local.dest.test">>
}
}, ?CREATED).
await_shovel_startup(Config, Name) ->
await_shovel_startup(Config, Name, 10_000).
@ -480,4 +511,4 @@ does_shovel_exist(Config, Name) ->
case lookup_shovel_status(Config, Name) of
not_found -> false;
_Found -> true
end.
end.