Merge branch 'main' into issue-9437-queue-storage-version
This commit is contained in:
		
						commit
						8e7e8f9127
					
				|  | @ -941,6 +941,11 @@ rabbitmq_integration_suite( | |||
|     size = "medium", | ||||
| ) | ||||
| 
 | ||||
| rabbitmq_suite( | ||||
|     name = "unit_quorum_queue_SUITE", | ||||
|     size = "medium", | ||||
| ) | ||||
| 
 | ||||
| rabbitmq_integration_suite( | ||||
|     name = "unit_app_management_SUITE", | ||||
|     size = "medium", | ||||
|  |  | |||
|  | @ -1692,6 +1692,14 @@ def test_suite_beam_files(name = "test_suite_beam_files"): | |||
|         erlc_opts = "//:test_erlc_opts", | ||||
|         deps = ["//deps/amqp_client:erlang_app"], | ||||
|     ) | ||||
|     erlang_bytecode( | ||||
|         name = "unit_quorum_queue_SUITE_beam_files", | ||||
|         testonly = True, | ||||
|         srcs = ["test/unit_quorum_queue_SUITE.erl"], | ||||
|         outs = ["test/unit_quorum_queue_SUITE.beam"], | ||||
|         app_name = "rabbit", | ||||
|         erlc_opts = "//:test_erlc_opts", | ||||
|     ) | ||||
|     erlang_bytecode( | ||||
|         name = "unit_app_management_SUITE_beam_files", | ||||
|         testonly = True, | ||||
|  |  | |||
|  | @ -22,7 +22,9 @@ | |||
|          add_user_sans_validation/3, put_user/2, put_user/3, | ||||
|          update_user/5, | ||||
|          update_user_with_hash/5, | ||||
|          add_user_sans_validation/6]). | ||||
|          add_user_sans_validation/6, | ||||
|          add_user_with_pre_hashed_password_sans_validation/3 | ||||
| ]). | ||||
| 
 | ||||
| -export([set_user_limits/3, clear_user_limits/3, is_over_connection_limit/1, | ||||
|          is_over_channel_limit/1, get_user_limits/0, get_user_limits/1]). | ||||
|  | @ -222,6 +224,10 @@ add_user(Username, Password, ActingUser, Limits, Tags) -> | |||
|     validate_and_alternate_credentials(Username, Password, ActingUser, | ||||
|                                        add_user_sans_validation(Limits, Tags)). | ||||
| 
 | ||||
| add_user_with_pre_hashed_password_sans_validation(Username, PasswordHash, ActingUser) -> | ||||
|     HashingAlgorithm = rabbit_password:hashing_mod(), | ||||
|     add_user_sans_validation(Username, PasswordHash, HashingAlgorithm, [], undefined, ActingUser). | ||||
| 
 | ||||
| add_user_sans_validation(Username, Password, ActingUser) -> | ||||
|     add_user_sans_validation(Username, Password, ActingUser, undefined, []). | ||||
| 
 | ||||
|  | @ -246,14 +252,12 @@ add_user_sans_validation(Username, Password, ActingUser, Limits, Tags) -> | |||
|            end, | ||||
|     add_user_sans_validation_in(Username, User, ConvertedTags, Limits, ActingUser). | ||||
| 
 | ||||
| add_user_sans_validation(Username, PasswordHash, HashingAlgorithm, Tags, Limits, ActingUser) -> | ||||
| add_user_sans_validation(Username, PasswordHash, HashingMod, Tags, Limits, ActingUser) -> | ||||
|     rabbit_log:debug("Asked to create a new user '~ts' with password hash", [Username]), | ||||
|     ConvertedTags = [rabbit_data_coercion:to_atom(I) || I <- Tags], | ||||
|     HashingMod = rabbit_password:hashing_mod(), | ||||
|     User0 = internal_user:create_user(Username, PasswordHash, HashingMod), | ||||
|     User1 = internal_user:set_tags( | ||||
|               internal_user:set_password_hash(User0, | ||||
|                                               PasswordHash, HashingAlgorithm), | ||||
|               internal_user:set_password_hash(User0, PasswordHash, HashingMod), | ||||
|               ConvertedTags), | ||||
|     User = case Limits of | ||||
|                undefined -> User1; | ||||
|  |  | |||
|  | @ -227,7 +227,7 @@ | |||
| %% It is called when a feature flag is being enabled. The function is | ||||
| %% responsible for this feature-flag-specific verification and data | ||||
| %% conversion. It returns `ok' if RabbitMQ can mark the feature flag as | ||||
| %% enabled an continue with the next one, if any. `{error, Reason}' and | ||||
| %% enabled and continue with the next one, if any. `{error, Reason}' and | ||||
| %% exceptions are an error and the feature flag will remain disabled. | ||||
| %% | ||||
| %% The migration function is called on all nodes which fulfill the following | ||||
|  |  | |||
|  | @ -34,6 +34,7 @@ | |||
| -define(NODE_REPLY_TIMEOUT, 5000). | ||||
| -define(RABBIT_UP_RPC_TIMEOUT, 2000). | ||||
| -define(RABBIT_DOWN_PING_INTERVAL, 1000). | ||||
| -define(NODE_DISCONNECTION_TIMEOUT, 1000). | ||||
| 
 | ||||
| -record(state, {monitors, partitions, subscribers, down_ping_timer, | ||||
|                 keepalive_timer, autoheal, guid, node_guids}). | ||||
|  | @ -921,13 +922,23 @@ upgrade_to_full_partition(Proxy) -> | |||
| %% detect a very short partition. So we want to force a slightly | ||||
| %% longer disconnect. Unfortunately we don't have a way to blacklist | ||||
| %% individual nodes; the best we can do is turn off auto-connect | ||||
| %% altogether. | ||||
| %% altogether. If Node is not already part of the connected nodes, then | ||||
| %% there's no need to repeat disabling dist_auto_connect and executing | ||||
| %% disconnect_node/1, which can result in application_controller | ||||
| %% timeouts and crash node monitor process. This also implies that | ||||
| %% the already disconnected node was already processed. In an | ||||
| %% unstable network, if we get consecutive 'up' and 'down' messages, | ||||
| %% then we expect disconnect_node/1 to be executed. | ||||
| disconnect(Node) -> | ||||
|     case lists:member(Node, nodes()) of | ||||
|         true -> | ||||
|             application:set_env(kernel, dist_auto_connect, never), | ||||
|             erlang:disconnect_node(Node), | ||||
|     timer:sleep(1000), | ||||
|     application:unset_env(kernel, dist_auto_connect), | ||||
|     ok. | ||||
|             timer:sleep(?NODE_DISCONNECTION_TIMEOUT), | ||||
|             application:unset_env(kernel, dist_auto_connect); | ||||
|         false -> | ||||
|             ok | ||||
|     end. | ||||
| 
 | ||||
| %%-------------------------------------------------------------------- | ||||
| 
 | ||||
|  | @ -998,8 +1009,12 @@ ping_all() -> | |||
| possibly_partitioned_nodes() -> | ||||
|     alive_rabbit_nodes() -- rabbit_mnesia:cluster_nodes(running). | ||||
| 
 | ||||
| startup_log([]) -> | ||||
|     rabbit_log:info("Starting rabbit_node_monitor", []); | ||||
| startup_log(Nodes) -> | ||||
|     rabbit_log:info("Starting rabbit_node_monitor, might be partitioned from ~tp", | ||||
|                     [Nodes]). | ||||
|     {ok, M} = application:get_env(rabbit, cluster_partition_handling), | ||||
|     startup_log(Nodes, M). | ||||
| 
 | ||||
| startup_log([], PartitionHandling) -> | ||||
|     rabbit_log:info("Starting rabbit_node_monitor (in ~tp mode)", [PartitionHandling]); | ||||
| startup_log(Nodes, PartitionHandling) -> | ||||
|     rabbit_log:info("Starting rabbit_node_monitor (in ~tp mode), might be partitioned from ~tp", | ||||
|                     [PartitionHandling, Nodes]). | ||||
|  |  | |||
|  | @ -137,6 +137,8 @@ sheet_body(PrevState) -> | |||
|                                          case maps:get(InternalName, RaStates, undefined) of | ||||
|                                              leader -> "L"; | ||||
|                                              follower -> "F"; | ||||
|                                              promotable -> "f";  %% temporary non-voter | ||||
|                                              non_voter -> "-";  %% permanent non-voter | ||||
|                                              _ -> "?" | ||||
|                                          end, | ||||
|                                          format_int(proplists:get_value(memory, ProcInfo)), | ||||
|  |  | |||
|  | @ -55,6 +55,7 @@ register() -> | |||
|                           {operator_policy_validator, <<"max-in-memory-bytes">>}, | ||||
|                           {operator_policy_validator, <<"delivery-limit">>}, | ||||
|                           {operator_policy_validator, <<"queue-version">>}, | ||||
|                           {operator_policy_validator, <<"overflow">>}, | ||||
|                           {policy_merge_strategy, <<"expires">>}, | ||||
|                           {policy_merge_strategy, <<"message-ttl">>}, | ||||
|                           {policy_merge_strategy, <<"max-length">>}, | ||||
|  | @ -62,7 +63,8 @@ register() -> | |||
|                           {policy_merge_strategy, <<"max-in-memory-length">>}, | ||||
|                           {policy_merge_strategy, <<"max-in-memory-bytes">>}, | ||||
|                           {policy_merge_strategy, <<"delivery-limit">>}, | ||||
|                           {policy_merge_strategy, <<"queue-version">>}]], | ||||
|                           {policy_merge_strategy, <<"queue-version">>}, | ||||
|                           {policy_merge_strategy, <<"overflow">>}]], | ||||
|     ok. | ||||
| 
 | ||||
| -spec validate_policy([{binary(), term()}]) -> rabbit_policy_validator:validate_results(). | ||||
|  | @ -214,5 +216,6 @@ merge_policy_value(<<"max-in-memory-bytes">>, Val, OpVal) -> min(Val, OpVal); | |||
| merge_policy_value(<<"expires">>, Val, OpVal)          -> min(Val, OpVal); | ||||
| merge_policy_value(<<"delivery-limit">>, Val, OpVal)   -> min(Val, OpVal); | ||||
| merge_policy_value(<<"queue-version">>, _Val, OpVal)   -> OpVal; | ||||
| merge_policy_value(<<"overflow">>, _Val, OpVal)   -> OpVal; | ||||
| %% use operator policy value for booleans | ||||
| merge_policy_value(_Key, Val, OpVal) when is_boolean(Val) andalso is_boolean(OpVal) -> OpVal. | ||||
|  |  | |||
|  | @ -39,21 +39,24 @@ | |||
| -export([open_files/1]). | ||||
| -export([peek/2, peek/3]). | ||||
| -export([add_member/2, | ||||
|          add_member/4]). | ||||
|          add_member/3, | ||||
|          add_member/4, | ||||
|          add_member/5]). | ||||
| -export([delete_member/3, delete_member/2]). | ||||
| -export([requeue/3]). | ||||
| -export([policy_changed/1]). | ||||
| -export([format_ra_event/3]). | ||||
| -export([cleanup_data_dir/0]). | ||||
| -export([shrink_all/1, | ||||
|          grow/4]). | ||||
|          grow/4, | ||||
|          grow/5]). | ||||
| -export([transfer_leadership/2, get_replicas/1, queue_length/1]). | ||||
| -export([file_handle_leader_reservation/1, | ||||
|          file_handle_other_reservation/0]). | ||||
| -export([file_handle_release_reservation/0]). | ||||
| -export([list_with_minimum_quorum/0, | ||||
|          filter_quorum_critical/1, | ||||
|          filter_quorum_critical/2, | ||||
|          filter_quorum_critical/3, | ||||
|          all_replica_states/0]). | ||||
| -export([capabilities/0]). | ||||
| -export([repair_amqqueue_nodes/1, | ||||
|  | @ -84,6 +87,7 @@ | |||
| -type msg_id() :: non_neg_integer(). | ||||
| -type qmsg() :: {rabbit_types:r('queue'), pid(), msg_id(), boolean(), | ||||
|                  mc:state()}. | ||||
| -type membership() :: voter | non_voter | promotable.  %% see ra_membership() in Ra. | ||||
| 
 | ||||
| -define(RA_SYSTEM, quorum_queues). | ||||
| -define(RA_WAL_NAME, ra_log_wal). | ||||
|  | @ -384,13 +388,15 @@ become_leader(QName, Name) -> | |||
| all_replica_states() -> | ||||
|     Rows0 = ets:tab2list(ra_state), | ||||
|     Rows = lists:map(fun | ||||
|                          %% TODO: support other membership types | ||||
|                          ({K, follower, promotable}) -> | ||||
|                              {K, promotable}; | ||||
|                          ({K, follower, non_voter}) -> | ||||
|                              {K, non_voter}; | ||||
|                          ({K, S, voter}) -> | ||||
|                              {K, S}; | ||||
|                          (T) -> | ||||
|                              T | ||||
|                      end, Rows0), | ||||
| 
 | ||||
|     {node(), maps:from_list(Rows)}. | ||||
| 
 | ||||
| -spec list_with_minimum_quorum() -> [amqqueue:amqqueue()]. | ||||
|  | @ -419,20 +425,22 @@ filter_quorum_critical(Queues) -> | |||
|     ReplicaStates = maps:from_list( | ||||
|                         rabbit_misc:append_rpc_all_nodes(rabbit_nodes:list_running(), | ||||
|                             ?MODULE, all_replica_states, [])), | ||||
|     filter_quorum_critical(Queues, ReplicaStates). | ||||
|     filter_quorum_critical(Queues, ReplicaStates, node()). | ||||
| 
 | ||||
| -spec filter_quorum_critical([amqqueue:amqqueue()], #{node() => #{atom() => atom()}}) -> [amqqueue:amqqueue()]. | ||||
| -spec filter_quorum_critical([amqqueue:amqqueue()], #{node() => #{atom() => atom()}}, node()) -> | ||||
|     [amqqueue:amqqueue()]. | ||||
| 
 | ||||
| filter_quorum_critical(Queues, ReplicaStates) -> | ||||
| filter_quorum_critical(Queues, ReplicaStates, Self) -> | ||||
|     lists:filter(fun (Q) -> | ||||
|                     MemberNodes = rabbit_amqqueue:get_quorum_nodes(Q), | ||||
|                     {Name, _Node} = amqqueue:get_pid(Q), | ||||
|                     AllUp = lists:filter(fun (N) -> | ||||
|                                             {Name, _} = amqqueue:get_pid(Q), | ||||
|                                             case maps:get(N, ReplicaStates, undefined) of | ||||
|                                                 #{Name := State} | ||||
|                                                   when State =:= follower orelse | ||||
|                                                        State =:= leader -> | ||||
|                                                        State =:= leader orelse | ||||
|                                                        (State =:= promotable andalso N =:= Self) orelse | ||||
|                                                        (State =:= non_voter andalso N =:= Self) -> | ||||
|                                                     true; | ||||
|                                                 _ -> false | ||||
|                                             end | ||||
|  | @ -1143,7 +1151,7 @@ get_sys_status(Proc) -> | |||
| 
 | ||||
|     end. | ||||
| 
 | ||||
| add_member(VHost, Name, Node, Timeout) -> | ||||
| add_member(VHost, Name, Node, Membership, Timeout) when is_binary(VHost) -> | ||||
|     QName = #resource{virtual_host = VHost, name = Name, kind = queue}, | ||||
|     rabbit_log:debug("Asked to add a replica for queue ~ts on node ~ts", [rabbit_misc:rs(QName), Node]), | ||||
|     case rabbit_amqqueue:lookup(QName) of | ||||
|  | @ -1161,7 +1169,7 @@ add_member(VHost, Name, Node, Timeout) -> | |||
|                           rabbit_log:debug("Quorum ~ts already has a replica on node ~ts", [rabbit_misc:rs(QName), Node]), | ||||
|                           ok; | ||||
|                         false -> | ||||
|                             add_member(Q, Node, Timeout) | ||||
|                             add_member(Q, Node, Membership, Timeout) | ||||
|                     end | ||||
|             end; | ||||
|         {ok, _Q} -> | ||||
|  | @ -1171,9 +1179,15 @@ add_member(VHost, Name, Node, Timeout) -> | |||
|     end. | ||||
| 
 | ||||
| add_member(Q, Node) -> | ||||
|     add_member(Q, Node, ?ADD_MEMBER_TIMEOUT). | ||||
|     add_member(Q, Node, promotable). | ||||
| 
 | ||||
| add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) -> | ||||
| add_member(Q, Node, Membership) -> | ||||
|     add_member(Q, Node, Membership, ?ADD_MEMBER_TIMEOUT). | ||||
| 
 | ||||
| add_member(VHost, Name, Node, Timeout) when is_binary(VHost) -> | ||||
|     %% NOTE needed to pass mixed cluster tests. | ||||
|     add_member(VHost, Name, Node, promotable, Timeout); | ||||
| add_member(Q, Node, Membership, Timeout) when ?amqqueue_is_quorum(Q) -> | ||||
|     {RaName, _} = amqqueue:get_pid(Q), | ||||
|     QName = amqqueue:get_name(Q), | ||||
|     %% TODO parallel calls might crash this, or add a duplicate in quorum_nodes | ||||
|  | @ -1183,7 +1197,7 @@ add_member(Q, Node, Timeout) when ?amqqueue_is_quorum(Q) -> | |||
|                                       ?TICK_TIMEOUT), | ||||
|     SnapshotInterval = application:get_env(rabbit, quorum_snapshot_interval, | ||||
|                                            ?SNAPSHOT_INTERVAL), | ||||
|     Conf = make_ra_conf(Q, ServerId, TickTimeout, SnapshotInterval, voter), | ||||
|     Conf = make_ra_conf(Q, ServerId, TickTimeout, SnapshotInterval, Membership), | ||||
|     case ra:start_server(?RA_SYSTEM, Conf) of | ||||
|         ok -> | ||||
|             ServerIdSpec = maps:with([id, uid, membership], Conf), | ||||
|  | @ -1295,17 +1309,21 @@ shrink_all(Node) -> | |||
|             amqqueue:get_type(Q) == ?MODULE, | ||||
|             lists:member(Node, get_nodes(Q))]. | ||||
| 
 | ||||
| -spec grow(node(), binary(), binary(), all | even) -> | ||||
| 
 | ||||
|  grow(Node, VhostSpec, QueueSpec, Strategy) -> | ||||
|     grow(Node, VhostSpec, QueueSpec, Strategy, promotable). | ||||
| 
 | ||||
| -spec grow(node(), binary(), binary(), all | even, membership()) -> | ||||
|     [{rabbit_amqqueue:name(), | ||||
|       {ok, pos_integer()} | {error, pos_integer(), term()}}]. | ||||
|  grow(Node, VhostSpec, QueueSpec, Strategy) -> | ||||
|  grow(Node, VhostSpec, QueueSpec, Strategy, Membership) -> | ||||
|     Running = rabbit_nodes:list_running(), | ||||
|     [begin | ||||
|          Size = length(get_nodes(Q)), | ||||
|          QName = amqqueue:get_name(Q), | ||||
|          rabbit_log:info("~ts: adding a new member (replica) on node ~w", | ||||
|                          [rabbit_misc:rs(QName), Node]), | ||||
|          case add_member(Q, Node, ?ADD_MEMBER_TIMEOUT) of | ||||
|          case add_member(Q, Node, Membership) of | ||||
|              ok -> | ||||
|                  {QName, {ok, Size + 1}}; | ||||
|              {error, Err} -> | ||||
|  |  | |||
|  | @ -340,12 +340,20 @@ filter_spec(Args) -> | |||
| 
 | ||||
| get_local_pid(#stream_client{local_pid = Pid} = State) | ||||
|   when is_pid(Pid) -> | ||||
|     case erlang:is_process_alive(Pid) of | ||||
|         true -> | ||||
|             {Pid, State}; | ||||
|         false -> | ||||
|             query_local_pid(State) | ||||
|     end; | ||||
| get_local_pid(#stream_client{leader = Pid} = State) | ||||
|   when is_pid(Pid) andalso node(Pid) == node() -> | ||||
|     {Pid, State#stream_client{local_pid = Pid}}; | ||||
| get_local_pid(#stream_client{stream_id = StreamId} = State) -> | ||||
|     get_local_pid(State#stream_client{local_pid = Pid}); | ||||
| get_local_pid(#stream_client{} = State) -> | ||||
|     %% query local coordinator to get pid | ||||
|     query_local_pid(State). | ||||
| 
 | ||||
| query_local_pid(#stream_client{stream_id = StreamId} = State) -> | ||||
|     case rabbit_stream_coordinator:local_pid(StreamId) of | ||||
|         {ok, Pid} -> | ||||
|             {Pid, State#stream_client{local_pid = Pid}}; | ||||
|  | @ -555,9 +563,10 @@ recover(_VHost, Queues) -> | |||
|               {[Q | R0], F0} | ||||
|       end, {[], []}, Queues). | ||||
| 
 | ||||
| settle(QName, complete, CTag, MsgIds, #stream_client{readers = Readers0, | ||||
| settle(QName, _, CTag, MsgIds, #stream_client{readers = Readers0, | ||||
|                                                      local_pid = LocalPid, | ||||
|                                                      name = Name} = State) -> | ||||
|     %% all settle reasons will "give credit" to the stream queue | ||||
|     Credit = length(MsgIds), | ||||
|     {Readers, Msgs} = case Readers0 of | ||||
|                           #{CTag := #stream{credit = Credit0} = Str0} -> | ||||
|  | @ -567,11 +576,7 @@ settle(QName, complete, CTag, MsgIds, #stream_client{readers = Readers0, | |||
|                           _ -> | ||||
|                               {Readers0, []} | ||||
|                       end, | ||||
|     {State#stream_client{readers = Readers}, [{deliver, CTag, true, Msgs}]}; | ||||
| settle(_, _, _, _, #stream_client{name = Name}) -> | ||||
|     {protocol_error, not_implemented, | ||||
|      "basic.nack and basic.reject not supported by stream queues ~ts", | ||||
|      [rabbit_misc:rs(Name)]}. | ||||
|     {State#stream_client{readers = Readers}, [{deliver, CTag, true, Msgs}]}. | ||||
| 
 | ||||
| info(Q, all_keys) -> | ||||
|     info(Q, ?INFO_KEYS); | ||||
|  |  | |||
|  | @ -39,6 +39,7 @@ all_tests() -> | |||
|      operator_retroactive_policy_publish_ttl, | ||||
|      queue_type_specific_policies, | ||||
|      classic_queue_version_policies, | ||||
|      overflow_policies, | ||||
|      is_supported_operator_policy_expires, | ||||
|      is_supported_operator_policy_message_ttl, | ||||
|      is_supported_operator_policy_max_length, | ||||
|  | @ -47,6 +48,7 @@ all_tests() -> | |||
|      is_supported_operator_policy_max_in_memory_bytes, | ||||
|      is_supported_operator_policy_delivery_limit, | ||||
|      is_supported_operator_policy_target_group_size, | ||||
|      is_supported_operator_policy_overflow, | ||||
|      is_supported_operator_policy_ha | ||||
|     ]. | ||||
| 
 | ||||
|  | @ -306,6 +308,29 @@ classic_queue_version_policies(Config) -> | |||
|     rabbit_ct_client_helpers:close_connection(Conn), | ||||
|     passed. | ||||
| 
 | ||||
| overflow_policies(Config) -> | ||||
|     [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), | ||||
|     {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), | ||||
|     QName = <<"policy_overflow">>, | ||||
|     declare(Ch, QName), | ||||
|     DropHead = [{<<"overflow">>, <<"drop-head">>}], | ||||
|     RejectPub = [{<<"overflow">>, <<"reject-publish">>}], | ||||
| 
 | ||||
|     Opts = #{config => Config, | ||||
|              server => Server, | ||||
|              qname  => QName}, | ||||
| 
 | ||||
|     %% OperPolicy has precedence always | ||||
|     verify_policies(DropHead, RejectPub, RejectPub, Opts), | ||||
| 
 | ||||
|     delete(Ch, QName), | ||||
|     rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"policy">>), | ||||
|     rabbit_ct_broker_helpers:clear_operator_policy(Config, 0, <<"op_policy">>), | ||||
|     rabbit_ct_client_helpers:close_channel(Ch), | ||||
|     rabbit_ct_client_helpers:close_connection(Conn), | ||||
|     passed. | ||||
| 
 | ||||
| 
 | ||||
| %% See supported policies in https://www.rabbitmq.com/parameters.html#operator-policies | ||||
| %% This test applies all supported operator policies to all queue types, | ||||
| %% and later verifies the effective policy definitions. | ||||
|  | @ -351,6 +376,12 @@ is_supported_operator_policy_target_group_size(Config) -> | |||
|     effective_operator_policy_per_queue_type( | ||||
|       Config, <<"target-group-size">>, Value, undefined, Value, undefined). | ||||
| 
 | ||||
| is_supported_operator_policy_overflow(Config) -> | ||||
|     Value = <<"drop-head">>, | ||||
|     effective_operator_policy_per_queue_type( | ||||
|       Config, <<"overflow">>, Value, Value, Value, undefined). | ||||
| 
 | ||||
| 
 | ||||
| is_supported_operator_policy_ha(Config) -> | ||||
|     [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), | ||||
|     {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), | ||||
|  |  | |||
|  | @ -1792,7 +1792,7 @@ add_member_not_running(Config) -> | |||
|                  declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), | ||||
|     ?assertEqual({error, node_not_running}, | ||||
|                  rpc:call(Server, rabbit_quorum_queue, add_member, | ||||
|                           [<<"/">>, QQ, 'rabbit@burrow', 5000])). | ||||
|                           [<<"/">>, QQ, 'rabbit@burrow', voter, 5000])). | ||||
| 
 | ||||
| add_member_classic(Config) -> | ||||
|     [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), | ||||
|  | @ -1801,7 +1801,7 @@ add_member_classic(Config) -> | |||
|     ?assertEqual({'queue.declare_ok', CQ, 0, 0}, declare(Ch, CQ, [])), | ||||
|     ?assertEqual({error, classic_queue_not_supported}, | ||||
|                  rpc:call(Server, rabbit_quorum_queue, add_member, | ||||
|                           [<<"/">>, CQ, Server, 5000])). | ||||
|                           [<<"/">>, CQ, Server, voter, 5000])). | ||||
| 
 | ||||
| add_member_wrong_type(Config) -> | ||||
|     [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), | ||||
|  | @ -1811,7 +1811,7 @@ add_member_wrong_type(Config) -> | |||
|                  declare(Ch, SQ, [{<<"x-queue-type">>, longstr, <<"stream">>}])), | ||||
|     ?assertEqual({error, not_quorum_queue}, | ||||
|                  rpc:call(Server, rabbit_quorum_queue, add_member, | ||||
|                           [<<"/">>, SQ, Server, 5000])). | ||||
|                           [<<"/">>, SQ, Server, voter, 5000])). | ||||
| 
 | ||||
| add_member_already_a_member(Config) -> | ||||
|     [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), | ||||
|  | @ -1822,14 +1822,14 @@ add_member_already_a_member(Config) -> | |||
|     %% idempotent by design | ||||
|     ?assertEqual(ok, | ||||
|                  rpc:call(Server, rabbit_quorum_queue, add_member, | ||||
|                           [<<"/">>, QQ, Server, 5000])). | ||||
|                           [<<"/">>, QQ, Server, voter, 5000])). | ||||
| 
 | ||||
| add_member_not_found(Config) -> | ||||
|     [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), | ||||
|     QQ = ?config(queue_name, Config), | ||||
|     ?assertEqual({error, not_found}, | ||||
|                  rpc:call(Server, rabbit_quorum_queue, add_member, | ||||
|                           [<<"/">>, QQ, Server, 5000])). | ||||
|                           [<<"/">>, QQ, Server, voter, 5000])). | ||||
| 
 | ||||
| add_member(Config) -> | ||||
|     [Server0, Server1] = Servers0 = | ||||
|  | @ -1840,12 +1840,12 @@ add_member(Config) -> | |||
|                  declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), | ||||
|     ?assertEqual({error, node_not_running}, | ||||
|                  rpc:call(Server0, rabbit_quorum_queue, add_member, | ||||
|                           [<<"/">>, QQ, Server1, 5000])), | ||||
|                           [<<"/">>, QQ, Server1, voter, 5000])), | ||||
|     ok = rabbit_control_helper:command(stop_app, Server1), | ||||
|     ok = rabbit_control_helper:command(join_cluster, Server1, [atom_to_list(Server0)], []), | ||||
|     rabbit_control_helper:command(start_app, Server1), | ||||
|     ?assertEqual(ok, rpc:call(Server1, rabbit_quorum_queue, add_member, | ||||
|                               [<<"/">>, QQ, Server1, 5000])), | ||||
|                               [<<"/">>, QQ, Server1, voter, 5000])), | ||||
|     Info = rpc:call(Server0, rabbit_quorum_queue, infos, | ||||
|                     [rabbit_misc:r(<<"/">>, queue, QQ)]), | ||||
|     Servers = lists:sort(Servers0), | ||||
|  |  | |||
|  | @ -1135,37 +1135,6 @@ consume_with_autoack(Config) -> | |||
|        subscribe(Ch1, Q, true, 0)), | ||||
|     rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). | ||||
| 
 | ||||
| consume_and_nack(Config) -> | ||||
|     [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), | ||||
| 
 | ||||
|     Ch = rabbit_ct_client_helpers:open_channel(Config, Server), | ||||
|     Q = ?config(queue_name, Config), | ||||
|     ?assertEqual({'queue.declare_ok', Q, 0, 0}, | ||||
|                  declare(Config, Server, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), | ||||
| 
 | ||||
|     publish_confirm(Ch, Q, [<<"msg">>]), | ||||
| 
 | ||||
|     Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), | ||||
|     qos(Ch1, 10, false), | ||||
|     subscribe(Ch1, Q, false, 0), | ||||
|     receive | ||||
|         {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> | ||||
|             ok = amqp_channel:cast(Ch1, #'basic.nack'{delivery_tag = DeliveryTag, | ||||
|                                                       multiple     = false, | ||||
|                                                       requeue      = true}), | ||||
|             %% Nack will throw a not implemented exception. As it is a cast operation, | ||||
|             %% we'll detect the conneciton/channel closure on the next call. | ||||
|             %% Let's try to redeclare and see what happens | ||||
|             ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 540, _}}}, _}, | ||||
|                         amqp_channel:call(Ch1, #'queue.declare'{queue = Q, | ||||
|                                                                durable = true, | ||||
|                                                                auto_delete = false, | ||||
|                                                                arguments = [{<<"x-queue-type">>, longstr, <<"stream">>}]})) | ||||
|     after 10000 -> | ||||
|             exit(timeout) | ||||
|     end, | ||||
|     rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). | ||||
| 
 | ||||
| basic_cancel(Config) -> | ||||
|     [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), | ||||
| 
 | ||||
|  | @ -1370,42 +1339,13 @@ filter_consumers(Config, Server, CTag) -> | |||
|                 end, [], CInfo). | ||||
| 
 | ||||
| consume_and_reject(Config) -> | ||||
|     [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), | ||||
| 
 | ||||
|     Ch = rabbit_ct_client_helpers:open_channel(Config, Server), | ||||
|     Q = ?config(queue_name, Config), | ||||
|     ?assertEqual({'queue.declare_ok', Q, 0, 0}, | ||||
|                  declare(Config, Server, Q, [{<<"x-queue-type">>, longstr, <<"stream">>}])), | ||||
| 
 | ||||
|     publish_confirm(Ch, Q, [<<"msg">>]), | ||||
| 
 | ||||
|     Ch1 = rabbit_ct_client_helpers:open_channel(Config, Server), | ||||
|     qos(Ch1, 10, false), | ||||
|     subscribe(Ch1, Q, false, 0), | ||||
|     receive | ||||
|         {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> | ||||
|             MRef = erlang:monitor(process, Ch1), | ||||
|             ok = amqp_channel:cast(Ch1, #'basic.reject'{delivery_tag = DeliveryTag, | ||||
|                                                       requeue      = true}), | ||||
|             %% Reject will throw a not implemented exception. As it is a cast | ||||
|             %% operation, we detect the connection error from the channel | ||||
|             %% process exit reason. | ||||
|             receive | ||||
|                 {'DOWN', MRef, _, _, Reason} -> | ||||
|                     ?assertMatch( | ||||
|                        {shutdown, | ||||
|                         {connection_closing, | ||||
|                          {server_initiated_close, 540, _}}}, | ||||
|                        Reason) | ||||
|             after 10000 -> | ||||
|                       exit(timeout) | ||||
|             end | ||||
|     after 10000 -> | ||||
|             exit(timeout) | ||||
|     end, | ||||
|     rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_testcase_queue, [Q]). | ||||
| 
 | ||||
|     consume_and_(Config, fun (DT) -> #'basic.reject'{delivery_tag = DT} end). | ||||
| consume_and_nack(Config) -> | ||||
|     consume_and_(Config, fun (DT) -> #'basic.nack'{delivery_tag = DT} end). | ||||
| consume_and_ack(Config) -> | ||||
|     consume_and_(Config, fun (DT) -> #'basic.ack'{delivery_tag = DT} end). | ||||
| 
 | ||||
| consume_and_(Config, AckFun) -> | ||||
|     [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), | ||||
| 
 | ||||
|     Ch = rabbit_ct_client_helpers:open_channel(Config, Server), | ||||
|  | @ -1420,8 +1360,7 @@ consume_and_ack(Config) -> | |||
|     subscribe(Ch1, Q, false, 0), | ||||
|     receive | ||||
|         {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> | ||||
|             ok = amqp_channel:cast(Ch1, #'basic.ack'{delivery_tag = DeliveryTag, | ||||
|                                                      multiple     = false}), | ||||
|             ok = amqp_channel:cast(Ch1, AckFun(DeliveryTag)), | ||||
|             %% It will succeed as ack is now a credit operation. We should be | ||||
|             %% able to redeclare a queue (gen_server call op) as the channel | ||||
|             %% should still be open and declare is an idempotent operation | ||||
|  |  | |||
|  | @ -0,0 +1,49 @@ | |||
| -module(unit_quorum_queue_SUITE). | ||||
| 
 | ||||
| -compile(export_all). | ||||
| 
 | ||||
| all() -> | ||||
|     [ | ||||
|      all_replica_states_includes_nonvoters, | ||||
|      filter_quorum_critical_accounts_nonvoters | ||||
|     ]. | ||||
| 
 | ||||
| filter_quorum_critical_accounts_nonvoters(_Config) -> | ||||
|     Nodes = [test@leader, test@follower1, test@follower2], | ||||
|     Qs0 = [amqqueue:new(rabbit_misc:r(<<"/">>, queue, <<"q1">>), | ||||
|                         {q1, test@leader}, | ||||
|                         false, false, none, [], undefined, #{}), | ||||
|            amqqueue:new(rabbit_misc:r(<<"/">>, queue, <<"q2">>), | ||||
|                         {q2, test@leader}, | ||||
|                         false, false, none, [], undefined, #{})], | ||||
|     Qs = [Q1, Q2] = lists:map(fun (Q) -> | ||||
|                                       amqqueue:set_type_state(Q, #{nodes => Nodes}) | ||||
|                               end, Qs0), | ||||
|     Ss = #{test@leader    => #{q1 => leader,   q2 => leader}, | ||||
|            test@follower1 => #{q1 => promotable, q2 => follower}, | ||||
|            test@follower2 => #{q1 => follower, q2 => promotable}}, | ||||
|     Qs = rabbit_quorum_queue:filter_quorum_critical(Qs, Ss, test@leader), | ||||
|     [Q2] = rabbit_quorum_queue:filter_quorum_critical(Qs, Ss, test@follower1), | ||||
|     [Q1] = rabbit_quorum_queue:filter_quorum_critical(Qs, Ss, test@follower2), | ||||
|     ok. | ||||
| 
 | ||||
| all_replica_states_includes_nonvoters(_Config) -> | ||||
|     ets:new(ra_state, [named_table, public, {write_concurrency, true}]), | ||||
|     ets:insert(ra_state, [ | ||||
|                           {q1, leader, voter}, | ||||
|                           {q2, follower, voter}, | ||||
|                           {q3, follower, promotable}, | ||||
|                           %% pre ra-2.7.0 | ||||
|                           {q4, leader}, | ||||
|                           {q5, follower} | ||||
|                          ]), | ||||
|     {_, #{ | ||||
|           q1 := leader, | ||||
|           q2 := follower, | ||||
|           q3 := promotable, | ||||
|           q4 := leader, | ||||
|           q5 := follower | ||||
|          }} = rabbit_quorum_queue:all_replica_states(), | ||||
| 
 | ||||
|     true = ets:delete(ra_state), | ||||
|     ok. | ||||
|  | @ -54,6 +54,7 @@ defmodule RabbitMQ.CLI.Core.DocGuide do | |||
|   Macros.defguide("monitoring") | ||||
|   Macros.defguide("networking") | ||||
|   Macros.defguide("parameters") | ||||
|   Macros.defguide("passwords") | ||||
|   Macros.defguide("plugins") | ||||
|   Macros.defguide("prometheus") | ||||
|   Macros.defguide("publishers") | ||||
|  |  | |||
|  | @ -10,21 +10,37 @@ defmodule RabbitMQ.CLI.Ctl.Commands.AddUserCommand do | |||
| 
 | ||||
|   @behaviour RabbitMQ.CLI.CommandBehaviour | ||||
| 
 | ||||
|   use RabbitMQ.CLI.Core.MergesNoDefaults | ||||
|   def switches(), do: [pre_hashed_password: :boolean] | ||||
| 
 | ||||
|   def merge_defaults(args, opts) do | ||||
|     {args, Map.merge(%{pre_hashed_password: false}, opts)} | ||||
|   end | ||||
| 
 | ||||
|   def validate(args, _) when length(args) < 1, do: {:validation_failure, :not_enough_args} | ||||
|   def validate(args, _) when length(args) > 2, do: {:validation_failure, :too_many_args} | ||||
|   def validate([_], _), do: :ok | ||||
|   # Password will be provided via standard input | ||||
|   def validate([_username], _), do: :ok | ||||
| 
 | ||||
|   def validate(["", _], _) do | ||||
|     {:validation_failure, {:bad_argument, "user cannot be an empty string"}} | ||||
|   end | ||||
| 
 | ||||
|   def validate([_, base64_encoded_password_hash], %{pre_hashed_password: true}) do | ||||
|     case Base.decode64(base64_encoded_password_hash) do | ||||
|       {:ok, _password_hash} -> | ||||
|         :ok | ||||
| 
 | ||||
|       _ -> | ||||
|         {:validation_failure, | ||||
|          {:bad_argument, "Could not Base64 decode provided password hash value"}} | ||||
|     end | ||||
|   end | ||||
| 
 | ||||
|   def validate([_, _], _), do: :ok | ||||
| 
 | ||||
|   use RabbitMQ.CLI.Core.RequiresRabbitAppRunning | ||||
| 
 | ||||
|   def run([username], %{node: node_name} = opts) do | ||||
|   def run([username], %{node: node_name, pre_hashed_password: false} = opts) do | ||||
|     # note: blank passwords are currently allowed, they make sense | ||||
|     # e.g. when a user only authenticates using X.509 certificates. | ||||
|     # Credential validators can be used to require passwords of a certain length | ||||
|  | @ -43,6 +59,46 @@ defmodule RabbitMQ.CLI.Ctl.Commands.AddUserCommand do | |||
|     end | ||||
|   end | ||||
| 
 | ||||
|   def run([username], %{node: node_name, pre_hashed_password: true} = opts) do | ||||
|     case Input.infer_password("Hashed and salted password: ", opts) do | ||||
|       :eof -> | ||||
|         {:error, :not_enough_args} | ||||
| 
 | ||||
|       base64_encoded_password_hash -> | ||||
|         case Base.decode64(base64_encoded_password_hash) do | ||||
|           {:ok, password_hash} -> | ||||
|             :rabbit_misc.rpc_call( | ||||
|               node_name, | ||||
|               :rabbit_auth_backend_internal, | ||||
|               :add_user_with_pre_hashed_password_sans_validation, | ||||
|               [username, password_hash, Helpers.cli_acting_user()] | ||||
|             ) | ||||
| 
 | ||||
|           _ -> | ||||
|             {:error, ExitCodes.exit_dataerr(), | ||||
|              "Could not Base64 decode provided password hash value"} | ||||
|         end | ||||
|     end | ||||
|   end | ||||
| 
 | ||||
|   def run( | ||||
|         [username, base64_encoded_password_hash], | ||||
|         %{node: node_name, pre_hashed_password: true} = opts | ||||
|       ) do | ||||
|     case Base.decode64(base64_encoded_password_hash) do | ||||
|       {:ok, password_hash} -> | ||||
|         :rabbit_misc.rpc_call( | ||||
|           node_name, | ||||
|           :rabbit_auth_backend_internal, | ||||
|           :add_user_with_pre_hashed_password_sans_validation, | ||||
|           [username, password_hash, Helpers.cli_acting_user()] | ||||
|         ) | ||||
| 
 | ||||
|       _ -> | ||||
|         {:error, ExitCodes.exit_dataerr(), "Could not Base64 decode provided password hash value"} | ||||
|     end | ||||
|   end | ||||
| 
 | ||||
|   def run([username, password], %{node: node_name}) do | ||||
|     :rabbit_misc.rpc_call( | ||||
|       node_name, | ||||
|  | @ -89,21 +145,30 @@ defmodule RabbitMQ.CLI.Ctl.Commands.AddUserCommand do | |||
| 
 | ||||
|   use RabbitMQ.CLI.DefaultOutput | ||||
| 
 | ||||
|   def usage, do: "add_user <username> <password>" | ||||
|   def usage, do: "add_user <username> [<password>] [<password_hash> --pre-hashed-password]" | ||||
| 
 | ||||
|   def usage_additional() do | ||||
|     [ | ||||
|       ["<username>", "Self-explanatory"], | ||||
|       [ | ||||
|         "<password>", | ||||
|         "Password this user will authenticate with. Use a blank string to disable password-based authentication." | ||||
|         "Password this user will authenticate with. Use a blank string to disable password-based authentication. Mutually exclusive with <password_hash>" | ||||
|       ], | ||||
|       [ | ||||
|         "<password_hash>", | ||||
|         "A Base64-encoded password hash produced by the 'hash_password' command or a different method as described in the Passwords guide. Must be used in combination with --pre-hashed-password. Mutually exclusive with <password>" | ||||
|       ], | ||||
|       [ | ||||
|         "--pre-hashed-password", | ||||
|         "Use to pass in a password hash instead of a clear text password. Disabled by default" | ||||
|       ] | ||||
|     ] | ||||
|   end | ||||
| 
 | ||||
|   def usage_doc_guides() do | ||||
|     [ | ||||
|       DocGuide.access_control() | ||||
|       DocGuide.access_control(), | ||||
|       DocGuide.passwords() | ||||
|     ] | ||||
|   end | ||||
| 
 | ||||
|  |  | |||
|  | @ -10,21 +10,51 @@ defmodule RabbitMQ.CLI.Queues.Commands.AddMemberCommand do | |||
| 
 | ||||
|   @behaviour RabbitMQ.CLI.CommandBehaviour | ||||
| 
 | ||||
|   @default_timeout 5_000 | ||||
|   defp default_opts, do: %{vhost: "/", membership: "promotable", timeout: 5_000} | ||||
| 
 | ||||
|   def merge_defaults(args, opts) do | ||||
|     timeout = | ||||
|       case opts[:timeout] do | ||||
|         nil -> @default_timeout | ||||
|         :infinity -> @default_timeout | ||||
|     default = default_opts() | ||||
| 
 | ||||
|     opts = | ||||
|       Map.update( | ||||
|         opts, | ||||
|         :timeout, | ||||
|         :infinity, | ||||
|         &case &1 do | ||||
|           :infinity -> default.timeout | ||||
|           other -> other | ||||
|         end | ||||
|       ) | ||||
| 
 | ||||
|     {args, Map.merge(%{vhost: "/", timeout: timeout}, opts)} | ||||
|     {args, Map.merge(default, opts)} | ||||
|   end | ||||
| 
 | ||||
|   use RabbitMQ.CLI.Core.AcceptsDefaultSwitchesAndTimeout | ||||
|   use RabbitMQ.CLI.Core.AcceptsTwoPositionalArguments | ||||
|   def switches(), | ||||
|     do: [ | ||||
|       timeout: :integer, | ||||
|       membership: :string | ||||
|     ] | ||||
| 
 | ||||
|   def aliases(), do: [t: :timeout] | ||||
| 
 | ||||
|   def validate(args, _) when length(args) < 2 do | ||||
|     {:validation_failure, :not_enough_args} | ||||
|   end | ||||
| 
 | ||||
|   def validate(args, _) when length(args) > 2 do | ||||
|     {:validation_failure, :too_many_args} | ||||
|   end | ||||
| 
 | ||||
|   def validate(_, %{membership: m}) | ||||
|       when not (m == "promotable" or | ||||
|                   m == "non_voter" or | ||||
|                   m == "voter") do | ||||
|     {:validation_failure, "voter status '#{m}' is not recognised."} | ||||
|   end | ||||
| 
 | ||||
|   def validate(_, _) do | ||||
|     :ok | ||||
|   end | ||||
| 
 | ||||
|   def validate_execution_environment(args, opts) do | ||||
|     Validators.chain( | ||||
|  | @ -39,13 +69,19 @@ defmodule RabbitMQ.CLI.Queues.Commands.AddMemberCommand do | |||
|     ) | ||||
|   end | ||||
| 
 | ||||
|   def run([name, node] = _args, %{vhost: vhost, node: node_name, timeout: timeout}) do | ||||
|     case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :add_member, [ | ||||
|            vhost, | ||||
|            name, | ||||
|            to_atom(node), | ||||
|            timeout | ||||
|          ]) do | ||||
|   def run( | ||||
|         [name, node] = _args, | ||||
|         %{vhost: vhost, node: node_name, timeout: timeout, membership: membership} | ||||
|       ) do | ||||
|     args = [vhost, name, to_atom(node)] | ||||
| 
 | ||||
|     args = | ||||
|       case to_atom(membership) do | ||||
|         :promotable -> args ++ [timeout] | ||||
|         other -> args ++ [other, timeout] | ||||
|       end | ||||
| 
 | ||||
|     case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :add_member, args) do | ||||
|       {:error, :classic_queue_not_supported} -> | ||||
|         {:error, "Cannot add members to a classic queue"} | ||||
| 
 | ||||
|  | @ -59,12 +95,13 @@ defmodule RabbitMQ.CLI.Queues.Commands.AddMemberCommand do | |||
| 
 | ||||
|   use RabbitMQ.CLI.DefaultOutput | ||||
| 
 | ||||
|   def usage, do: "add_member [--vhost <vhost>] <queue> <node>" | ||||
|   def usage, do: "add_member [--vhost <vhost>] <queue> <node> [--membership <promotable|voter>]" | ||||
| 
 | ||||
|   def usage_additional do | ||||
|     [ | ||||
|       ["<queue>", "quorum queue name"], | ||||
|       ["<node>", "node to add a new replica on"] | ||||
|       ["<node>", "node to add a new replica on"], | ||||
|       ["--membership <promotable|voter>", "add a promotable non-voter (default) or full voter"] | ||||
|     ] | ||||
|   end | ||||
| 
 | ||||
|  |  | |||
|  | @ -10,12 +10,14 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do | |||
| 
 | ||||
|   @behaviour RabbitMQ.CLI.CommandBehaviour | ||||
| 
 | ||||
|   defp default_opts, do: %{vhost_pattern: ".*", queue_pattern: ".*", errors_only: false} | ||||
|   defp default_opts, | ||||
|     do: %{vhost_pattern: ".*", queue_pattern: ".*", membership: "promotable", errors_only: false} | ||||
| 
 | ||||
|   def switches(), | ||||
|     do: [ | ||||
|       vhost_pattern: :string, | ||||
|       queue_pattern: :string, | ||||
|       membership: :string, | ||||
|       errors_only: :boolean | ||||
|     ] | ||||
| 
 | ||||
|  | @ -31,17 +33,21 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do | |||
|     {:validation_failure, :too_many_args} | ||||
|   end | ||||
| 
 | ||||
|   def validate([_, s], _) do | ||||
|     case s do | ||||
|       "all" -> | ||||
|         :ok | ||||
| 
 | ||||
|       "even" -> | ||||
|         :ok | ||||
| 
 | ||||
|       _ -> | ||||
|   def validate([_, s], _) | ||||
|       when not (s == "all" or | ||||
|                   s == "even") do | ||||
|     {:validation_failure, "strategy '#{s}' is not recognised."} | ||||
|   end | ||||
| 
 | ||||
|   def validate(_, %{membership: m}) | ||||
|       when not (m == "promotable" or | ||||
|                   m == "non_voter" or | ||||
|                   m == "voter") do | ||||
|     {:validation_failure, "voter status '#{m}' is not recognised."} | ||||
|   end | ||||
| 
 | ||||
|   def validate(_, _) do | ||||
|     :ok | ||||
|   end | ||||
| 
 | ||||
|   def validate_execution_environment(args, opts) do | ||||
|  | @ -58,14 +64,18 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do | |||
|         node: node_name, | ||||
|         vhost_pattern: vhost_pat, | ||||
|         queue_pattern: queue_pat, | ||||
|         membership: membership, | ||||
|         errors_only: errors_only | ||||
|       }) do | ||||
|     case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :grow, [ | ||||
|            to_atom(node), | ||||
|            vhost_pat, | ||||
|            queue_pat, | ||||
|            to_atom(strategy) | ||||
|          ]) do | ||||
|     args = [to_atom(node), vhost_pat, queue_pat, to_atom(strategy)] | ||||
| 
 | ||||
|     args = | ||||
|       case to_atom(membership) do | ||||
|         :promotable -> args | ||||
|         other -> args ++ [other] | ||||
|       end | ||||
| 
 | ||||
|     case :rabbit_misc.rpc_call(node_name, :rabbit_quorum_queue, :grow, args) do | ||||
|       {:error, _} = error -> | ||||
|         error | ||||
| 
 | ||||
|  | @ -97,7 +107,8 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do | |||
|   def formatter(), do: RabbitMQ.CLI.Formatters.Table | ||||
| 
 | ||||
|   def usage, | ||||
|     do: "grow <node> <all | even> [--vhost-pattern <pattern>] [--queue-pattern <pattern>]" | ||||
|     do: | ||||
|       "grow <node> <all | even> [--vhost-pattern <pattern>] [--queue-pattern <pattern>] [--membership <promotable|voter>]" | ||||
| 
 | ||||
|   def usage_additional do | ||||
|     [ | ||||
|  | @ -108,6 +119,7 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommand do | |||
|       ], | ||||
|       ["--queue-pattern <pattern>", "regular expression to match queue names"], | ||||
|       ["--vhost-pattern <pattern>", "regular expression to match virtual host names"], | ||||
|       ["--membership <promotable|voter>", "add a promotable non-voter (default) or full voter"], | ||||
|       ["--errors-only", "only list queues which reported an error"] | ||||
|     ] | ||||
|   end | ||||
|  |  | |||
|  | @ -9,6 +9,8 @@ defmodule AddUserCommandTest do | |||
|   import TestHelper | ||||
| 
 | ||||
|   @command RabbitMQ.CLI.Ctl.Commands.AddUserCommand | ||||
|   @hash_password_command RabbitMQ.CLI.Ctl.Commands.HashPasswordCommand | ||||
|   @authenticate_user_command RabbitMQ.CLI.Ctl.Commands.AuthenticateUserCommand | ||||
| 
 | ||||
|   setup_all do | ||||
|     RabbitMQ.CLI.Core.Distribution.start() | ||||
|  | @ -18,7 +20,7 @@ defmodule AddUserCommandTest do | |||
| 
 | ||||
|   setup context do | ||||
|     on_exit(context, fn -> delete_user(context[:user]) end) | ||||
|     {:ok, opts: %{node: get_rabbit_hostname()}} | ||||
|     {:ok, opts: %{node: get_rabbit_hostname(), pre_hashed_password: false}} | ||||
|   end | ||||
| 
 | ||||
|   test "validate: no positional arguments fails" do | ||||
|  | @ -55,6 +57,17 @@ defmodule AddUserCommandTest do | |||
|     assert @command.validate([context[:user], context[:password]], context[:opts]) == :ok | ||||
|   end | ||||
| 
 | ||||
|   @tag user: "someone" | ||||
|   test "validate: pre-hashed with a non-Base64-encoded value returns an error", context do | ||||
|     hashed = "this is not a Base64-encoded value" | ||||
|     opts = Map.merge(context[:opts], %{pre_hashed_password: true}) | ||||
| 
 | ||||
|     assert match?( | ||||
|              {:validation_failure, {:bad_argument, _}}, | ||||
|              @command.validate([context[:user], hashed], opts) | ||||
|            ) | ||||
|   end | ||||
| 
 | ||||
|   @tag user: "someone", password: "password" | ||||
|   test "run: request to a non-existent node returns a badrpc", context do | ||||
|     opts = %{node: :jake@thedog, timeout: 200} | ||||
|  | @ -62,9 +75,30 @@ defmodule AddUserCommandTest do | |||
|   end | ||||
| 
 | ||||
|   @tag user: "someone", password: "password" | ||||
|   test "run: default case completes successfully", context do | ||||
|   test "run: happy path completes successfully", context do | ||||
|     assert @command.run([context[:user], context[:password]], context[:opts]) == :ok | ||||
|     assert list_users() |> Enum.count(fn record -> record[:user] == context[:user] end) == 1 | ||||
| 
 | ||||
|     assert @authenticate_user_command.run([context[:user], context[:password]], context[:opts]) | ||||
|   end | ||||
| 
 | ||||
|   @tag user: "someone" | ||||
|   test "run: a pre-hashed request to a non-existent node returns a badrpc", context do | ||||
|     opts = %{node: :jake@thedog, timeout: 200} | ||||
|     hashed = "BMT6cj/MsI+4UOBtsPPQWpQfk7ViRLj4VqpMTxu54FU3qa1G" | ||||
|     assert match?({:badrpc, _}, @command.run([context[:user], hashed], opts)) | ||||
|   end | ||||
| 
 | ||||
|   @tag user: "someone" | ||||
|   test "run: pre-hashed happy path completes successfully", context do | ||||
|     pwd = "guest10" | ||||
|     hashed = @hash_password_command.hash_password(pwd) | ||||
|     opts = Map.merge(%{pre_hashed_password: true}, context[:opts]) | ||||
| 
 | ||||
|     assert @command.run([context[:user], hashed], opts) == :ok | ||||
|     assert list_users() |> Enum.count(fn record -> record[:user] == context[:user] end) == 1 | ||||
| 
 | ||||
|     assert @authenticate_user_command.run([context[:user], pwd], opts) | ||||
|   end | ||||
| 
 | ||||
|   @tag user: "someone", password: "password" | ||||
|  |  | |||
|  | @ -20,6 +20,8 @@ defmodule RabbitMQ.CLI.Queues.Commands.AddMemberCommandTest do | |||
|     {:ok, | ||||
|      opts: %{ | ||||
|        node: get_rabbit_hostname(), | ||||
|        membership: "voter", | ||||
|        vhost: "/", | ||||
|        timeout: context[:test_timeout] || 30000 | ||||
|      }} | ||||
|   end | ||||
|  | @ -42,17 +44,36 @@ defmodule RabbitMQ.CLI.Queues.Commands.AddMemberCommandTest do | |||
|            ) == {:validation_failure, :too_many_args} | ||||
|   end | ||||
| 
 | ||||
|   test "validate: when membership promotable is provided, returns a success" do | ||||
|     assert @command.validate(["quorum-queue-a", "rabbit@new-node"], %{membership: "promotable"}) == | ||||
|              :ok | ||||
|   end | ||||
| 
 | ||||
|   test "validate: when membership voter is provided, returns a success" do | ||||
|     assert @command.validate(["quorum-queue-a", "rabbit@new-node"], %{membership: "voter"}) == :ok | ||||
|   end | ||||
| 
 | ||||
|   test "validate: when membership non_voter is provided, returns a success" do | ||||
|     assert @command.validate(["quorum-queue-a", "rabbit@new-node"], %{membership: "non_voter"}) == | ||||
|              :ok | ||||
|   end | ||||
| 
 | ||||
|   test "validate: when wrong membership is provided, returns failure" do | ||||
|     assert @command.validate(["quorum-queue-a", "rabbit@new-node"], %{membership: "banana"}) == | ||||
|              {:validation_failure, "voter status 'banana' is not recognised."} | ||||
|   end | ||||
| 
 | ||||
|   test "validate: treats two positional arguments and default switches as a success" do | ||||
|     assert @command.validate(["quorum-queue-a", "rabbit@new-node"], %{}) == :ok | ||||
|   end | ||||
| 
 | ||||
|   @tag test_timeout: 3000 | ||||
|   test "run: targeting an unreachable node throws a badrpc" do | ||||
|   test "run: targeting an unreachable node throws a badrpc", context do | ||||
|     assert match?( | ||||
|              {:badrpc, _}, | ||||
|              @command.run( | ||||
|                ["quorum-queue-a", "rabbit@new-node"], | ||||
|                %{node: :jake@thedog, vhost: "/", timeout: 200} | ||||
|                Map.merge(context[:opts], %{node: :jake@thedog}) | ||||
|              ) | ||||
|            ) | ||||
|   end | ||||
|  |  | |||
|  | @ -23,13 +23,20 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommandTest do | |||
|        timeout: context[:test_timeout] || 30000, | ||||
|        vhost_pattern: ".*", | ||||
|        queue_pattern: ".*", | ||||
|        membership: "promotable", | ||||
|        errors_only: false | ||||
|      }} | ||||
|   end | ||||
| 
 | ||||
|   test "merge_defaults: defaults to reporting complete results" do | ||||
|     assert @command.merge_defaults([], %{}) == | ||||
|              {[], %{vhost_pattern: ".*", queue_pattern: ".*", errors_only: false}} | ||||
|              {[], | ||||
|               %{ | ||||
|                 vhost_pattern: ".*", | ||||
|                 queue_pattern: ".*", | ||||
|                 errors_only: false, | ||||
|                 membership: "promotable" | ||||
|               }} | ||||
|   end | ||||
| 
 | ||||
|   test "validate: when no arguments are provided, returns a failure" do | ||||
|  | @ -58,13 +65,30 @@ defmodule RabbitMQ.CLI.Queues.Commands.GrowCommandTest do | |||
|              {:validation_failure, :too_many_args} | ||||
|   end | ||||
| 
 | ||||
|   test "validate: when membership promotable is provided, returns a success" do | ||||
|     assert @command.validate(["quorum-queue-a", "all"], %{membership: "promotable"}) == :ok | ||||
|   end | ||||
| 
 | ||||
|   test "validate: when membership voter is provided, returns a success" do | ||||
|     assert @command.validate(["quorum-queue-a", "all"], %{membership: "voter"}) == :ok | ||||
|   end | ||||
| 
 | ||||
|   test "validate: when membership non_voter is provided, returns a success" do | ||||
|     assert @command.validate(["quorum-queue-a", "all"], %{membership: "non_voter"}) == :ok | ||||
|   end | ||||
| 
 | ||||
|   test "validate: when wrong membership is provided, returns failure" do | ||||
|     assert @command.validate(["quorum-queue-a", "all"], %{membership: "banana"}) == | ||||
|              {:validation_failure, "voter status 'banana' is not recognised."} | ||||
|   end | ||||
| 
 | ||||
|   @tag test_timeout: 3000 | ||||
|   test "run: targeting an unreachable node throws a badrpc", context do | ||||
|     assert match?( | ||||
|              {:badrpc, _}, | ||||
|              @command.run( | ||||
|                ["quorum-queue-a", "all"], | ||||
|                Map.merge(context[:opts], %{node: :jake@thedog, timeout: 200}) | ||||
|                Map.merge(context[:opts], %{node: :jake@thedog}) | ||||
|              ) | ||||
|            ) | ||||
|   end | ||||
|  |  | |||
|  | @ -289,8 +289,9 @@ | |||
|                   <span class="argument-link" field="definitionop" key="max-length" type="number">Max length</span> | | ||||
|                   <span class="argument-link" field="definitionop" key="max-length-bytes" type="number">Max length bytes</span> | | ||||
|                   <span class="argument-link" field="definitionop" key="message-ttl" type="number">Message TTL</span> | | ||||
|                   <span class="argument-link" field="definitionop" key="queue-version" type="number">Version</span> <span class="help" id="queue-version"></span> </br> | ||||
|                   <span class="help" id="queue-message-ttl"></span> | ||||
|                   <span class="help" id="queue-message-ttl"></span> | | ||||
|                   <span class="argument-link" field="definitionop" key="queue-version" type="number">Version</span> <span class="help" id="queue-version"></span> </br> | | ||||
|                   <span class="argument-link" field="definitionop" key="overflow" type="string">Length limit overflow behavior</span> <span class="help" id="overflow"></span> </br> | ||||
|                 </td> | ||||
|               </tr> | ||||
|               <tr> | ||||
|  | @ -305,7 +306,8 @@ | |||
|                   <span class="argument-link" field="definitionop" key="max-length-bytes" type="number">Max length bytes</span> | | ||||
|                   <span class="argument-link" field="definitionop" key="message-ttl" type="number">Message TTL</span> | ||||
|                   <span class="help" id="queue-message-ttl"></span> | | ||||
|                   <span class="argument-link" field="definitionop" key="target-group-size" type="number">Target group size</span> | ||||
|                   <span class="argument-link" field="definitionop" key="target-group-size" type="number">Target group size</span> | | ||||
|                   <span class="argument-link" field="definitionop" key="overflow" type="string">Length limit overflow behavior</span> <span class="help" id="overflow"></span> </br> | ||||
|                 </td> | ||||
|               </tr> | ||||
|               <tr> | ||||
|  |  | |||
|  | @ -38,11 +38,17 @@ accept_content(ReqData, Context) -> | |||
|   QName = rabbit_mgmt_util:id(queue, ReqData), | ||||
|   Res = rabbit_mgmt_util:with_decode( | ||||
|     [node], ReqData, Context, | ||||
|     fun([NewReplicaNode], _Body, _ReqData) -> | ||||
|     fun([NewReplicaNode], Body, _ReqData) -> | ||||
|       Membership = maps:get(<<"membership">>, Body, promotable), | ||||
|       rabbit_amqqueue:with( | ||||
|         rabbit_misc:r(VHost, queue, QName), | ||||
|         fun(_Q) -> | ||||
|           rabbit_quorum_queue:add_member(VHost, QName, rabbit_data_coercion:to_atom(NewReplicaNode), ?TIMEOUT) | ||||
|                 rabbit_quorum_queue:add_member( | ||||
|                   VHost, | ||||
|                   QName, | ||||
|                   rabbit_data_coercion:to_atom(NewReplicaNode), | ||||
|                   rabbit_data_coercion:to_atom(Membership), | ||||
|                   ?TIMEOUT) | ||||
|         end) | ||||
|     end), | ||||
|   case Res of | ||||
|  |  | |||
|  | @ -39,12 +39,14 @@ accept_content(ReqData, Context) -> | |||
|   NewReplicaNode = rabbit_mgmt_util:id(node, ReqData), | ||||
|   rabbit_mgmt_util:with_decode( | ||||
|     [vhost_pattern, queue_pattern, strategy], ReqData, Context, | ||||
|     fun([VHPattern, QPattern, Strategy], _Body, _ReqData) -> | ||||
|     fun([VHPattern, QPattern, Strategy], Body, _ReqData) -> | ||||
|       Membership = maps:get(<<"membership">>, Body, promotable), | ||||
|       rabbit_quorum_queue:grow( | ||||
|         rabbit_data_coercion:to_atom(NewReplicaNode), | ||||
|         VHPattern, | ||||
|         QPattern, | ||||
|         rabbit_data_coercion:to_atom(Strategy)) | ||||
|         rabbit_data_coercion:to_atom(Strategy), | ||||
|         rabbit_data_coercion:to_atom(Membership)) | ||||
|     end), | ||||
|   {true, ReqData, Context}. | ||||
| 
 | ||||
|  |  | |||
|  | @ -44,7 +44,7 @@ | |||
| -type send_fun() :: fun((iodata()) -> ok). | ||||
| -type session_expiry_interval() :: non_neg_integer() | infinity. | ||||
| -type subscriptions() :: #{topic_filter() => #mqtt_subscription_opts{}}. | ||||
| -type topic_aliases() :: {Inbound :: #{topic() => pos_integer()}, | ||||
| -type topic_aliases() :: {Inbound :: #{pos_integer() => topic()}, | ||||
|                           Outbound :: #{topic() => pos_integer()}}. | ||||
| 
 | ||||
| -record(auth_state, | ||||
|  |  | |||
|  | @ -16,6 +16,7 @@ def all_beam_files(name = "all_beam_files"): | |||
|             "src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConsumersCommand.erl", | ||||
|             "src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamGroupConsumersCommand.erl", | ||||
|             "src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamPublishersCommand.erl", | ||||
|             "src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamTrackingCommand.erl", | ||||
|             "src/rabbit_stream.erl", | ||||
|             "src/rabbit_stream_connection_sup.erl", | ||||
|             "src/rabbit_stream_manager.erl", | ||||
|  | @ -55,6 +56,7 @@ def all_test_beam_files(name = "all_test_beam_files"): | |||
|             "src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConsumersCommand.erl", | ||||
|             "src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamGroupConsumersCommand.erl", | ||||
|             "src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamPublishersCommand.erl", | ||||
|             "src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamTrackingCommand.erl", | ||||
|             "src/rabbit_stream.erl", | ||||
|             "src/rabbit_stream_connection_sup.erl", | ||||
|             "src/rabbit_stream_manager.erl", | ||||
|  | @ -104,6 +106,7 @@ def all_srcs(name = "all_srcs"): | |||
|             "src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConsumersCommand.erl", | ||||
|             "src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamGroupConsumersCommand.erl", | ||||
|             "src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamPublishersCommand.erl", | ||||
|             "src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamTrackingCommand.erl", | ||||
|             "src/rabbit_stream.erl", | ||||
|             "src/rabbit_stream_connection_sup.erl", | ||||
|             "src/rabbit_stream_manager.erl", | ||||
|  |  | |||
|  | @ -54,7 +54,7 @@ help_section() -> | |||
|     {plugin, stream}. | ||||
| 
 | ||||
| validate(Args, _) -> | ||||
|     ValidKeys = lists:map(fun atom_to_list/1, ?CONSUMER_INFO_ITEMS), | ||||
|     ValidKeys = lists:map(fun atom_to_list/1, ?CONSUMER_GROUP_INFO_ITEMS), | ||||
|     case 'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':validate_info_keys(Args, | ||||
|                                                                ValidKeys) | ||||
|     of | ||||
|  |  | |||
							
								
								
									
										162
									
								
								deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamTrackingCommand.erl
								
								
								
									vendored
								
								
									Normal file
								
							
							
						
						
									
										162
									
								
								deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamTrackingCommand.erl
								
								
								
									vendored
								
								
									Normal file
								
							|  | @ -0,0 +1,162 @@ | |||
| %% The contents of this file are subject to the Mozilla Public License | ||||
| %% Version 2.0 (the "License"); you may not use this file except in | ||||
| %% compliance with the License. You may obtain a copy of the License | ||||
| %% at https://www.mozilla.org/MPL/ | ||||
| %% | ||||
| %% Software distributed under the License is distributed on an "AS IS" | ||||
| %% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See | ||||
| %% the License for the specific language governing rights and | ||||
| %% limitations under the License. | ||||
| %% | ||||
| %% The Original Code is RabbitMQ. | ||||
| %% | ||||
| %% The Initial Developer of the Original Code is GoPivotal, Inc. | ||||
| %% Copyright (c) 2023 VMware, Inc. or its affiliates.  All rights reserved. | ||||
| 
 | ||||
| -module('Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamTrackingCommand'). | ||||
| 
 | ||||
| -include_lib("rabbitmq_stream_common/include/rabbit_stream.hrl"). | ||||
| 
 | ||||
| -behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour'). | ||||
| 
 | ||||
| -export([formatter/0, | ||||
|          scopes/0, | ||||
|          switches/0, | ||||
|          aliases/0, | ||||
|          usage/0, | ||||
|          usage_additional/0, | ||||
|          usage_doc_guides/0, | ||||
|          banner/2, | ||||
|          validate/2, | ||||
|          merge_defaults/2, | ||||
|          run/2, | ||||
|          output/2, | ||||
|          description/0, | ||||
|          help_section/0, | ||||
|          tracking_info/3]). | ||||
| 
 | ||||
| formatter() -> | ||||
|     'Elixir.RabbitMQ.CLI.Formatters.PrettyTable'. | ||||
| 
 | ||||
| scopes() -> | ||||
|     [streams]. | ||||
| 
 | ||||
| switches() -> | ||||
|     [{stream, string}, {all, boolean}, {offset, boolean}, {writer, boolean}]. | ||||
| 
 | ||||
| aliases() -> | ||||
|     []. | ||||
| 
 | ||||
| description() -> | ||||
|     <<"Lists tracking information for a stream">>. | ||||
| 
 | ||||
| help_section() -> | ||||
|     {plugin, stream}. | ||||
| 
 | ||||
| validate([], _Opts) -> | ||||
|     {validation_failure, not_enough_args}; | ||||
| validate([_Stream], Opts) -> | ||||
|     case maps:with([all, writer, offset], Opts) of | ||||
|         M when map_size(M) > 1 -> | ||||
|             {validation_failure, | ||||
|              "Specify only one of --all, --offset, --writer."}; | ||||
|         _ -> | ||||
|             ok | ||||
|     end; | ||||
| validate(_, _Opts) -> | ||||
|     {validation_failure, too_many_args}. | ||||
| 
 | ||||
| merge_defaults(Args, Opts) -> | ||||
|     case maps:with([all, writer, offset], Opts) of | ||||
|         M when map_size(M) =:= 0 -> | ||||
|             {Args, maps:merge(#{all => true, vhost => <<"/">>}, Opts)}; | ||||
|         _ -> | ||||
|             {Args, maps:merge(#{vhost => <<"/">>}, Opts)} | ||||
|     end. | ||||
| 
 | ||||
| usage() -> | ||||
|     <<"list_stream_tracking <stream> [--all | --offset | --writer] " | ||||
|       "[--vhost <vhost>]">>. | ||||
| 
 | ||||
| usage_additional() -> | ||||
|     [[<<"<name>">>, | ||||
|       <<"The name of the stream.">>], | ||||
|      [<<"--all">>, | ||||
|       <<"List offset and writer tracking information.">>], | ||||
|      [<<"--offset">>, | ||||
|       <<"List only offset tracking information.">>], | ||||
|      [<<"--writer">>, | ||||
|       <<"List only writer deduplication tracking information.">>], | ||||
|      [<<"--vhost <vhost>">>, | ||||
|       <<"The virtual host of the stream.">>]]. | ||||
|   | ||||
| usage_doc_guides() -> | ||||
|     [?STREAM_GUIDE_URL]. | ||||
| 
 | ||||
| run([Stream], | ||||
|     #{node := NodeName, | ||||
|       vhost := VHost, | ||||
|       timeout := Timeout} = Opts) -> | ||||
| 
 | ||||
|     TrackingType = case Opts of | ||||
|                        #{all := true} -> | ||||
|                            all; | ||||
|                        #{offset := true} -> | ||||
|                            offset; | ||||
|                        #{writer := true} -> | ||||
|                            writer | ||||
|                    end, | ||||
|     case rabbit_misc:rpc_call(NodeName, | ||||
|                          ?MODULE, | ||||
|                          tracking_info, | ||||
|                          [VHost, Stream, TrackingType], | ||||
|                          Timeout) of | ||||
|         {error, not_found} -> | ||||
|             {error, "The stream does not exist."}; | ||||
|         {error, not_available} -> | ||||
|             {error, "The stream is not available."}; | ||||
|         {error, _} = E -> | ||||
|             E; | ||||
|         R -> | ||||
|             R | ||||
|     end. | ||||
|      | ||||
| banner([Stream], _) -> | ||||
|     <<"Listing tracking information for stream ", Stream/binary, <<" ...">>/binary>>. | ||||
| 
 | ||||
| output({ok, []}, _Opts) -> | ||||
|     ok; | ||||
| output([], _Opts) -> | ||||
|     ok; | ||||
| output(Result, _Opts) -> | ||||
|     'Elixir.RabbitMQ.CLI.DefaultOutput':output(Result). | ||||
| 
 | ||||
| tracking_info(VHost, Stream, TrackingType) -> | ||||
|     case rabbit_stream_manager:lookup_leader(VHost, Stream) of | ||||
|         {ok, Leader} -> | ||||
|             TrackingInfo = osiris:read_tracking(Leader), | ||||
|             FieldsLabels = case TrackingType of | ||||
|                                all -> | ||||
|                                    [{offsets, offset}, {sequences, writer}]; | ||||
|                                offset -> | ||||
|                                    [{offsets, offset}]; | ||||
|                                writer -> | ||||
|                                    [{sequences, writer}] | ||||
|                            end, | ||||
|             lists:foldl(fun({F, L}, Acc) -> | ||||
|                                 Tracking = maps:get(F, TrackingInfo, #{}), | ||||
|                                 maps:fold(fun(Reference, {_, Sequence}, AccType) -> | ||||
|                                                   [[{type, L}, | ||||
|                                                     {name, Reference}, | ||||
|                                                     {tracking_value, Sequence} | ||||
|                                                    ] | AccType]; | ||||
|                                              (Reference, Offset, AccType) -> | ||||
|                                                   [[{type, L}, | ||||
|                                                     {name, Reference}, | ||||
|                                                     {tracking_value, Offset} | ||||
|                                                    ] | AccType] | ||||
|                                           end, Acc, Tracking) | ||||
|                         end, [], FieldsLabels); | ||||
|         {error, _} = E -> | ||||
|             E | ||||
|     end. | ||||
|  | @ -31,6 +31,8 @@ | |||
|         'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConsumerGroupsCommand'). | ||||
| -define(COMMAND_LIST_GROUP_CONSUMERS, | ||||
|         'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamGroupConsumersCommand'). | ||||
| -define(COMMAND_LIST_STREAM_TRACKING, | ||||
|         'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamTrackingCommand'). | ||||
| 
 | ||||
| all() -> | ||||
|     [{group, list_connections}, | ||||
|  | @ -38,6 +40,7 @@ all() -> | |||
|      {group, list_publishers}, | ||||
|      {group, list_consumer_groups}, | ||||
|      {group, list_group_consumers}, | ||||
|      {group, list_stream_tracking}, | ||||
|      {group, super_streams}]. | ||||
| 
 | ||||
| groups() -> | ||||
|  | @ -49,10 +52,14 @@ groups() -> | |||
|      {list_publishers, [], | ||||
|       [list_publishers_merge_defaults, list_publishers_run]}, | ||||
|      {list_consumer_groups, [], | ||||
|       [list_consumer_groups_merge_defaults, list_consumer_groups_run]}, | ||||
|       [list_consumer_groups_validate, list_consumer_groups_merge_defaults, | ||||
|        list_consumer_groups_run]}, | ||||
|      {list_group_consumers, [], | ||||
|       [list_group_consumers_validate, list_group_consumers_merge_defaults, | ||||
|        list_group_consumers_run]}, | ||||
|      {list_stream_tracking, [], | ||||
|       [list_stream_tracking_validate, list_stream_tracking_merge_defaults, | ||||
|        list_stream_tracking_run]}, | ||||
|      {super_streams, [], | ||||
|       [add_super_stream_merge_defaults, | ||||
|        add_super_stream_validate, | ||||
|  | @ -332,6 +339,18 @@ list_publishers_run(Config) -> | |||
|     ?awaitMatch(0, publisher_count(Config), ?WAIT), | ||||
|     ok. | ||||
| 
 | ||||
| list_consumer_groups_validate(_) -> | ||||
|     ValidOpts = #{vhost => <<"/">>}, | ||||
|     ?assertMatch({validation_failure, {bad_info_key, [foo]}}, | ||||
|                  ?COMMAND_LIST_CONSUMER_GROUPS:validate([<<"foo">>], | ||||
|                                                         ValidOpts)), | ||||
|     ?assertMatch(ok, | ||||
|                  ?COMMAND_LIST_CONSUMER_GROUPS:validate([<<"reference">>], | ||||
|                                                         ValidOpts)), | ||||
|     ?assertMatch(ok, | ||||
|                  ?COMMAND_LIST_CONSUMER_GROUPS:validate([], ValidOpts)). | ||||
| 
 | ||||
| 
 | ||||
| list_consumer_groups_merge_defaults(_Config) -> | ||||
|     DefaultItems = | ||||
|         [rabbit_data_coercion:to_binary(Item) | ||||
|  | @ -521,6 +540,106 @@ assertConsumerGroup(S, R, PI, Cs, Record) -> | |||
|     ?assertEqual(Cs, proplists:get_value(consumers, Record)), | ||||
|     ok. | ||||
| 
 | ||||
| list_stream_tracking_validate(_) -> | ||||
|     ValidOpts = #{vhost => <<"/">>, <<"writer">> => true}, | ||||
|     ?assertMatch({validation_failure, not_enough_args}, | ||||
|                  ?COMMAND_LIST_STREAM_TRACKING:validate([], #{})), | ||||
|     ?assertMatch({validation_failure, not_enough_args}, | ||||
|                  ?COMMAND_LIST_STREAM_TRACKING:validate([], | ||||
|                                                         #{vhost => | ||||
|                                                               <<"test">>})), | ||||
|     ?assertMatch({validation_failure, "Specify only one of --all, --offset, --writer."}, | ||||
|                  ?COMMAND_LIST_STREAM_TRACKING:validate([<<"stream">>], | ||||
|                                                         #{all => true, writer => true})), | ||||
|     ?assertMatch({validation_failure, too_many_args}, | ||||
|                  ?COMMAND_LIST_STREAM_TRACKING:validate([<<"stream">>, <<"bad">>], | ||||
|                                                         ValidOpts)), | ||||
| 
 | ||||
|     ?assertMatch(ok, | ||||
|                  ?COMMAND_LIST_STREAM_TRACKING:validate([<<"stream">>], | ||||
|                                                         ValidOpts)). | ||||
| list_stream_tracking_merge_defaults(_Config) -> | ||||
|     ?assertMatch({[<<"s">>], #{all := true, vhost := <<"/">>}}, | ||||
|       ?COMMAND_LIST_STREAM_TRACKING:merge_defaults([<<"s">>], #{})), | ||||
| 
 | ||||
|     ?assertMatch({[<<"s">>], #{writer := true, vhost := <<"/">>}}, | ||||
|       ?COMMAND_LIST_STREAM_TRACKING:merge_defaults([<<"s">>], #{writer => true})), | ||||
| 
 | ||||
|     ?assertMatch({[<<"s">>], #{all := true, vhost := <<"dev">>}}, | ||||
|       ?COMMAND_LIST_STREAM_TRACKING:merge_defaults([<<"s">>], #{vhost => <<"dev">>})), | ||||
| 
 | ||||
|     ?assertMatch({[<<"s">>], #{writer := true, vhost := <<"dev">>}}, | ||||
|       ?COMMAND_LIST_STREAM_TRACKING:merge_defaults([<<"s">>], #{writer => true, vhost => <<"dev">>})). | ||||
| 
 | ||||
| list_stream_tracking_run(Config) -> | ||||
|     Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), | ||||
|     Stream = <<"list_stream_tracking_run">>, | ||||
|     ConsumerReference = <<"foo">>, | ||||
|     PublisherReference = <<"bar">>, | ||||
|     Opts = | ||||
|         #{node => Node, | ||||
|           timeout => 10000, | ||||
|           vhost => <<"/">>}, | ||||
|     Args = [Stream], | ||||
| 
 | ||||
|     %% the stream does not exist yet | ||||
|     ?assertMatch({error, "The stream does not exist."}, | ||||
|                  ?COMMAND_LIST_STREAM_TRACKING:run(Args, Opts#{all => true})), | ||||
| 
 | ||||
|     StreamPort = rabbit_stream_SUITE:get_stream_port(Config), | ||||
|     {S, C} = start_stream_connection(StreamPort), | ||||
|     ?awaitMatch(1, connection_count(Config), ?WAIT), | ||||
| 
 | ||||
|     create_stream(S, Stream, C), | ||||
| 
 | ||||
|     ?assertMatch([], | ||||
|                  ?COMMAND_LIST_STREAM_TRACKING:run(Args, Opts#{all => true})), | ||||
| 
 | ||||
|     store_offset(S, Stream, ConsumerReference, 42, C), | ||||
| 
 | ||||
|     ?assertMatch([[{type,offset}, {name, ConsumerReference}, {tracking_value, 42}]], | ||||
|                 ?COMMAND_LIST_STREAM_TRACKING:run(Args, Opts#{all => true})), | ||||
| 
 | ||||
|     ?assertMatch([[{type,offset}, {name, ConsumerReference}, {tracking_value, 42}]], | ||||
|                 ?COMMAND_LIST_STREAM_TRACKING:run(Args, Opts#{offset => true})), | ||||
| 
 | ||||
|     ok = store_offset(S, Stream, ConsumerReference, 55, C), | ||||
|     ?assertMatch([[{type,offset}, {name, ConsumerReference}, {tracking_value, 55}]], | ||||
|                 ?COMMAND_LIST_STREAM_TRACKING:run(Args, Opts#{offset => true})), | ||||
| 
 | ||||
| 
 | ||||
|     PublisherId = 1, | ||||
|     rabbit_stream_SUITE:test_declare_publisher(gen_tcp, S, PublisherId, | ||||
|                                                PublisherReference, Stream, C), | ||||
|     rabbit_stream_SUITE:test_publish_confirm(gen_tcp, S, PublisherId, 42, <<"">>, C), | ||||
| 
 | ||||
|     ok = check_publisher_sequence(S, Stream, PublisherReference, 42, C), | ||||
| 
 | ||||
|     ?assertMatch([ | ||||
|                   [{type,writer},{name,<<"bar">>},{tracking_value, 42}], | ||||
|                   [{type,offset},{name,<<"foo">>},{tracking_value, 55}] | ||||
|                  ], | ||||
|                  ?COMMAND_LIST_STREAM_TRACKING:run(Args, Opts#{all => true})), | ||||
| 
 | ||||
|     ?assertMatch([ | ||||
|                   [{type,writer},{name,<<"bar">>},{tracking_value, 42}] | ||||
|                  ], | ||||
|                  ?COMMAND_LIST_STREAM_TRACKING:run(Args, Opts#{writer => true})), | ||||
| 
 | ||||
|     rabbit_stream_SUITE:test_publish_confirm(gen_tcp, S, PublisherId, 66, <<"">>, C), | ||||
| 
 | ||||
|     ok = check_publisher_sequence(S, Stream, PublisherReference, 66, C), | ||||
| 
 | ||||
|     ?assertMatch([ | ||||
|                   [{type,writer},{name,<<"bar">>},{tracking_value, 66}] | ||||
|                  ], | ||||
|                  ?COMMAND_LIST_STREAM_TRACKING:run(Args, Opts#{writer => true})), | ||||
| 
 | ||||
|     delete_stream(S, Stream, C), | ||||
| 
 | ||||
|     close(S, C), | ||||
|     ok. | ||||
| 
 | ||||
| add_super_stream_merge_defaults(_Config) -> | ||||
|     ?assertMatch({[<<"super-stream">>], | ||||
|                   #{partitions := 3, vhost := <<"/">>}}, | ||||
|  | @ -714,6 +833,9 @@ declare_publisher(S, PubId, Stream, C) -> | |||
| delete_stream(S, Stream, C) -> | ||||
|     rabbit_stream_SUITE:test_delete_stream(gen_tcp, S, Stream, C). | ||||
| 
 | ||||
| delete_stream_no_metadata_update(S, Stream, C) -> | ||||
|     rabbit_stream_SUITE:test_delete_stream(gen_tcp, S, Stream, C, false). | ||||
| 
 | ||||
| metadata_update_stream_deleted(S, Stream, C) -> | ||||
|     rabbit_stream_SUITE:test_metadata_update_stream_deleted(gen_tcp, | ||||
|                                                             S, | ||||
|  | @ -798,3 +920,52 @@ queue_lookup(Config, Q) -> | |||
|                                  rabbit_amqqueue, | ||||
|                                  lookup, | ||||
|                                  [QueueName]). | ||||
| 
 | ||||
| store_offset(S, Stream, Reference, Value, C) -> | ||||
|     StoreOffsetFrame = | ||||
|         rabbit_stream_core:frame({store_offset, Reference, Stream, Value}), | ||||
|     ok = gen_tcp:send(S, StoreOffsetFrame), | ||||
|     case check_stored_offset(S, Stream, Reference, Value, C, 20) of | ||||
|         ok -> | ||||
|             ok; | ||||
|         _ -> | ||||
|             {error, offset_not_stored} | ||||
|     end. | ||||
| 
 | ||||
| check_stored_offset(_, _, _, _, _, 0) -> | ||||
|     error; | ||||
| check_stored_offset(S, Stream, Reference, Expected, C, Attempt) -> | ||||
|     QueryOffsetFrame = | ||||
|         rabbit_stream_core:frame({request, 1, {query_offset, Reference, Stream}}), | ||||
|     ok = gen_tcp:send(S, QueryOffsetFrame), | ||||
|     {Cmd, _} = rabbit_stream_SUITE:receive_commands(gen_tcp, S, C), | ||||
|     ?assertMatch({response, 1, {query_offset, ?RESPONSE_CODE_OK, _}}, Cmd), | ||||
|     {response, 1, {query_offset, ?RESPONSE_CODE_OK, StoredValue}} = Cmd, | ||||
|     case StoredValue of | ||||
|         Expected -> | ||||
|             ok; | ||||
|         _ -> | ||||
|             timer:sleep(50), | ||||
|             check_stored_offset(S, Stream, Reference, Expected, C, Attempt - 1) | ||||
|     end. | ||||
| 
 | ||||
| check_publisher_sequence(S, Stream, Reference, Expected, C) -> | ||||
|     check_publisher_sequence(S, Stream, Reference, Expected, C, 20). | ||||
| 
 | ||||
| check_publisher_sequence(_, _, _, _, _, 0) -> | ||||
|     error; | ||||
| check_publisher_sequence(S, Stream, Reference, Expected, C, Attempt) -> | ||||
|     QueryFrame = | ||||
|         rabbit_stream_core:frame({request, 1, {query_publisher_sequence, Reference, Stream}}), | ||||
|     ok = gen_tcp:send(S, QueryFrame), | ||||
|     {Cmd, _} = rabbit_stream_SUITE:receive_commands(gen_tcp, S, C), | ||||
|     ?assertMatch({response, 1, {query_publisher_sequence, _, _}}, Cmd), | ||||
|     {response, 1, {query_publisher_sequence, _, StoredValue}} = Cmd, | ||||
|     case StoredValue of | ||||
|         Expected -> | ||||
|             ok; | ||||
|         _ -> | ||||
|             timer:sleep(50), | ||||
|             check_publisher_sequence(S, Stream, Reference, Expected, C, Attempt - 1) | ||||
|     end. | ||||
| 
 | ||||
|  |  | |||
|  | @ -724,11 +724,14 @@ test_metadata_update_stream_deleted(Transport, S, Stream, C0) -> | |||
|     C1. | ||||
| 
 | ||||
| test_declare_publisher(Transport, S, PublisherId, Stream, C0) -> | ||||
|     test_declare_publisher(Transport, S, PublisherId, <<>>, Stream, C0). | ||||
| 
 | ||||
| test_declare_publisher(Transport, S, PublisherId, Reference, Stream, C0) -> | ||||
|     DeclarePublisherFrame = | ||||
|         rabbit_stream_core:frame({request, 1, | ||||
|                                   {declare_publisher, | ||||
|                                    PublisherId, | ||||
|                                    <<>>, | ||||
|                                    Reference, | ||||
|                                    Stream}}), | ||||
|     ok = Transport:send(S, DeclarePublisherFrame), | ||||
|     {Cmd, C} = receive_commands(Transport, S, C0), | ||||
|  | @ -737,25 +740,35 @@ test_declare_publisher(Transport, S, PublisherId, Stream, C0) -> | |||
|     C. | ||||
| 
 | ||||
| test_publish_confirm(Transport, S, PublisherId, Body, C0) -> | ||||
|     test_publish_confirm(Transport, S, publish, PublisherId, Body, | ||||
|     test_publish_confirm(Transport, S, PublisherId, 1, Body, C0). | ||||
| 
 | ||||
| test_publish_confirm(Transport, S, PublisherId, Sequence, Body, C0) -> | ||||
|     test_publish_confirm(Transport, S, publish, PublisherId, Sequence, Body, | ||||
|                          publish_confirm, C0). | ||||
| 
 | ||||
| test_publish_confirm(Transport, S, publish = PublishCmd, PublisherId, Body, | ||||
| test_publish_confirm(Transport, S, PublishCmd, PublisherId, Body, | ||||
|                      ExpectedConfirmCommand, C0) -> | ||||
|     test_publish_confirm(Transport, S, PublishCmd, PublisherId, 1, Body, | ||||
|                          ExpectedConfirmCommand, C0). | ||||
| 
 | ||||
| test_publish_confirm(Transport, S, publish = PublishCmd, PublisherId, | ||||
|                      Sequence, Body, | ||||
|                      ExpectedConfirmCommand, C0) -> | ||||
|     BodySize = byte_size(Body), | ||||
|     Messages = [<<1:64, 0:1, BodySize:31, Body:BodySize/binary>>], | ||||
|     Messages = [<<Sequence:64, 0:1, BodySize:31, Body:BodySize/binary>>], | ||||
|     PublishFrame = | ||||
|         rabbit_stream_core:frame({PublishCmd, PublisherId, 1, Messages}), | ||||
|     ok = Transport:send(S, PublishFrame), | ||||
|     {Cmd, C} = receive_commands(Transport, S, C0), | ||||
|     ?assertMatch({ExpectedConfirmCommand, PublisherId, [1]}, Cmd), | ||||
|     ?assertMatch({ExpectedConfirmCommand, PublisherId, [Sequence]}, Cmd), | ||||
|     C; | ||||
| test_publish_confirm(Transport, S, publish_v2 = PublishCmd, PublisherId, Body, | ||||
| test_publish_confirm(Transport, S, publish_v2 = PublishCmd, PublisherId, | ||||
|                      Sequence, Body, | ||||
|                      ExpectedConfirmCommand, C0) -> | ||||
|     BodySize = byte_size(Body), | ||||
|     FilterValue = <<"foo">>, | ||||
|     FilterValueSize = byte_size(FilterValue), | ||||
|     Messages = [<<1:64, FilterValueSize:16, FilterValue:FilterValueSize/binary, | ||||
|     Messages = [<<Sequence:64, FilterValueSize:16, FilterValue:FilterValueSize/binary, | ||||
|                   0:1, BodySize:31, Body:BodySize/binary>>], | ||||
|     PublishFrame = | ||||
|         rabbit_stream_core:frame({PublishCmd, PublisherId, 1, Messages}), | ||||
|  | @ -763,9 +776,9 @@ test_publish_confirm(Transport, S, publish_v2 = PublishCmd, PublisherId, Body, | |||
|     {Cmd, C} = receive_commands(Transport, S, C0), | ||||
|     case ExpectedConfirmCommand of | ||||
|         publish_confirm -> | ||||
|             ?assertMatch({ExpectedConfirmCommand, PublisherId, [1]}, Cmd); | ||||
|             ?assertMatch({ExpectedConfirmCommand, PublisherId, [Sequence]}, Cmd); | ||||
|         publish_error -> | ||||
|             ?assertMatch({ExpectedConfirmCommand, PublisherId, _, [1]}, Cmd) | ||||
|             ?assertMatch({ExpectedConfirmCommand, PublisherId, _, [Sequence]}, Cmd) | ||||
|     end, | ||||
|     C. | ||||
| 
 | ||||
|  |  | |||
|  | @ -18,6 +18,7 @@ def all_beam_files(name = "all_beam_files"): | |||
|             "src/rabbit_stream_management_utils.erl", | ||||
|             "src/rabbit_stream_mgmt_db.erl", | ||||
|             "src/rabbit_stream_publishers_mgmt.erl", | ||||
|             "src/rabbit_stream_tracking_mgmt.erl", | ||||
|         ], | ||||
|         hdrs = [":public_and_private_hdrs"], | ||||
|         app_name = "rabbitmq_stream_management", | ||||
|  | @ -51,6 +52,7 @@ def all_test_beam_files(name = "all_test_beam_files"): | |||
|             "src/rabbit_stream_management_utils.erl", | ||||
|             "src/rabbit_stream_mgmt_db.erl", | ||||
|             "src/rabbit_stream_publishers_mgmt.erl", | ||||
|             "src/rabbit_stream_tracking_mgmt.erl", | ||||
|         ], | ||||
|         hdrs = [":public_and_private_hdrs"], | ||||
|         app_name = "rabbitmq_stream_management", | ||||
|  | @ -101,6 +103,7 @@ def all_srcs(name = "all_srcs"): | |||
|             "src/rabbit_stream_management_utils.erl", | ||||
|             "src/rabbit_stream_mgmt_db.erl", | ||||
|             "src/rabbit_stream_publishers_mgmt.erl", | ||||
|             "src/rabbit_stream_tracking_mgmt.erl", | ||||
|         ], | ||||
|     ) | ||||
|     filegroup( | ||||
|  |  | |||
|  | @ -0,0 +1,102 @@ | |||
| %% This Source Code Form is subject to the terms of the Mozilla Public | ||||
| %% License, v. 2.0. If a copy of the MPL was not distributed with this | ||||
| %% file, You can obtain one at https://mozilla.org/MPL/2.0/. | ||||
| %% | ||||
| %% Copyright (c) 2023 VMware, Inc. or its affiliates.  All rights reserved. | ||||
| %% | ||||
| 
 | ||||
| -module(rabbit_stream_tracking_mgmt). | ||||
| 
 | ||||
| -behaviour(rabbit_mgmt_extension). | ||||
| 
 | ||||
| -export([dispatcher/0, | ||||
|          web_ui/0]). | ||||
| -export([init/2, | ||||
|          resource_exists/2, | ||||
|          to_json/2, | ||||
|          content_types_provided/2, | ||||
|          is_authorized/2]). | ||||
| 
 | ||||
| -include_lib("rabbitmq_management_agent/include/rabbit_mgmt_records.hrl"). | ||||
| -include_lib("rabbit_common/include/rabbit.hrl"). | ||||
| 
 | ||||
| dispatcher() -> | ||||
|     [{"/stream/:vhost/:queue/tracking", ?MODULE, []}]. | ||||
| 
 | ||||
| web_ui() -> | ||||
|     []. | ||||
| 
 | ||||
| %%-------------------------------------------------------------------- | ||||
| 
 | ||||
| init(Req, _Opts) -> | ||||
|     {cowboy_rest, rabbit_mgmt_cors:set_headers(Req, ?MODULE), #context{}}. | ||||
| 
 | ||||
| content_types_provided(ReqData, Context) -> | ||||
|     {[{<<"application/json">>, to_json}], ReqData, Context}. | ||||
| 
 | ||||
| resource_exists(ReqData, Context) -> | ||||
|     {case rabbit_mgmt_util:vhost(ReqData) of | ||||
|          not_found -> | ||||
|              false; | ||||
|          none -> | ||||
|              false; % none means `all` | ||||
|          _ -> | ||||
|              case rabbit_mgmt_util:id(queue, ReqData) of | ||||
|                  none -> | ||||
|                      false; | ||||
|                  _ -> | ||||
|                      case rabbit_mgmt_wm_queue:queue(ReqData) of | ||||
|                          not_found -> | ||||
|                              false; | ||||
|                          _ -> | ||||
|                              true | ||||
|                      end | ||||
|              end | ||||
|      end, | ||||
|      ReqData, Context}. | ||||
| 
 | ||||
| to_json(ReqData, Context) -> | ||||
|     case rabbit_mgmt_util:disable_stats(ReqData) of | ||||
|         false -> | ||||
|             VHost = rabbit_mgmt_util:vhost(ReqData), | ||||
|             Stream = rabbit_mgmt_util:id(queue, ReqData), | ||||
|             case rabbit_stream_manager:lookup_leader(VHost, Stream) of | ||||
|                 {ok, Leader} -> | ||||
|                     Type = tracking_type(rabbit_mgmt_util:get_value_param(<<"type">>, ReqData)), | ||||
|                     TrackingInfo = maps:remove(timestamps, osiris:read_tracking(Leader)), | ||||
|                     rabbit_mgmt_util:reply(transform_tracking(Type, TrackingInfo), | ||||
|                                            ReqData, | ||||
|                                            Context); | ||||
|                 {error, _} -> | ||||
|                     rabbit_mgmt_util:service_unavailable(<<"The stream leader is not available">>, | ||||
|                                                          ReqData, Context) | ||||
|             end; | ||||
|         true -> | ||||
|             rabbit_mgmt_util:bad_request(<<"Stats in management UI are disabled on this node">>, | ||||
|                                          ReqData, Context) | ||||
|     end. | ||||
| 
 | ||||
| tracking_type(undefined) -> | ||||
|     all; | ||||
| tracking_type("offset") -> | ||||
|     offset; | ||||
| tracking_type("writer") -> | ||||
|     writer; | ||||
| tracking_type(_) -> | ||||
|     all. | ||||
| 
 | ||||
| transform_tracking(offset, Tracking) -> | ||||
|     maps:remove(sequences, Tracking); | ||||
| transform_tracking(writer, Tracking) -> | ||||
|     #{writers => convert_writer_tracking(maps:get(sequences, Tracking))}; | ||||
| transform_tracking(all, Tracking) -> | ||||
|     #{offsets => maps:get(offsets, Tracking), | ||||
|       writers => convert_writer_tracking(maps:get(sequences, Tracking))}. | ||||
| 
 | ||||
| convert_writer_tracking(Writers) -> | ||||
|     maps:fold(fun(Ref, {_, Seq}, Acc) -> | ||||
|                       Acc#{Ref => Seq}     | ||||
|               end, #{}, Writers). | ||||
| 
 | ||||
| is_authorized(ReqData, Context) -> | ||||
|     rabbit_mgmt_util:is_authorized(ReqData, Context). | ||||
|  | @ -1,117 +0,0 @@ | |||
| /* | ||||
|  * Copyright 2007-present the original author or authors. | ||||
|  * | ||||
|  * Licensed under the Apache License, Version 2.0 (the "License"); | ||||
|  * you may not use this file except in compliance with the License. | ||||
|  * You may obtain a copy of the License at | ||||
|  * | ||||
|  *      http://www.apache.org/licenses/LICENSE-2.0 | ||||
|  * | ||||
|  * Unless required by applicable law or agreed to in writing, software | ||||
|  * distributed under the License is distributed on an "AS IS" BASIS, | ||||
|  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||||
|  * See the License for the specific language governing permissions and | ||||
|  * limitations under the License. | ||||
|  */ | ||||
| import java.net.*; | ||||
| import java.io.*; | ||||
| import java.nio.channels.*; | ||||
| import java.util.Properties; | ||||
| 
 | ||||
| public class MavenWrapperDownloader { | ||||
| 
 | ||||
|     private static final String WRAPPER_VERSION = "0.5.6"; | ||||
|     /** | ||||
|      * Default URL to download the maven-wrapper.jar from, if no 'downloadUrl' is provided. | ||||
|      */ | ||||
|     private static final String DEFAULT_DOWNLOAD_URL = "https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/" | ||||
|         + WRAPPER_VERSION + "/maven-wrapper-" + WRAPPER_VERSION + ".jar"; | ||||
| 
 | ||||
|     /** | ||||
|      * Path to the maven-wrapper.properties file, which might contain a downloadUrl property to | ||||
|      * use instead of the default one. | ||||
|      */ | ||||
|     private static final String MAVEN_WRAPPER_PROPERTIES_PATH = | ||||
|             ".mvn/wrapper/maven-wrapper.properties"; | ||||
| 
 | ||||
|     /** | ||||
|      * Path where the maven-wrapper.jar will be saved to. | ||||
|      */ | ||||
|     private static final String MAVEN_WRAPPER_JAR_PATH = | ||||
|             ".mvn/wrapper/maven-wrapper.jar"; | ||||
| 
 | ||||
|     /** | ||||
|      * Name of the property which should be used to override the default download url for the wrapper. | ||||
|      */ | ||||
|     private static final String PROPERTY_NAME_WRAPPER_URL = "wrapperUrl"; | ||||
| 
 | ||||
|     public static void main(String args[]) { | ||||
|         System.out.println("- Downloader started"); | ||||
|         File baseDirectory = new File(args[0]); | ||||
|         System.out.println("- Using base directory: " + baseDirectory.getAbsolutePath()); | ||||
| 
 | ||||
|         // If the maven-wrapper.properties exists, read it and check if it contains a custom | ||||
|         // wrapperUrl parameter. | ||||
|         File mavenWrapperPropertyFile = new File(baseDirectory, MAVEN_WRAPPER_PROPERTIES_PATH); | ||||
|         String url = DEFAULT_DOWNLOAD_URL; | ||||
|         if(mavenWrapperPropertyFile.exists()) { | ||||
|             FileInputStream mavenWrapperPropertyFileInputStream = null; | ||||
|             try { | ||||
|                 mavenWrapperPropertyFileInputStream = new FileInputStream(mavenWrapperPropertyFile); | ||||
|                 Properties mavenWrapperProperties = new Properties(); | ||||
|                 mavenWrapperProperties.load(mavenWrapperPropertyFileInputStream); | ||||
|                 url = mavenWrapperProperties.getProperty(PROPERTY_NAME_WRAPPER_URL, url); | ||||
|             } catch (IOException e) { | ||||
|                 System.out.println("- ERROR loading '" + MAVEN_WRAPPER_PROPERTIES_PATH + "'"); | ||||
|             } finally { | ||||
|                 try { | ||||
|                     if(mavenWrapperPropertyFileInputStream != null) { | ||||
|                         mavenWrapperPropertyFileInputStream.close(); | ||||
|                     } | ||||
|                 } catch (IOException e) { | ||||
|                     // Ignore ... | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|         System.out.println("- Downloading from: " + url); | ||||
| 
 | ||||
|         File outputFile = new File(baseDirectory.getAbsolutePath(), MAVEN_WRAPPER_JAR_PATH); | ||||
|         if(!outputFile.getParentFile().exists()) { | ||||
|             if(!outputFile.getParentFile().mkdirs()) { | ||||
|                 System.out.println( | ||||
|                         "- ERROR creating output directory '" + outputFile.getParentFile().getAbsolutePath() + "'"); | ||||
|             } | ||||
|         } | ||||
|         System.out.println("- Downloading to: " + outputFile.getAbsolutePath()); | ||||
|         try { | ||||
|             downloadFileFromURL(url, outputFile); | ||||
|             System.out.println("Done"); | ||||
|             System.exit(0); | ||||
|         } catch (Throwable e) { | ||||
|             System.out.println("- Error downloading"); | ||||
|             e.printStackTrace(); | ||||
|             System.exit(1); | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     private static void downloadFileFromURL(String urlString, File destination) throws Exception { | ||||
|         if (System.getenv("MVNW_USERNAME") != null && System.getenv("MVNW_PASSWORD") != null) { | ||||
|             String username = System.getenv("MVNW_USERNAME"); | ||||
|             char[] password = System.getenv("MVNW_PASSWORD").toCharArray(); | ||||
|             Authenticator.setDefault(new Authenticator() { | ||||
|                 @Override | ||||
|                 protected PasswordAuthentication getPasswordAuthentication() { | ||||
|                     return new PasswordAuthentication(username, password); | ||||
|                 } | ||||
|             }); | ||||
|         } | ||||
|         URL website = new URL(urlString); | ||||
|         ReadableByteChannel rbc; | ||||
|         rbc = Channels.newChannel(website.openStream()); | ||||
|         FileOutputStream fos = new FileOutputStream(destination); | ||||
|         fos.getChannel().transferFrom(rbc, 0, Long.MAX_VALUE); | ||||
|         fos.close(); | ||||
|         rbc.close(); | ||||
|     } | ||||
| 
 | ||||
| } | ||||
										
											Binary file not shown.
										
									
								
							|  | @ -1,2 +1,18 @@ | |||
| distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.6.3/apache-maven-3.6.3-bin.zip | ||||
| wrapperUrl=https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar | ||||
| # Licensed to the Apache Software Foundation (ASF) under one | ||||
| # or more contributor license agreements.  See the NOTICE file | ||||
| # distributed with this work for additional information | ||||
| # regarding copyright ownership.  The ASF licenses this file | ||||
| # to you under the Apache License, Version 2.0 (the | ||||
| # "License"); you may not use this file except in compliance | ||||
| # with the License.  You may obtain a copy of the License at | ||||
| # | ||||
| #   http://www.apache.org/licenses/LICENSE-2.0 | ||||
| # | ||||
| # Unless required by applicable law or agreed to in writing, | ||||
| # software distributed under the License is distributed on an | ||||
| # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY | ||||
| # KIND, either express or implied.  See the License for the | ||||
| # specific language governing permissions and limitations | ||||
| # under the License. | ||||
| distributionUrl=https://repo.maven.apache.org/maven2/org/apache/maven/apache-maven/3.9.5/apache-maven-3.9.5-bin.zip | ||||
| wrapperUrl=https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar | ||||
|  |  | |||
|  | @ -19,7 +19,7 @@ | |||
| # ---------------------------------------------------------------------------- | ||||
| 
 | ||||
| # ---------------------------------------------------------------------------- | ||||
| # Maven Start Up Batch script | ||||
| # Apache Maven Wrapper startup batch script, version 3.2.0 | ||||
| # | ||||
| # Required ENV vars: | ||||
| # ------------------ | ||||
|  | @ -27,7 +27,6 @@ | |||
| # | ||||
| # Optional ENV vars | ||||
| # ----------------- | ||||
| #   M2_HOME - location of maven2's installed home dir | ||||
| #   MAVEN_OPTS - parameters passed to the Java VM when running Maven | ||||
| #     e.g. to debug Maven itself, use | ||||
| #       set MAVEN_OPTS=-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=8000 | ||||
|  | @ -36,6 +35,10 @@ | |||
| 
 | ||||
| if [ -z "$MAVEN_SKIP_RC" ] ; then | ||||
| 
 | ||||
|   if [ -f /usr/local/etc/mavenrc ] ; then | ||||
|     . /usr/local/etc/mavenrc | ||||
|   fi | ||||
| 
 | ||||
|   if [ -f /etc/mavenrc ] ; then | ||||
|     . /etc/mavenrc | ||||
|   fi | ||||
|  | @ -50,7 +53,7 @@ fi | |||
| cygwin=false; | ||||
| darwin=false; | ||||
| mingw=false | ||||
| case "`uname`" in | ||||
| case "$(uname)" in | ||||
|   CYGWIN*) cygwin=true ;; | ||||
|   MINGW*) mingw=true;; | ||||
|   Darwin*) darwin=true | ||||
|  | @ -58,9 +61,9 @@ case "`uname`" in | |||
|     # See https://developer.apple.com/library/mac/qa/qa1170/_index.html | ||||
|     if [ -z "$JAVA_HOME" ]; then | ||||
|       if [ -x "/usr/libexec/java_home" ]; then | ||||
|         export JAVA_HOME="`/usr/libexec/java_home`" | ||||
|         JAVA_HOME="$(/usr/libexec/java_home)"; export JAVA_HOME | ||||
|       else | ||||
|         export JAVA_HOME="/Library/Java/Home" | ||||
|         JAVA_HOME="/Library/Java/Home"; export JAVA_HOME | ||||
|       fi | ||||
|     fi | ||||
|     ;; | ||||
|  | @ -68,68 +71,38 @@ esac | |||
| 
 | ||||
| if [ -z "$JAVA_HOME" ] ; then | ||||
|   if [ -r /etc/gentoo-release ] ; then | ||||
|     JAVA_HOME=`java-config --jre-home` | ||||
|     JAVA_HOME=$(java-config --jre-home) | ||||
|   fi | ||||
| fi | ||||
| 
 | ||||
| if [ -z "$M2_HOME" ] ; then | ||||
|   ## resolve links - $0 may be a link to maven's home | ||||
|   PRG="$0" | ||||
| 
 | ||||
|   # need this for relative symlinks | ||||
|   while [ -h "$PRG" ] ; do | ||||
|     ls=`ls -ld "$PRG"` | ||||
|     link=`expr "$ls" : '.*-> \(.*\)$'` | ||||
|     if expr "$link" : '/.*' > /dev/null; then | ||||
|       PRG="$link" | ||||
|     else | ||||
|       PRG="`dirname "$PRG"`/$link" | ||||
|     fi | ||||
|   done | ||||
| 
 | ||||
|   saveddir=`pwd` | ||||
| 
 | ||||
|   M2_HOME=`dirname "$PRG"`/.. | ||||
| 
 | ||||
|   # make it fully qualified | ||||
|   M2_HOME=`cd "$M2_HOME" && pwd` | ||||
| 
 | ||||
|   cd "$saveddir" | ||||
|   # echo Using m2 at $M2_HOME | ||||
| fi | ||||
| 
 | ||||
| # For Cygwin, ensure paths are in UNIX format before anything is touched | ||||
| if $cygwin ; then | ||||
|   [ -n "$M2_HOME" ] && | ||||
|     M2_HOME=`cygpath --unix "$M2_HOME"` | ||||
|   [ -n "$JAVA_HOME" ] && | ||||
|     JAVA_HOME=`cygpath --unix "$JAVA_HOME"` | ||||
|     JAVA_HOME=$(cygpath --unix "$JAVA_HOME") | ||||
|   [ -n "$CLASSPATH" ] && | ||||
|     CLASSPATH=`cygpath --path --unix "$CLASSPATH"` | ||||
|     CLASSPATH=$(cygpath --path --unix "$CLASSPATH") | ||||
| fi | ||||
| 
 | ||||
| # For Mingw, ensure paths are in UNIX format before anything is touched | ||||
| if $mingw ; then | ||||
|   [ -n "$M2_HOME" ] && | ||||
|     M2_HOME="`(cd "$M2_HOME"; pwd)`" | ||||
|   [ -n "$JAVA_HOME" ] && | ||||
|     JAVA_HOME="`(cd "$JAVA_HOME"; pwd)`" | ||||
|   [ -n "$JAVA_HOME" ] && [ -d "$JAVA_HOME" ] && | ||||
|     JAVA_HOME="$(cd "$JAVA_HOME" || (echo "cannot cd into $JAVA_HOME."; exit 1); pwd)" | ||||
| fi | ||||
| 
 | ||||
| if [ -z "$JAVA_HOME" ]; then | ||||
|   javaExecutable="`which javac`" | ||||
|   if [ -n "$javaExecutable" ] && ! [ "`expr \"$javaExecutable\" : '\([^ ]*\)'`" = "no" ]; then | ||||
|   javaExecutable="$(which javac)" | ||||
|   if [ -n "$javaExecutable" ] && ! [ "$(expr "\"$javaExecutable\"" : '\([^ ]*\)')" = "no" ]; then | ||||
|     # readlink(1) is not available as standard on Solaris 10. | ||||
|     readLink=`which readlink` | ||||
|     if [ ! `expr "$readLink" : '\([^ ]*\)'` = "no" ]; then | ||||
|     readLink=$(which readlink) | ||||
|     if [ ! "$(expr "$readLink" : '\([^ ]*\)')" = "no" ]; then | ||||
|       if $darwin ; then | ||||
|         javaHome="`dirname \"$javaExecutable\"`" | ||||
|         javaExecutable="`cd \"$javaHome\" && pwd -P`/javac" | ||||
|         javaHome="$(dirname "\"$javaExecutable\"")" | ||||
|         javaExecutable="$(cd "\"$javaHome\"" && pwd -P)/javac" | ||||
|       else | ||||
|         javaExecutable="`readlink -f \"$javaExecutable\"`" | ||||
|         javaExecutable="$(readlink -f "\"$javaExecutable\"")" | ||||
|       fi | ||||
|       javaHome="`dirname \"$javaExecutable\"`" | ||||
|       javaHome=`expr "$javaHome" : '\(.*\)/bin'` | ||||
|       javaHome="$(dirname "\"$javaExecutable\"")" | ||||
|       javaHome=$(expr "$javaHome" : '\(.*\)/bin') | ||||
|       JAVA_HOME="$javaHome" | ||||
|       export JAVA_HOME | ||||
|     fi | ||||
|  | @ -145,7 +118,7 @@ if [ -z "$JAVACMD" ] ; then | |||
|       JAVACMD="$JAVA_HOME/bin/java" | ||||
|     fi | ||||
|   else | ||||
|     JAVACMD="`which java`" | ||||
|     JAVACMD="$(\unset -f command 2>/dev/null; \command -v java)" | ||||
|   fi | ||||
| fi | ||||
| 
 | ||||
|  | @ -159,12 +132,9 @@ if [ -z "$JAVA_HOME" ] ; then | |||
|   echo "Warning: JAVA_HOME environment variable is not set." | ||||
| fi | ||||
| 
 | ||||
| CLASSWORLDS_LAUNCHER=org.codehaus.plexus.classworlds.launcher.Launcher | ||||
| 
 | ||||
| # traverses directory structure from process work directory to filesystem root | ||||
| # first directory with .mvn subdirectory is considered project base directory | ||||
| find_maven_basedir() { | ||||
| 
 | ||||
|   if [ -z "$1" ] | ||||
|   then | ||||
|     echo "Path not specified to find_maven_basedir" | ||||
|  | @ -180,96 +150,99 @@ find_maven_basedir() { | |||
|     fi | ||||
|     # workaround for JBEAP-8937 (on Solaris 10/Sparc) | ||||
|     if [ -d "${wdir}" ]; then | ||||
|       wdir=`cd "$wdir/.."; pwd` | ||||
|       wdir=$(cd "$wdir/.." || exit 1; pwd) | ||||
|     fi | ||||
|     # end of workaround | ||||
|   done | ||||
|   echo "${basedir}" | ||||
|   printf '%s' "$(cd "$basedir" || exit 1; pwd)" | ||||
| } | ||||
| 
 | ||||
| # concatenates all lines of a file | ||||
| concat_lines() { | ||||
|   if [ -f "$1" ]; then | ||||
|     echo "$(tr -s '\n' ' ' < "$1")" | ||||
|     # Remove \r in case we run on Windows within Git Bash | ||||
|     # and check out the repository with auto CRLF management | ||||
|     # enabled. Otherwise, we may read lines that are delimited with | ||||
|     # \r\n and produce $'-Xarg\r' rather than -Xarg due to word | ||||
|     # splitting rules. | ||||
|     tr -s '\r\n' ' ' < "$1" | ||||
|   fi | ||||
| } | ||||
| 
 | ||||
| BASE_DIR=`find_maven_basedir "$(pwd)"` | ||||
| log() { | ||||
|   if [ "$MVNW_VERBOSE" = true ]; then | ||||
|     printf '%s\n' "$1" | ||||
|   fi | ||||
| } | ||||
| 
 | ||||
| BASE_DIR=$(find_maven_basedir "$(dirname "$0")") | ||||
| if [ -z "$BASE_DIR" ]; then | ||||
|   exit 1; | ||||
| fi | ||||
| 
 | ||||
| MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"}; export MAVEN_PROJECTBASEDIR | ||||
| log "$MAVEN_PROJECTBASEDIR" | ||||
| 
 | ||||
| ########################################################################################## | ||||
| # Extension to allow automatically downloading the maven-wrapper.jar from Maven-central | ||||
| # This allows using the maven wrapper in projects that prohibit checking in binary data. | ||||
| ########################################################################################## | ||||
| if [ -r "$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" ]; then | ||||
|     if [ "$MVNW_VERBOSE" = true ]; then | ||||
|       echo "Found .mvn/wrapper/maven-wrapper.jar" | ||||
|     fi | ||||
| wrapperJarPath="$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" | ||||
| if [ -r "$wrapperJarPath" ]; then | ||||
|     log "Found $wrapperJarPath" | ||||
| else | ||||
|     if [ "$MVNW_VERBOSE" = true ]; then | ||||
|       echo "Couldn't find .mvn/wrapper/maven-wrapper.jar, downloading it ..." | ||||
|     fi | ||||
|     log "Couldn't find $wrapperJarPath, downloading it ..." | ||||
| 
 | ||||
|     if [ -n "$MVNW_REPOURL" ]; then | ||||
|       jarUrl="$MVNW_REPOURL/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" | ||||
|       wrapperUrl="$MVNW_REPOURL/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar" | ||||
|     else | ||||
|       jarUrl="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" | ||||
|       wrapperUrl="https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar" | ||||
|     fi | ||||
|     while IFS="=" read key value; do | ||||
|       case "$key" in (wrapperUrl) jarUrl="$value"; break ;; | ||||
|     while IFS="=" read -r key value; do | ||||
|       # Remove '\r' from value to allow usage on windows as IFS does not consider '\r' as a separator ( considers space, tab, new line ('\n'), and custom '=' ) | ||||
|       safeValue=$(echo "$value" | tr -d '\r') | ||||
|       case "$key" in (wrapperUrl) wrapperUrl="$safeValue"; break ;; | ||||
|       esac | ||||
|     done < "$BASE_DIR/.mvn/wrapper/maven-wrapper.properties" | ||||
|     if [ "$MVNW_VERBOSE" = true ]; then | ||||
|       echo "Downloading from: $jarUrl" | ||||
|     fi | ||||
|     wrapperJarPath="$BASE_DIR/.mvn/wrapper/maven-wrapper.jar" | ||||
|     done < "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.properties" | ||||
|     log "Downloading from: $wrapperUrl" | ||||
| 
 | ||||
|     if $cygwin; then | ||||
|       wrapperJarPath=`cygpath --path --windows "$wrapperJarPath"` | ||||
|       wrapperJarPath=$(cygpath --path --windows "$wrapperJarPath") | ||||
|     fi | ||||
| 
 | ||||
|     if command -v wget > /dev/null; then | ||||
|         if [ "$MVNW_VERBOSE" = true ]; then | ||||
|           echo "Found wget ... using wget" | ||||
|         fi | ||||
|         log "Found wget ... using wget" | ||||
|         [ "$MVNW_VERBOSE" = true ] && QUIET="" || QUIET="--quiet" | ||||
|         if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then | ||||
|             wget "$jarUrl" -O "$wrapperJarPath" | ||||
|             wget $QUIET "$wrapperUrl" -O "$wrapperJarPath" || rm -f "$wrapperJarPath" | ||||
|         else | ||||
|             wget --http-user=$MVNW_USERNAME --http-password=$MVNW_PASSWORD "$jarUrl" -O "$wrapperJarPath" | ||||
|             wget $QUIET --http-user="$MVNW_USERNAME" --http-password="$MVNW_PASSWORD" "$wrapperUrl" -O "$wrapperJarPath" || rm -f "$wrapperJarPath" | ||||
|         fi | ||||
|     elif command -v curl > /dev/null; then | ||||
|         if [ "$MVNW_VERBOSE" = true ]; then | ||||
|           echo "Found curl ... using curl" | ||||
|         fi | ||||
|         log "Found curl ... using curl" | ||||
|         [ "$MVNW_VERBOSE" = true ] && QUIET="" || QUIET="--silent" | ||||
|         if [ -z "$MVNW_USERNAME" ] || [ -z "$MVNW_PASSWORD" ]; then | ||||
|             curl -o "$wrapperJarPath" "$jarUrl" -f | ||||
|             curl $QUIET -o "$wrapperJarPath" "$wrapperUrl" -f -L || rm -f "$wrapperJarPath" | ||||
|         else | ||||
|             curl --user $MVNW_USERNAME:$MVNW_PASSWORD -o "$wrapperJarPath" "$jarUrl" -f | ||||
|             curl $QUIET --user "$MVNW_USERNAME:$MVNW_PASSWORD" -o "$wrapperJarPath" "$wrapperUrl" -f -L || rm -f "$wrapperJarPath" | ||||
|         fi | ||||
| 
 | ||||
|     else | ||||
|         if [ "$MVNW_VERBOSE" = true ]; then | ||||
|           echo "Falling back to using Java to download" | ||||
|         fi | ||||
|         javaClass="$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.java" | ||||
|         log "Falling back to using Java to download" | ||||
|         javaSource="$MAVEN_PROJECTBASEDIR/.mvn/wrapper/MavenWrapperDownloader.java" | ||||
|         javaClass="$MAVEN_PROJECTBASEDIR/.mvn/wrapper/MavenWrapperDownloader.class" | ||||
|         # For Cygwin, switch paths to Windows format before running javac | ||||
|         if $cygwin; then | ||||
|           javaClass=`cygpath --path --windows "$javaClass"` | ||||
|           javaSource=$(cygpath --path --windows "$javaSource") | ||||
|           javaClass=$(cygpath --path --windows "$javaClass") | ||||
|         fi | ||||
|         if [ -e "$javaSource" ]; then | ||||
|             if [ ! -e "$javaClass" ]; then | ||||
|                 log " - Compiling MavenWrapperDownloader.java ..." | ||||
|                 ("$JAVA_HOME/bin/javac" "$javaSource") | ||||
|             fi | ||||
|             if [ -e "$javaClass" ]; then | ||||
|             if [ ! -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then | ||||
|                 if [ "$MVNW_VERBOSE" = true ]; then | ||||
|                   echo " - Compiling MavenWrapperDownloader.java ..." | ||||
|                 fi | ||||
|                 # Compiling the Java class | ||||
|                 ("$JAVA_HOME/bin/javac" "$javaClass") | ||||
|             fi | ||||
|             if [ -e "$BASE_DIR/.mvn/wrapper/MavenWrapperDownloader.class" ]; then | ||||
|                 # Running the downloader | ||||
|                 if [ "$MVNW_VERBOSE" = true ]; then | ||||
|                   echo " - Running MavenWrapperDownloader.java ..." | ||||
|                 fi | ||||
|                 ("$JAVA_HOME/bin/java" -cp .mvn/wrapper MavenWrapperDownloader "$MAVEN_PROJECTBASEDIR") | ||||
|                 log " - Running MavenWrapperDownloader.java ..." | ||||
|                 ("$JAVA_HOME/bin/java" -cp .mvn/wrapper MavenWrapperDownloader "$wrapperUrl" "$wrapperJarPath") || rm -f "$wrapperJarPath" | ||||
|             fi | ||||
|         fi | ||||
|     fi | ||||
|  | @ -278,33 +251,58 @@ fi | |||
| # End of extension | ||||
| ########################################################################################## | ||||
| 
 | ||||
| export MAVEN_PROJECTBASEDIR=${MAVEN_BASEDIR:-"$BASE_DIR"} | ||||
| if [ "$MVNW_VERBOSE" = true ]; then | ||||
|   echo $MAVEN_PROJECTBASEDIR | ||||
| # If specified, validate the SHA-256 sum of the Maven wrapper jar file | ||||
| wrapperSha256Sum="" | ||||
| while IFS="=" read -r key value; do | ||||
|   case "$key" in (wrapperSha256Sum) wrapperSha256Sum=$value; break ;; | ||||
|   esac | ||||
| done < "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.properties" | ||||
| if [ -n "$wrapperSha256Sum" ]; then | ||||
|   wrapperSha256Result=false | ||||
|   if command -v sha256sum > /dev/null; then | ||||
|     if echo "$wrapperSha256Sum  $wrapperJarPath" | sha256sum -c > /dev/null 2>&1; then | ||||
|       wrapperSha256Result=true | ||||
|     fi | ||||
|   elif command -v shasum > /dev/null; then | ||||
|     if echo "$wrapperSha256Sum  $wrapperJarPath" | shasum -a 256 -c > /dev/null 2>&1; then | ||||
|       wrapperSha256Result=true | ||||
|     fi | ||||
|   else | ||||
|     echo "Checksum validation was requested but neither 'sha256sum' or 'shasum' are available." | ||||
|     echo "Please install either command, or disable validation by removing 'wrapperSha256Sum' from your maven-wrapper.properties." | ||||
|     exit 1 | ||||
|   fi | ||||
|   if [ $wrapperSha256Result = false ]; then | ||||
|     echo "Error: Failed to validate Maven wrapper SHA-256, your Maven wrapper might be compromised." >&2 | ||||
|     echo "Investigate or delete $wrapperJarPath to attempt a clean download." >&2 | ||||
|     echo "If you updated your Maven version, you need to update the specified wrapperSha256Sum property." >&2 | ||||
|     exit 1 | ||||
|   fi | ||||
| fi | ||||
| 
 | ||||
| MAVEN_OPTS="$(concat_lines "$MAVEN_PROJECTBASEDIR/.mvn/jvm.config") $MAVEN_OPTS" | ||||
| 
 | ||||
| # For Cygwin, switch paths to Windows format before running java | ||||
| if $cygwin; then | ||||
|   [ -n "$M2_HOME" ] && | ||||
|     M2_HOME=`cygpath --path --windows "$M2_HOME"` | ||||
|   [ -n "$JAVA_HOME" ] && | ||||
|     JAVA_HOME=`cygpath --path --windows "$JAVA_HOME"` | ||||
|     JAVA_HOME=$(cygpath --path --windows "$JAVA_HOME") | ||||
|   [ -n "$CLASSPATH" ] && | ||||
|     CLASSPATH=`cygpath --path --windows "$CLASSPATH"` | ||||
|     CLASSPATH=$(cygpath --path --windows "$CLASSPATH") | ||||
|   [ -n "$MAVEN_PROJECTBASEDIR" ] && | ||||
|     MAVEN_PROJECTBASEDIR=`cygpath --path --windows "$MAVEN_PROJECTBASEDIR"` | ||||
|     MAVEN_PROJECTBASEDIR=$(cygpath --path --windows "$MAVEN_PROJECTBASEDIR") | ||||
| fi | ||||
| 
 | ||||
| # Provide a "standardized" way to retrieve the CLI args that will | ||||
| # work with both Windows and non-Windows executions. | ||||
| MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $@" | ||||
| MAVEN_CMD_LINE_ARGS="$MAVEN_CONFIG $*" | ||||
| export MAVEN_CMD_LINE_ARGS | ||||
| 
 | ||||
| WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain | ||||
| 
 | ||||
| # shellcheck disable=SC2086 # safe args | ||||
| exec "$JAVACMD" \ | ||||
|   $MAVEN_OPTS \ | ||||
|   $MAVEN_DEBUG_OPTS \ | ||||
|   -classpath "$MAVEN_PROJECTBASEDIR/.mvn/wrapper/maven-wrapper.jar" \ | ||||
|   "-Dmaven.home=${M2_HOME}" "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \ | ||||
|   "-Dmaven.multiModuleProjectDirectory=${MAVEN_PROJECTBASEDIR}" \ | ||||
|   ${WRAPPER_LAUNCHER} $MAVEN_CONFIG "$@" | ||||
|  |  | |||
|  | @ -18,13 +18,12 @@ | |||
| @REM ---------------------------------------------------------------------------- | ||||
| 
 | ||||
| @REM ---------------------------------------------------------------------------- | ||||
| @REM Maven Start Up Batch script | ||||
| @REM Apache Maven Wrapper startup batch script, version 3.2.0 | ||||
| @REM | ||||
| @REM Required ENV vars: | ||||
| @REM JAVA_HOME - location of a JDK home dir | ||||
| @REM | ||||
| @REM Optional ENV vars | ||||
| @REM M2_HOME - location of maven2's installed home dir | ||||
| @REM MAVEN_BATCH_ECHO - set to 'on' to enable the echoing of the batch commands | ||||
| @REM MAVEN_BATCH_PAUSE - set to 'on' to wait for a keystroke before ending | ||||
| @REM MAVEN_OPTS - parameters passed to the Java VM when running Maven | ||||
|  | @ -46,8 +45,8 @@ if "%HOME%" == "" (set "HOME=%HOMEDRIVE%%HOMEPATH%") | |||
| @REM Execute a user defined script before this one | ||||
| if not "%MAVEN_SKIP_RC%" == "" goto skipRcPre | ||||
| @REM check for pre script, once with legacy .bat ending and once with .cmd ending | ||||
| if exist "%HOME%\mavenrc_pre.bat" call "%HOME%\mavenrc_pre.bat" | ||||
| if exist "%HOME%\mavenrc_pre.cmd" call "%HOME%\mavenrc_pre.cmd" | ||||
| if exist "%USERPROFILE%\mavenrc_pre.bat" call "%USERPROFILE%\mavenrc_pre.bat" %* | ||||
| if exist "%USERPROFILE%\mavenrc_pre.cmd" call "%USERPROFILE%\mavenrc_pre.cmd" %* | ||||
| :skipRcPre | ||||
| 
 | ||||
| @setlocal | ||||
|  | @ -120,10 +119,10 @@ SET MAVEN_JAVA_EXE="%JAVA_HOME%\bin\java.exe" | |||
| set WRAPPER_JAR="%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.jar" | ||||
| set WRAPPER_LAUNCHER=org.apache.maven.wrapper.MavenWrapperMain | ||||
| 
 | ||||
| set DOWNLOAD_URL="https://repo.maven.apache.org/maven2/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" | ||||
| set WRAPPER_URL="https://repo.maven.apache.org/maven2/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar" | ||||
| 
 | ||||
| FOR /F "tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO ( | ||||
|     IF "%%A"=="wrapperUrl" SET DOWNLOAD_URL=%%B | ||||
| FOR /F "usebackq tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO ( | ||||
|     IF "%%A"=="wrapperUrl" SET WRAPPER_URL=%%B | ||||
| ) | ||||
| 
 | ||||
| @REM Extension to allow automatically downloading the maven-wrapper.jar from Maven-central | ||||
|  | @ -134,11 +133,11 @@ if exist %WRAPPER_JAR% ( | |||
|     ) | ||||
| ) else ( | ||||
|     if not "%MVNW_REPOURL%" == "" ( | ||||
|         SET DOWNLOAD_URL="%MVNW_REPOURL%/io/takari/maven-wrapper/0.5.6/maven-wrapper-0.5.6.jar" | ||||
|         SET WRAPPER_URL="%MVNW_REPOURL%/org/apache/maven/wrapper/maven-wrapper/3.2.0/maven-wrapper-3.2.0.jar" | ||||
|     ) | ||||
|     if "%MVNW_VERBOSE%" == "true" ( | ||||
|         echo Couldn't find %WRAPPER_JAR%, downloading it ... | ||||
|         echo Downloading from: %DOWNLOAD_URL% | ||||
|         echo Downloading from: %WRAPPER_URL% | ||||
|     ) | ||||
| 
 | ||||
|     powershell -Command "&{"^ | ||||
|  | @ -146,7 +145,7 @@ if exist %WRAPPER_JAR% ( | |||
| 		"if (-not ([string]::IsNullOrEmpty('%MVNW_USERNAME%') -and [string]::IsNullOrEmpty('%MVNW_PASSWORD%'))) {"^ | ||||
| 		"$webclient.Credentials = new-object System.Net.NetworkCredential('%MVNW_USERNAME%', '%MVNW_PASSWORD%');"^ | ||||
| 		"}"^ | ||||
| 		"[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12; $webclient.DownloadFile('%DOWNLOAD_URL%', '%WRAPPER_JAR%')"^ | ||||
| 		"[Net.ServicePointManager]::SecurityProtocol = [Net.SecurityProtocolType]::Tls12; $webclient.DownloadFile('%WRAPPER_URL%', '%WRAPPER_JAR%')"^ | ||||
| 		"}" | ||||
|     if "%MVNW_VERBOSE%" == "true" ( | ||||
|         echo Finished downloading %WRAPPER_JAR% | ||||
|  | @ -154,11 +153,35 @@ if exist %WRAPPER_JAR% ( | |||
| ) | ||||
| @REM End of extension | ||||
| 
 | ||||
| @REM If specified, validate the SHA-256 sum of the Maven wrapper jar file | ||||
| SET WRAPPER_SHA_256_SUM="" | ||||
| FOR /F "usebackq tokens=1,2 delims==" %%A IN ("%MAVEN_PROJECTBASEDIR%\.mvn\wrapper\maven-wrapper.properties") DO ( | ||||
|     IF "%%A"=="wrapperSha256Sum" SET WRAPPER_SHA_256_SUM=%%B | ||||
| ) | ||||
| IF NOT %WRAPPER_SHA_256_SUM%=="" ( | ||||
|     powershell -Command "&{"^ | ||||
|        "$hash = (Get-FileHash \"%WRAPPER_JAR%\" -Algorithm SHA256).Hash.ToLower();"^ | ||||
|        "If('%WRAPPER_SHA_256_SUM%' -ne $hash){"^ | ||||
|        "  Write-Output 'Error: Failed to validate Maven wrapper SHA-256, your Maven wrapper might be compromised.';"^ | ||||
|        "  Write-Output 'Investigate or delete %WRAPPER_JAR% to attempt a clean download.';"^ | ||||
|        "  Write-Output 'If you updated your Maven version, you need to update the specified wrapperSha256Sum property.';"^ | ||||
|        "  exit 1;"^ | ||||
|        "}"^ | ||||
|        "}" | ||||
|     if ERRORLEVEL 1 goto error | ||||
| ) | ||||
| 
 | ||||
| @REM Provide a "standardized" way to retrieve the CLI args that will | ||||
| @REM work with both Windows and non-Windows executions. | ||||
| set MAVEN_CMD_LINE_ARGS=%* | ||||
| 
 | ||||
| %MAVEN_JAVA_EXE% %JVM_CONFIG_MAVEN_PROPS% %MAVEN_OPTS% %MAVEN_DEBUG_OPTS% -classpath %WRAPPER_JAR% "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %* | ||||
| %MAVEN_JAVA_EXE% ^ | ||||
|   %JVM_CONFIG_MAVEN_PROPS% ^ | ||||
|   %MAVEN_OPTS% ^ | ||||
|   %MAVEN_DEBUG_OPTS% ^ | ||||
|   -classpath %WRAPPER_JAR% ^ | ||||
|   "-Dmaven.multiModuleProjectDirectory=%MAVEN_PROJECTBASEDIR%" ^ | ||||
|   %WRAPPER_LAUNCHER% %MAVEN_CONFIG% %* | ||||
| if ERRORLEVEL 1 goto error | ||||
| goto end | ||||
| 
 | ||||
|  | @ -170,8 +193,8 @@ set ERROR_CODE=1 | |||
| 
 | ||||
| if not "%MAVEN_SKIP_RC%"=="" goto skipRcPost | ||||
| @REM check for post script, once with legacy .bat ending and once with .cmd ending | ||||
| if exist "%HOME%\mavenrc_post.bat" call "%HOME%\mavenrc_post.bat" | ||||
| if exist "%HOME%\mavenrc_post.cmd" call "%HOME%\mavenrc_post.cmd" | ||||
| if exist "%USERPROFILE%\mavenrc_post.bat" call "%USERPROFILE%\mavenrc_post.bat" | ||||
| if exist "%USERPROFILE%\mavenrc_post.cmd" call "%USERPROFILE%\mavenrc_post.cmd" | ||||
| :skipRcPost | ||||
| 
 | ||||
| @REM pause the script if MAVEN_BATCH_PAUSE is set to 'on' | ||||
|  | @ -179,4 +202,4 @@ if "%MAVEN_BATCH_PAUSE%" == "on" pause | |||
| 
 | ||||
| if "%MAVEN_TERMINATE_CMD%"=="on" exit %ERROR_CODE% | ||||
| 
 | ||||
| exit /B %ERROR_CODE% | ||||
| cmd /C exit /B %ERROR_CODE% | ||||
|  |  | |||
|  | @ -32,8 +32,8 @@ | |||
|         <logback.version>1.2.12</logback.version> | ||||
|         <maven.compiler.plugin.version>3.11.0</maven.compiler.plugin.version> | ||||
|         <maven-surefire-plugin.version>3.1.2</maven-surefire-plugin.version> | ||||
|         <spotless.version>2.37.0</spotless.version> | ||||
|         <google-java-format.version>1.17.0</google-java-format.version> | ||||
|         <spotless.version>2.40.0</spotless.version> | ||||
|         <google-java-format.version>1.18.1</google-java-format.version> | ||||
|         <okhttp.version>4.11.0</okhttp.version> | ||||
|         <gson.version>2.10.1</gson.version> | ||||
|         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> | ||||
|  |  | |||
|  | @ -784,7 +784,9 @@ public class HttpTest { | |||
|         "/stream/consumers/foo-virtual-host", | ||||
|         "/stream/publishers/foo-virtual-host", | ||||
|         "/stream/publishers/foo-virtual-host", | ||||
|         "/stream/publishers/%2F/foo-stream" | ||||
|         "/stream/publishers/%2F/foo-stream", | ||||
|         "/stream/%2F/foo-stream/tracking", | ||||
|         "/stream/foo-virtual-host/foo-stream/tracking", | ||||
|       }) | ||||
|   void shouldReturnNotFound(String endpoint) { | ||||
|     assertThatThrownBy(() -> get(endpoint)).hasMessageContaining("404"); | ||||
|  | @ -867,6 +869,112 @@ public class HttpTest { | |||
|     } | ||||
|   } | ||||
| 
 | ||||
|   @Test | ||||
|   void trackingInfo() throws Exception { | ||||
|     String endpoint = "/stream/%2F/" + stream + "/tracking"; | ||||
|     Callable<Map<String, Object>> getTracking = () -> toMap(get(endpoint)); | ||||
|     Map<String, Object> tracking = getTracking.call(); | ||||
|     assertThat(tracking).hasSize(2).containsKeys("offsets", "writers"); | ||||
|     assertThat(trackingOffsets(tracking)).isEmpty(); | ||||
|     assertThat(trackingWriters(tracking)).isEmpty(); | ||||
| 
 | ||||
|     String consumerReference1 = "foo"; | ||||
|     String consumerReference2 = "bar"; | ||||
| 
 | ||||
|     Client client = cf.get(); | ||||
|     client.storeOffset(consumerReference1, stream, 42); | ||||
|     waitUntil(() -> client.queryOffset(consumerReference1, stream).getOffset() == 42); | ||||
|     tracking = getTracking.call(); | ||||
|     assertThat(trackingOffsets(tracking)).hasSize(1).containsEntry(consumerReference1, d(42)); | ||||
|     assertThat(trackingWriters(tracking)).isEmpty(); | ||||
| 
 | ||||
|     client.storeOffset(consumerReference1, stream, 55); | ||||
|     waitUntil(() -> client.queryOffset(consumerReference1, stream).getOffset() == 55); | ||||
|     tracking = getTracking.call(); | ||||
|     assertThat(trackingOffsets(tracking)).hasSize(1).containsEntry(consumerReference1, d(55)); | ||||
|     assertThat(trackingWriters(tracking)).isEmpty(); | ||||
| 
 | ||||
|     client.storeOffset(consumerReference2, stream, 12); | ||||
|     waitUntil(() -> client.queryOffset(consumerReference2, stream).getOffset() == 12); | ||||
|     tracking = getTracking.call(); | ||||
|     assertThat(trackingOffsets(tracking)) | ||||
|         .hasSize(2) | ||||
|         .containsEntry(consumerReference1, d(55)) | ||||
|         .containsEntry(consumerReference2, d(12)); | ||||
|     assertThat(trackingWriters(tracking)).isEmpty(); | ||||
| 
 | ||||
|     tracking = toMap(get(endpoint + "?type=offset")); | ||||
|     assertThat(tracking).hasSize(1).containsKey("offsets"); | ||||
| 
 | ||||
|     String publisherReference1 = "foobar1"; | ||||
|     String publisherReference2 = "foobar2"; | ||||
|     byte pub1 = 0; | ||||
|     byte pub2 = 1; | ||||
|     assertThat(client.declarePublisher(pub1, publisherReference1, stream).isOk()).isTrue(); | ||||
|     assertThat(client.declarePublisher(pub2, publisherReference2, stream).isOk()).isTrue(); | ||||
| 
 | ||||
|     client.publish(pub1, message(client), o -> 25); | ||||
|     waitUntil(() -> client.queryPublisherSequence(publisherReference1, stream) == 25); | ||||
| 
 | ||||
|     tracking = getTracking.call(); | ||||
|     assertThat(trackingOffsets(tracking)) | ||||
|         .hasSize(2) | ||||
|         .containsEntry(consumerReference1, d(55)) | ||||
|         .containsEntry(consumerReference2, d(12)); | ||||
|     assertThat(trackingWriters(tracking)).hasSize(1).containsEntry(publisherReference1, d(25)); | ||||
| 
 | ||||
|     client.publish(pub1, message(client), o -> 36); | ||||
|     waitUntil(() -> client.queryPublisherSequence(publisherReference1, stream) == 36); | ||||
| 
 | ||||
|     tracking = getTracking.call(); | ||||
|     assertThat(trackingOffsets(tracking)) | ||||
|         .hasSize(2) | ||||
|         .containsEntry(consumerReference1, d(55)) | ||||
|         .containsEntry(consumerReference2, d(12)); | ||||
|     assertThat(trackingWriters(tracking)).hasSize(1).containsEntry(publisherReference1, d(36)); | ||||
| 
 | ||||
|     client.publish(pub2, message(client), o -> 45); | ||||
|     waitUntil(() -> client.queryPublisherSequence(publisherReference2, stream) == 45); | ||||
| 
 | ||||
|     tracking = getTracking.call(); | ||||
|     assertThat(trackingOffsets(tracking)) | ||||
|         .hasSize(2) | ||||
|         .containsEntry(consumerReference1, d(55)) | ||||
|         .containsEntry(consumerReference2, d(12)); | ||||
|     assertThat(trackingWriters(tracking)) | ||||
|         .hasSize(2) | ||||
|         .containsEntry(publisherReference1, d(36)) | ||||
|         .containsEntry(publisherReference2, d(45)); | ||||
| 
 | ||||
|     tracking = toMap(get(endpoint + "?type=writer")); | ||||
|     assertThat(tracking).hasSize(1).containsKey("writers"); | ||||
| 
 | ||||
|     tracking = toMap(get(endpoint + "?type=all")); | ||||
|     assertThat(tracking).hasSize(2).containsKeys("offsets", "writers"); | ||||
| 
 | ||||
|     tracking = toMap(get(endpoint + "?type=unknown-means-all")); | ||||
|     assertThat(tracking).hasSize(2).containsKeys("offsets", "writers"); | ||||
|   } | ||||
| 
 | ||||
|   @SuppressWarnings("unchecked") | ||||
|   private static Map<String, Number> trackingOffsets(Map<String, Object> tracking) { | ||||
|     return (Map<String, Number>) tracking.get("offsets"); | ||||
|   } | ||||
| 
 | ||||
|   @SuppressWarnings("unchecked") | ||||
|   private static Map<String, Number> trackingWriters(Map<String, Object> tracking) { | ||||
|     return (Map<String, Number>) tracking.get("writers"); | ||||
|   } | ||||
| 
 | ||||
|   private static Double d(int value) { | ||||
|     return (double) value; | ||||
|   } | ||||
| 
 | ||||
|   private static List<Message> message(Client client) { | ||||
|     return Collections.singletonList( | ||||
|         client.messageBuilder().addData("hello".getBytes(StandardCharsets.UTF_8)).build()); | ||||
|   } | ||||
| 
 | ||||
|   static class PermissionsTestConfiguration { | ||||
|     final String user; | ||||
|     final String endpoint; | ||||
|  |  | |||
|  | @ -75,12 +75,12 @@ init(Req, Opts) -> | |||
|         undefined -> | ||||
|             no_supported_sub_protocol(undefined, Req); | ||||
|         Protocol -> | ||||
|             WsOpts0 = proplists:get_value(ws_opts, Opts, #{}), | ||||
|             WsOpts  = maps:merge(#{compress => true}, WsOpts0), | ||||
|             case lists:member(<<"mqtt">>, Protocol) of | ||||
|                 false -> | ||||
|                     no_supported_sub_protocol(Protocol, Req); | ||||
|                 true -> | ||||
|                     WsOpts0 = proplists:get_value(ws_opts, Opts, #{}), | ||||
|                     WsOpts  = maps:merge(#{compress => true}, WsOpts0), | ||||
|                     ShouldUseFHC = application:get_env(?APP, use_file_handle_cache, true), | ||||
|                     case ShouldUseFHC of | ||||
|                       true  -> ?LOG_INFO("Web MQTT: file handle cache use is enabled"); | ||||
|  | @ -278,7 +278,12 @@ terminate(_Reason, _Request, | |||
| no_supported_sub_protocol(Protocol, Req) -> | ||||
|     %% The client MUST include “mqtt” in the list of WebSocket Sub Protocols it offers [MQTT-6.0.0-3]. | ||||
|     ?LOG_ERROR("Web MQTT: 'mqtt' not included in client offered subprotocols: ~tp", [Protocol]), | ||||
|     {ok, cowboy_req:reply(400, #{<<"connection">> => <<"close">>}, Req), #state{}}. | ||||
|     %% Set should_use_fhc to false, because at this early stage of init no fhc | ||||
|     %% obtain was called, so terminate/3 should not call fhc release | ||||
|     %% (even if use_file_handle_cache is true) | ||||
|     {ok, | ||||
|      cowboy_req:reply(400, #{<<"connection">> => <<"close">>}, Req), | ||||
|      #state{should_use_fhc = false}}. | ||||
| 
 | ||||
| handle_data(Data, State0 = #state{}) -> | ||||
|     case handle_data1(Data, State0) of | ||||
|  |  | |||
|  | @ -95,7 +95,7 @@ Release notes are kept under [rabbitmq-server/release-notes](https://github.com/ | |||
|    and [streams](https://rabbitmq.com/streams.html#leader-election) already do. At the same time, this means | ||||
|    that RabbitMQ clusters now **must have a majority of nodes online at all times**, or all client operations will be refused. | ||||
| 
 | ||||
|    In RabbitMQ 4.0 Khepri will be the only option, and **Mnesia will be completely removed**. | ||||
|    The target is to make Khepri the default database engine in RabbitMQ 4.0. | ||||
| 
 | ||||
|  * Messages are now internally stored using a new common heavily AMQP 1.0-influenced container format. This is a major step towards a protocol-agnostic core: | ||||
|    a common format that encapsulates a sum of data types used by the protocols RabbitMQ supports, plus annotations for routng, dead-lettering state, | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue