Merge pull request #13852 from rabbitmq/mergify/bp/v4.1.x/pr-13836
Trigger a 4.1.x alpha release build / trigger_alpha_build (push) Waiting to run Details
Test (make) / Build and Xref (1.17, 26) (push) Waiting to run Details
Test (make) / Build and Xref (1.17, 27) (push) Waiting to run Details
Test (make) / Test (1.17, 27, khepri) (push) Waiting to run Details
Test (make) / Test (1.17, 27, mnesia) (push) Waiting to run Details
Test (make) / Test mixed clusters (1.17, 27, khepri) (push) Waiting to run Details
Test (make) / Test mixed clusters (1.17, 27, mnesia) (push) Waiting to run Details
Test (make) / Type check (1.17, 27) (push) Waiting to run Details

Management UI: new page and elements for superstreams (partitioned streams) (backport #13836)
This commit is contained in:
Michael Klishin 2025-05-05 21:28:25 +04:00 committed by GitHub
commit 963e2c670a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 282 additions and 6 deletions

View File

@ -406,7 +406,7 @@ stream_queue_arguments(ArgumentsAcc, Arguments)
stream_queue_arguments(ArgumentsAcc,
#{<<"max-length-bytes">> := Value} = Arguments) ->
stream_queue_arguments([{<<"x-max-length-bytes">>, long,
binary_to_integer(Value)}]
rabbit_data_coercion:to_integer(Value)}]
++ ArgumentsAcc,
maps:remove(<<"max-length-bytes">>, Arguments));
stream_queue_arguments(ArgumentsAcc,
@ -418,14 +418,14 @@ stream_queue_arguments(ArgumentsAcc,
#{<<"stream-max-segment-size-bytes">> := Value} =
Arguments) ->
stream_queue_arguments([{<<"x-stream-max-segment-size-bytes">>, long,
binary_to_integer(Value)}]
rabbit_data_coercion:to_integer(Value)}]
++ ArgumentsAcc,
maps:remove(<<"stream-max-segment-size-bytes">>,
Arguments));
stream_queue_arguments(ArgumentsAcc,
#{<<"initial-cluster-size">> := Value} = Arguments) ->
stream_queue_arguments([{<<"x-initial-cluster-size">>, long,
binary_to_integer(Value)}]
rabbit_data_coercion:to_integer(Value)}]
++ ArgumentsAcc,
maps:remove(<<"initial-cluster-size">>, Arguments));
stream_queue_arguments(ArgumentsAcc,
@ -437,7 +437,7 @@ stream_queue_arguments(ArgumentsAcc,
stream_queue_arguments(ArgumentsAcc,
#{<<"stream-filter-size-bytes">> := Value} = Arguments) ->
stream_queue_arguments([{<<"x-stream-filter-size-bytes">>, long,
binary_to_integer(Value)}]
rabbit_data_coercion:to_integer(Value)}]
++ ArgumentsAcc,
maps:remove(<<"stream-filter-size-bytes">>, Arguments));
stream_queue_arguments(ArgumentsAcc, _Arguments) ->

View File

@ -10,7 +10,15 @@ dispatcher_add(function(sammy) {
'consumers': '/stream/connections/' + vhost + '/' + name + '/consumers',
'publishers': '/stream/connections/' + vhost + '/' + name + '/publishers'},
'streamConnection', '#/stream/connections');
});
});
sammy.get('#/stream/super-streams', function() {
render({'vhosts': '/vhosts'}, 'superStreams', '#/stream/super-streams')
});
sammy.put('#/stream/super-streams', function() {
put_cast_params(this, '/stream/super-streams/:vhost/:name',
['name', 'pattern', 'policy'], ['priority'], []);
location.href = "/#/queues";
});
// not exactly dispatcher stuff, but we have to make sure this is called before
// HTTP requests are made in case of refresh of the queue page
QUEUE_EXTRA_CONTENT_REQUESTS.push(function(vhost, queue) {
@ -33,6 +41,7 @@ dispatcher_add(function(sammy) {
});
NAVIGATION['Stream Connections'] = ['#/stream/connections', "monitoring"];
NAVIGATION['Super Streams'] = ['#/stream/super-streams', "management"];
var ALL_STREAM_CONNECTION_COLUMNS =
{'Overview': [['user', 'User name', true],

View File

@ -0,0 +1,70 @@
<h2> Super Streams </h2>
<% if (ac.canAccessVhosts()) { %>
<div class="section">
<h2>Add a new super stream</h2>
<div class="hider">
<form action="#/stream/super-streams" method="put">
<table class="form">
<% if (display.vhosts) { %>
<tr>
<th><label>Virtual host:</label></th>
<td>
<select name="vhost">
<% for (var i = 0; i < vhosts.length; i++) { %>
<option value="<%= fmt_string(vhosts[i].name) %>" <%= (vhosts[i].name === current_vhost) ? 'selected="selected"' : '' %>><%= fmt_string(vhosts[i].name) %></option>
<% } %>
</select>
</td>
</tr>
<% } else { %>
<tr><td><input type="hidden" name="vhost" value="<%= fmt_string(vhosts[0].name) %>"/></td></tr>
<% } %>
<tr>
<th><label>Name:</label></th>
<td><input type="text" name="name"/><span class="mand">*</span></td>
</tr>
<tr>
<th>
<label>
<select name="has-partitions" class="narrow controls-appearance">
<option value="partitions" selected="selected">Partitions:</option>
<option value="binding-keys">Binding keys:</option>
</select>
</label>
</th>
<td>
<div id="partitions-div">
<input type="partitions" name="partitions" />
<span class="mand">*</span><br/>
</div>
<div id="binding-keys-div" style="display: none;">
<input type="binding-keys" name="binding-keys" />
<span class="mand">*</span><br/>
</div>
</td>
</tr>
<tr>
<th><label>Arguments:</label></th>
<td>
<div class="multifield" id="arguments"></div>
<table class="argument-links">
<tr>
<td>Add</td>
<td>
<span class="argument-link" field="arguments" key="max-length-bytes" type="number">Max length bytes</span> <span class="help" id="queue-max-length-bytes"></span>
| <span class="argument-link" field="arguments" key="max-age" type="string">Max time retention</span><span class="help" id="queue-max-age"></span>
| <span class="argument-link" field="arguments" key="stream-max-segment-size-bytes" type="number">Max segment size in bytes</span><span class="help" id="queue-stream-max-segment-size-bytes"></span></br>
| <span class="argument-link" field="arguments" key="initial-cluster-size" type="number">Initial cluster size</span><span class="help" id="queue-initial-cluster-size"></span>
| <span class="argument-link" field="arguments" key="queue-leader-locator" type="string">Leader locator</span><span class="help" id="queue-leader-locator"></span>
</td>
</tr>
</table>
</td>
</tr>
</table>
<input type="submit" value="Add super stream"/>
</form>
</div>
</div>
<% } %>

View File

@ -0,0 +1,165 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(rabbit_stream_super_stream_mgmt).
-behaviour(rabbit_mgmt_extension).
-export([dispatcher/0,
web_ui/0]).
-export([init/2,
content_types_accepted/2,
is_authorized/2,
resource_exists/2,
allowed_methods/2,
accept_content/2]).
-export([variances/2]).
-include_lib("rabbitmq_management_agent/include/rabbit_mgmt_records.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-define(DEFAULT_RPC_TIMEOUT, 30_000).
dispatcher() ->
[{"/stream/super-streams/:vhost/:name", ?MODULE, []}].
web_ui() ->
[].
%%--------------------------------------------------------------------
init(Req, _State) ->
{cowboy_rest,
rabbit_mgmt_headers:set_common_permission_headers(Req, ?MODULE),
#context{}}.
variances(Req, Context) ->
{[<<"accept-encoding">>, <<"origin">>], Req, Context}.
content_types_accepted(ReqData, Context) ->
{[{{<<"application">>, <<"json">>, '*'}, accept_content}], ReqData, Context}.
allowed_methods(ReqData, Context) ->
{[<<"PUT">>, <<"OPTIONS">>], ReqData, Context}.
resource_exists(ReqData, Context) ->
%% just checking that the vhost requested exists
{case rabbit_mgmt_util:all_or_one_vhost(ReqData, fun (_) -> [] end) of
vhost_not_found -> false;
_ -> true
end, ReqData, Context}.
is_authorized(ReqData, Context) ->
rabbit_mgmt_util:is_authorized_vhost(ReqData, Context).
accept_content(ReqData0, #context{user = #user{username = ActingUser}} = Context) ->
%% TODO validate arguments?
VHost = rabbit_mgmt_util:id(vhost, ReqData0),
Name = rabbit_mgmt_util:id(name, ReqData0),
rabbit_mgmt_util:with_decode(
[], ReqData0, Context,
fun([], BodyMap, ReqData) ->
PartitionsBin = maps:get(partitions, BodyMap, undefined),
BindingKeysStr = maps:get('binding-keys', BodyMap, undefined),
case validate_partitions_or_binding_keys(PartitionsBin, BindingKeysStr, ReqData, Context) of
ok ->
Arguments = maps:get(arguments, BodyMap, #{}),
Node = get_node(BodyMap),
case PartitionsBin of
undefined ->
BindingKeys = binding_keys(BindingKeysStr),
Streams = streams_from_binding_keys(Name, BindingKeys),
create_super_stream(Node, VHost, Name, Streams,
Arguments, BindingKeys, ActingUser,
ReqData, Context);
_ ->
case validate_partitions(PartitionsBin, ReqData, Context) of
Partitions when is_integer(Partitions) ->
Streams = streams_from_partitions(Name, Partitions),
RoutingKeys = routing_keys(Partitions),
create_super_stream(Node, VHost, Name, Streams,
Arguments, RoutingKeys, ActingUser,
ReqData, Context);
Error ->
Error
end
end;
Error ->
Error
end
end).
%%-------------------------------------------------------------------
get_node(Props) ->
case maps:get(<<"node">>, Props, undefined) of
undefined -> node();
N -> rabbit_nodes:make(
binary_to_list(N))
end.
binding_keys(BindingKeysStr) ->
[rabbit_data_coercion:to_binary(
string:strip(K))
|| K
<- string:tokens(
rabbit_data_coercion:to_list(BindingKeysStr), ",")].
routing_keys(Partitions) ->
[integer_to_binary(K) || K <- lists:seq(0, Partitions - 1)].
streams_from_binding_keys(Name, BindingKeys) ->
[list_to_binary(binary_to_list(Name)
++ "-"
++ binary_to_list(K))
|| K <- BindingKeys].
streams_from_partitions(Name, Partitions) ->
[list_to_binary(binary_to_list(Name)
++ "-"
++ integer_to_list(K))
|| K <- lists:seq(0, Partitions - 1)].
create_super_stream(NodeName, VHost, SuperStream, Streams, Arguments,
RoutingKeys, ActingUser, ReqData, Context) ->
case rabbit_misc:rpc_call(NodeName,
rabbit_stream_manager,
create_super_stream,
[VHost,
SuperStream,
Streams,
Arguments,
RoutingKeys,
ActingUser],
?DEFAULT_RPC_TIMEOUT) of
ok ->
{true, ReqData, Context};
{error, Reason} ->
rabbit_mgmt_util:bad_request(io_lib:format("~p", [Reason]),
ReqData, Context)
end.
validate_partitions_or_binding_keys(undefined, undefined, ReqData, Context) ->
rabbit_mgmt_util:bad_request("Must specify partitions or binding keys", ReqData, Context);
validate_partitions_or_binding_keys(_, undefined, _, _) ->
ok;
validate_partitions_or_binding_keys(undefined, _, _, _) ->
ok;
validate_partitions_or_binding_keys(_, _, ReqData, Context) ->
rabbit_mgmt_util:bad_request("Specify partitions or binding keys, not both", ReqData, Context).
validate_partitions(PartitionsBin, ReqData, Context) ->
try
case rabbit_data_coercion:to_integer(PartitionsBin) of
Int when Int < 1 ->
rabbit_mgmt_util:bad_request("The partition number must be greater than 0", ReqData, Context);
Int ->
Int
end
catch
_:_ ->
rabbit_mgmt_util:bad_request("The partitions must be a number", ReqData, Context)
end.

View File

@ -10,13 +10,20 @@
-include_lib("rabbit_common/include/rabbit_framing.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_mgmt_test.hrl").
-import(rabbit_mgmt_test_util, [
http_put/4
]).
-compile(export_all).
all() ->
[{group, non_parallel_tests}].
groups() ->
[{non_parallel_tests, [], [stream_management]}].
[{non_parallel_tests, [], [
stream_management,
create_super_stream
]}].
%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
@ -27,6 +34,7 @@ init_per_suite(Config) ->
true ->
{skip, "suite is not mixed versions compatible"};
_ ->
inets:start(),
rabbit_ct_helpers:log_environment(),
Config1 =
rabbit_ct_helpers:set_config(Config,
@ -108,6 +116,30 @@ stream_management(Config) ->
{"MANAGEMENT_PORT=~b", [ManagementPortNode]}]),
{ok, _} = MakeResult.
create_super_stream(Config) ->
http_put(Config, "/stream/super-streams/%2F/carrots", #{partitions => 3,
'binding-keys' => "streamA"},
?BAD_REQUEST),
http_put(Config, "/stream/super-streams/%2F/carrots", #{partitions => "this is not a partition"},
?BAD_REQUEST),
http_put(Config, "/stream/super-streams/%2F/carrots", #{partitions => 3},
{group, '2xx'}),
http_put(Config, "/stream/super-streams/%2F/cucumber", #{'binding-keys' => "fresh-cucumber"},
{group, '2xx'}),
http_put(Config, "/stream/super-streams/%2F/aubergine",
#{partitions => 3,
arguments => #{'max-length-bytes' => 1000000,
'max-age' => <<"1h">>,
'stream-max-segment-size' => 500,
'initial-cluster-size' => 2,
'queue-leader-locator' => <<"client-local">>}},
{group, '2xx'}),
http_put(Config, "/stream/super-streams/%2F/watermelon",
#{partitions => 3,
arguments => #{'queue-leader-locator' => <<"remote">>}},
?BAD_REQUEST),
ok.
get_stream_port(Config) ->
rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_stream).