`sets` v2 were not yet available when this module was written. Compared
to `gb_sets`, v2 `sets` are faster and more memory efficient:
```
> List = lists:seq(1, 50_000).
> tprof:profile(sets, from_list, [List, [{version, 2}]], #{type => call_memory}).

****** Process <0.94.0>  --  100.00% of total ***
FUNCTION           CALLS   WORDS  PER CALL  [     %]
maps:from_keys/2       1  184335 184335.00  [100.00]
                          184335            [ 100.0]
ok

> tprof:profile(gb_sets, from_list, [List], #{type => call_memory}).

****** Process <0.97.0>  --  100.00% of total ***
FUNCTION                  CALLS   WORDS  PER CALL  [    %]
lists:rumergel/3              1       2      2.00  [ 0.00]
gb_sets:from_ordset/1         1       3      3.00  [ 0.00]
lists:reverse/2               1  100000 100000.00  [16.76]
lists:usplit_1/5          49999  100002      2.00  [16.76]
gb_sets:balance_list_1/2  65535  396605      6.05  [66.48]
                                 596612            [100.0]
```
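For callers still on `gb_sets`, the switch is mostly mechanical; a minimal
sketch (values are illustrative):
```
%% OTP 24+: version-2 sets are backed by a map.
Set = sets:from_list(lists:seq(1, 50_000), [{version, 2}]),
true = sets:is_element(1, Set),
%% The equivalent gb_sets calls being replaced:
GbSet = gb_sets:from_list(lists:seq(1, 50_000)),
true = gb_sets:is_element(1, GbSet).
```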
The `rabbit_mgmt_gc` gen_server performs garbage collections
periodically. When doing so it can create potentially fairly large
terms, for example by building a set out of
`rabbit_exchange:list_names/0`. With many exchanges the process memory
usage can climb steadily, especially when the management agent is
mostly idle, since `rabbit_mgmt_gc` won't hit enough reductions to
trigger a full-sweep GC on itself. Since the process is only active
periodically (once every 2min by default) we can hibernate it so the
terms it created are GC'd.
This can save a medium amount of memory in situations where there are
very many pieces of metadata (exchanges, vhosts, queues, etc.). For
example on an idle single-node broker with 50k exchanges,
`rabbit_mgmt_gc` can hover around 50MB before being naturally GC'd. With
this patch the process memory usage stays consistent between `start_gc`
timer messages at around 1KB.
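A minimal sketch of the hibernation pattern for a `start_gc` timer
handler; the helper name and interval macro are illustrative, not the
actual `rabbit_mgmt_gc` code:
```
handle_info(start_gc, State) ->
    %% Builds potentially large temporary terms (e.g. sets of exchange names).
    run_gc_checks(),
    erlang:send_after(?GC_INTERVAL, self(), start_gc),
    %% Hibernating garbage-collects those temporaries right away instead of
    %% waiting for the mostly idle process to accumulate enough reductions.
    {noreply, State, hibernate}.
```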
The `below_node_connection_limit_test` and `ready_to_serve_clients_test`
cases could possibly flake because `is_quorum_critical_single_node_test`
uses the channel manager in `rabbit_ct_client_helpers` to open a
connection. This can cause the following line to fail to match:
```
true = lists:all(fun(E) -> is_pid(E) end, Connections),
```
The last connection could have been rejected if the channel manager
kept its connection open, so instead of being a pid the element would
have been `{error, not_allowed}`.
With `rabbit_ct_client_helpers:close_channels_and_connection/2` we can
reset the connection manager and force it to close its connection.
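A hedged sketch of that reset in the test, assuming the helper takes the
common-test config and a node index:
```
%% Make the channel manager drop its cached connection so it does not count
%% against the node's connection limit while the test opens connections up
%% to the limit and asserts they are all pids.
rabbit_ct_client_helpers:close_channels_and_connection(Config, 0),
```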
Returning the connection limit and active count is not really necessary
for these checks. Instead of returning them in the response to the
health check, we log a warning when the connection limit is exceeded.
This is useful for a load balancer, for example, to be able to avoid
sending new connections to a node which is running and has listeners
bound to TCP ports but is being drained for maintenance.
This updates the health check for protocol listeners to accept a set of
protocols, comma-separated. The check only returns 200 OK when all
requested protocols have active listeners.
This is a minor change that avoids a cluster-wide query for active
listeners. The old code called `rabbit_networking:active_listeners/0`,
which RPCs every other cluster member and concatenates their listeners,
only to filter the result down to the local node on the next line.
Equivalently we can use `rabbit_networking:node_listeners(node())`,
which reads a local ETS table.
This is not a very impactful change but it's nice to keep the latency of
the health-check handlers low and reduce some unnecessary cluster noise.
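Roughly the before/after, using only the two functions named above:
```
%% Before: RPCs every cluster member, concatenates their listeners, and the
%% caller then filters the result down to node().
AllListeners = rabbit_networking:active_listeners(),
%% After: a purely local lookup backed by an ETS table on this node.
LocalListeners = rabbit_networking:node_listeners(node()).
```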
At CQ startup variable_queue went through each seqid from 0 to
next_seq_id looking for the first message even if there were no
messages in the queue (no segment files).
In case of a clean shutdown the value next_seq_id is stored in the
recovery terms. This value can be utilized by the queue index to
provide better seqid bounds in the absence of segment files.
Before this patch starting an empty classic queue with next_seq_id =
100_000_000 used to take about 26 seconds. With this patch it takes
less than 1ms.
[Why]
They were moved from `rabbit` to `rabbit_common` several years ago to
solve a dependency issue because `amqp_client` depended on the file
handle cache. This is not the case anymore.
[How]
The modules are moved back to `rabbit`.
`rabbit_common` doesn't need to depend on `os_mon` anymore. `rabbit`
already depends on it, so no changes needed here.
`include/rabbit_memory.hrl` and some test cases are moved as well to
follow the `vm_memory_monitor` module.
Consumers with the same name consuming from the same stream should have
the same partition index. This commit adds a check to enforce this rule
and makes the subscription fail if it does not comply.
Fixes #13835
Vhosts that currently don't have their own default queue type now
inherit it from the node configuration and store it in their metadata
going forward.
The correct place for the `default_queue_type` property
is inside the `metadata` block. However, right now we'd
always export the value outside of `metadata` AND only
export it inside `metadata`, if it was not `undefined`.
This value outside of `metadata` was just misleading:
if a user exported the definitions from a fresh node,
changed `classic` to `quorum` and imported such modified
values, the DQT would still be `classic`, because RMQ looks
for the value inside `metadata`. Just to make it more confusing,
if the DQT was changed successfully one way or another, the
value outside of `metadata` would reflect that
(it always shows the correct value, but is ignored on import).
rabbit_channel may use amqp_channel as its writer.
When terminating, rabbit_channel sends a `flush` message
to its writer. If amqp_channel was in use, this led to
a `function_clause` crash.
[Why]
The `rabbit_khepri` module grew during the work to add Khepri support to
RabbitMQ and while Khepri was itself written. The current code is
therefore unorganized.
[How]
This commit tries to change the proxy functions to be close to their
Khepri equivalents.
The module continues to set non-default options for write functions. We
also add the variants that take an option map to be consistent and not
have to deal with that in the future.
Several legacy functions were removed, either because they were no
longer called or because they were replaced by a regular Khepri call.
[Why]
The `rabbit_khepri` module grew during the work to add Khepri support to
RabbitMQ and while Khepri was itself written. The current code is
therefore unorganized.
[How]
This commit tries to sort the code that manages the setup of Khepri and
the functions that deal with the Khepri cluster. It also groups
functions which provide support for CLI commands.
It also adds documentation to several functions.
Finally, when a node joins a cluster, we stop displaying the content of
the Khepri tree.
It's a tradeoff between building the map for each incoming and outgoing
message (now that there are also outgoing interceptors) vs increased
memory usage for the MQTT proc state.
Connecting with MQTT 5.0 and client ID "xxxxxxxx", the number of words
is 201 before this commit vs 235 after this commit, as determined by:
```
S = sys:get_state(MQTTConnectionPid),
erts_debug:size(S).
```
Therefore, this commit requires 34 words * 8 bytes = 272 bytes more per MQTT
connection, that is 272 MB more for 1,000,000 MQTT connections.
1. Force the timestamp and routing node message interceptors to be
   configured with the `overwrite` boolean() to avoid defining multiple
   default values throughout the code.
2. Add type specs.
3. Extend the existing test case for the new MQTT client ID interceptor.
4. Routing node and timestamp should only set the annotation for the
   incoming_message_interceptors group.
5. Fix `rabbitmq.conf`.
Prior to this commit there were several issues:
a.) Setting the right configuration was too user-unfriendly, e.g. the user had to set
```
message_interceptor.incoming.rabbit_mqtt_message_interceptor_client_id.annotation_key = x-opt-mqtt-client-id
```
just to enable the MQTT message interceptor.
b.) The code that parses the configuration was too difficult to understand.
c.) The MQTT plugin was setting the env for app rabbit, which is an anti-pattern.
d.) Disabling a plugin (e.g. MQTT) left its message interceptors in place.
This is now all fixed; the user sets rabbitmq.conf as follows:
```
message_interceptors.incoming.set_header_timestamp.overwrite = true
message_interceptors.incoming.set_header_routing_node.overwrite = false
mqtt.message_interceptors.incoming.set_client_id_annotation.enabled = true
```
Note that the first two lines use the same format as RabbitMQ 4.0
for backwards compatibility. The last line (MQTT) follows a similar
pattern.
This commit enables users to provide custom message interceptor modules,
i.e. modules to process incoming and outgoing messages. The
`rabbit_message_interceptor` behaviour defines an `intercept/4` callback
for those modules to implement.
Co-authored-by: Péter Gömöri <gomoripeti@users.noreply.github.com>
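A minimal sketch of such a module; the `intercept/4` name and arity come
from the commit above, but the argument names and the pass-through body
are assumptions:
```
-module(my_noop_interceptor).
-behaviour(rabbit_message_interceptor).

-export([intercept/4]).

%% Arguments assumed to be: the message, a per-connection/protocol context,
%% the interceptor group (incoming or outgoing) and the interceptor config.
intercept(Msg, _Ctx, _Group, _Config) ->
    %% Return the (possibly annotated) message; this sketch passes it through.
    Msg.
```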
[Why]
The intent is to have it stable and enabled by default for new
deployments in RabbitMQ 4.1.x.
To prepare for this goal, it is time to mark the feature flag as stable
to let us iron out the library and its integration into RabbitMQ.
This is not a commitment at this stage: we will revisit this near the
beginning of the release cycle and commit to it or revert to
experimental.
Key changes:
- endpoint variable to handle scraping multiple endpoints
- message size panels (new metric in 4.1)
- panels at the top of the Overview dashboard should be more up to date
(they show the latest value)
- values should be accurate if multiple endpoints are scraped
(previously, many would be doubled)
- Nodes table shows fewer columns and shows node uptime
When debug logging is enabled, we log something at each log level
to test whether logs are emitted. I don't think this is particularly useful,
but it's certainly annoying, because I constantly need to filter
out these logs when checking whether any errors happened during tests.
This auth backend behaves the same as the internal backend provided in
the core broker, but it only accepts loopback connections. External
connection attempts will receive an error.
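If enabled through the usual `auth_backends` setting, the advanced.config
entry would look roughly like this (the backend module name here is an
assumption, not confirmed by the commit):
```
[
 {rabbit, [
   %% Assumed module name for the loopback-only internal backend.
   {auth_backends, [rabbit_auth_backend_internal_loopback]}
 ]}
].
```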
This ensures that quorum_queues shuts down _before_
coordination, which Khepri runs inside.
Quorum queues depend on Khepri, so they need to be shut down first.
[Why]
The feature flag enable function is called during the initial migration
or when a node is later added to a cluster.
In this latter situation, the cluster is already formed and the Mnesia
tables were already migrated. Syncing the cluster in this specific
situation might kick out another node that is currently unreachable.
[How]
If the node running the enable function is already clustered, we skip
the cluster sync.
[Why]
All callers of `khepri_adv` and `khepri_tx_adv` need updates to handle
the now uniform return type of `khepri:node_props_map()` in Khepri
0.17.0.
[How]
We don't need any compatibility code to handle "either the old return
type or the new return type" from the khepri_adv API because the
translation is done entirely in the "client side" code in Khepri -
meaning that the return value from the Ra server is the same but it is
translated differently by the functions in `khepri_adv`.
However, we need to adapt transaction functions because they may be
executed on different versions of Khepri and the behaviour of
`khepri_tx_adv` can be different. To account for the possible change of
return value format, we use the new `khepri_tx:does_api_comply_with/1`
to know what to expect.
[Why]
In Khepri 0.17.0, `khepri_cluster:locally_known_members/1` and
`khepri_cluster:locally_known_node/1` were replaced with
`khepri_cluster:members/2` and `khepri_cluster:nodes/2` with `favor` set
to `low_latency` - this matches the interface for queries in Khepri.
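The call-site change is then roughly as follows (`StoreId` stands in for
the store identifier; the return shape is assumed unchanged):
```
%% Before (Khepri < 0.17.0): dedicated "locally known" variants.
MembersBefore = khepri_cluster:locally_known_members(StoreId),
%% After (Khepri 0.17.0): the same locally-known view via query options.
MembersAfter = khepri_cluster:members(StoreId, #{favor => low_latency}).
```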
Move leader repair earlier in the tick function to ensure a more
timely update of the metadata store record after a leader change.
Also use the RPC_TIMEOUT macro for metric/stats multicalls to improve
liveness when a node is connected but partitioned / frozen.
This should address crashes like this one (found in a user's logs):
```
exception error: no case clause matching
[[{connection_details,[]},
{name,<<"10.0.13.41:50497 -> 10.2.230.128:5671 (1)">>},
{node,rabbit@foobar},
{number,1},
{user,<<"...">>},
{user_who_performed_action,<<"...">>},
{vhost,<<"/">>}],
[{connection_details,[]},
{name,<<"10.0.13.41:50142 -> 10.2.230.128:5671 (1)">>},
{node,rabbit@foobar},
{number,1},
{user,<<"...">>},
{user_who_performed_action,<<"...">>},
{vhost,<<"/">>}]]
in function rabbit_federation_mgmt:format/3 (rabbit_federation_mgmt.erl, line 100)
in call from rabbit_federation_mgmt:'-status/3-lc$^0/1-0-'/4 (rabbit_federation_mgmt.erl, line 89)
in call from rabbit_federation_mgmt:'-status/4-lc$^0/1-0-'/3 (rabbit_federation_mgmt.erl, line 82)
in call from rabbit_federation_mgmt:'-status/4-lc$^0/1-0-'/3 (rabbit_federation_mgmt.erl, line 82)
in call from rabbit_federation_mgmt:status/4 (rabbit_federation_mgmt.erl, line 82)
in call from rabbit_federation_mgmt:to_json/2 (rabbit_federation_mgmt.erl, line 57)
in call from cowboy_rest:call/3 (src/cowboy_rest.erl, line 1590)
in call from cowboy_rest:set_resp_body/2 (src/cowboy_rest.erl, line 1473)
```
## What?
This commit determines the queue topology without checking the queue type.
## Why?
This way, checking leader and replicas works the same across all queue
types without the need to introduce other rabbit_queue_type behaviour as
suggested in other PRs.
## How?
pid is the leader, nodes in queue_type_states are the members/replicas.
This commit results in an unknown stream leader during queue
declaration. However, the correct leader will be returned eventually
when calling GET on the stream.
This allows restricting access to the /api/index.html and
/cli/index.html pages to authenticated users, should the
user really want to. This can be enabled via advanced.config.
A connection which terminated before it was fully established
would lead to a function_clause, since the metadata needed to call
notify_connection_closed is not available. We can just ignore such
connections and not notify about them.
Resolves https://github.com/rabbitmq/rabbitmq-server/discussions/13670
The same connection can contain several consumers belonging to a SAC
group (group key = vhost + stream + consumer name). The whole new group
must be re-evaluated to select a new active consumer after the consumers
of the down connection are removed from it.
The previous behavior would not re-evaluate the new group and could
select a consumer from the down connection, leaving the group with only
inactive consumers, as the selected active consumer would never receive
the activation message from the stream SAC coordinator.
This commit fixes this problem by removing the consumers of the down
connection from the affected groups and then performing the
appropriate operations for the groups to keep on consuming (e.g.
notifying an active consumer that it needs to step down).
References #13372
Ra improvements:
* Don't allow a non-voter to start elections
* Register with ra directory before initialising ra server.
* Trigger tick_timeout immediately after entering leader state.
* Set a configurable segment max size
This commit also includes a change to turn the quorum queue
'become leader' callback into a no-op and instead rely on
the more prompt tick handler to handle the metadata store
update after a leader election.
This more prompt tick update means there should be a much shorter
gap between the queue metrics being deleted from the old leader
node and them becoming available again on the new node, resulting
in smoother message count metrics.
Fix test that relied on waiting on too simplistic a property
before asserting.
Prior to this commit, when a client consumed from an unavailable quorum
queue, the following crash occurred:
```
{badmatch,{error,noproc}}
[{rabbit_quorum_queue,consume,3,[{file,"rabbit_quorum_queue.erl"},{line,993}]}
```
This commit fixes this bug by returning any error when registering a
quorum queue consumer to rabbit_queue_type.
This commit also refactors the errors returned by
rabbit_queue_type:consume/3 to simplify and ensure separation of
concerns.
For example, prior to this commit, the channel did error
formatting specifically for consuming from streams. It's better if
the channel is unaware of what queue type it consumes from and each
queue type implementation formats its own errors.
Bump the timeout for management operations and link attachments from 20s
to 30s. We've seen timeouts in CI.
We bump the poll interval of the `?awaitMatch` macro because CI
sometimes flaked by crashing in
0e803de6dd/deps/rabbitmq_amqp_client/src/rabbitmq_amqp_client.erl (L411)
which indicates that the client lib received a response from a previous
request.
To take more frequent checkpoints for large message workloads
Lower the min_checkpoint_interval substantially to allow quorum queues
better control over when checkpoints are taken.
Track bytes enqueued in the aux state and suggest a checkpoint after
every 64MB enqueued (this value is scaled according to backlog just
like the indexes condition).
This should help with more timely checkpointing when very large
messages are used.
Try evaluating the byte size independently of the time window;
also increase the max size.
Delayed queues can automatically create associated Shovels to transfer Ready messages
to the desired destination. This adds a forwarded messages counter which will be used
in the Management UI for better visibility into Shovel internals.
(cherry picked from commit a8800b6cd75d8dc42a91f88655058f2ffa3b6ea6)
for the check introduced in #13487.
Note that encoding a regular expression pattern
with percent encoding is a pain (e.g. '.*' = '.%2a'),
so these endpoints fall back to a default pattern
value that matches all queues.
This commit fixes a bug in the Erlang AMQP 1.0 client.
Prior to this commit, to repro this bug:
1. Send more than 2^16 messages to a queue.
2. Grant more than a total of 2^16 link credit initially (on a single link
or across multiple links) on a single session without any
auto or manual link credit renewal.
The expectation is that thanks to sufficiently granted initial link-credit,
the client will receive all messages.
However, consumption stops after exactly 2^16-1 messages.
That's because the client lib was never sending a flow frame to the server.
So, after the client received all 2^16-1 messages (the initial
incoming-window set by the client), the server's remote-incoming-window
reached 0, causing the server to stop delivering messages.
The expectation is that the client lib automatically handles session
flow control without any manual involvement of the client app.
This commit implements this fix:
* We keep the server's remote-incoming window always large by default as
explained in https://www.rabbitmq.com/blog/2024/09/02/amqp-flow-control#incoming-window
* Hence, the client lib sets its incoming-window to 100,000 initially.
* The client lib tracks its incoming-window decrementing it by 1 for
every transfer it received. (This wasn't done prior to this commit.)
* Whenever this window shrinks below 50,000, the client sends a flow
frame without any link information widening its incoming-window back to 100,000.
* For test cases (maybe later for apps as well), there is a new function
`amqp10_client_session:flow/3`, which allows for a test case to do manual
session flow control. Its API is designed very similar to
`amqp10_client_session:flow_link/4` in that the test can optionally request
the lib to auto widen the session window whenever it falls below a certain threshold.
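A rough sketch of the receive-path bookkeeping described above; the state
shape and helper functions are illustrative, not the actual
`amqp10_client_session` internals:
```
-define(INITIAL_INCOMING_WINDOW, 100_000).
-define(RENEW_THRESHOLD, 50_000).

%% Invoked for every transfer received on the session.
handle_transfer(Transfer, State = #{incoming_window := Win0}) ->
    case Win0 - 1 of
        Win when Win < ?RENEW_THRESHOLD ->
            %% Widen the session window with a flow frame that carries no
            %% link information, then continue delivering to the link.
            ok = send_session_flow(?INITIAL_INCOMING_WINDOW, State),
            deliver(Transfer, State#{incoming_window := ?INITIAL_INCOMING_WINDOW});
        Win ->
            deliver(Transfer, State#{incoming_window := Win})
    end.
```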
This is a squashed commit that includes the following changes by @efimov90:
* Initial-theme-fix
Added light.css
Added dark.css
Added link for light.css and dark.css with media attribute
Added switcher
* Rework-light-style
* dark theme
* Removed not needed div
* Fix folder name
* Color scheme fix
Removes color-scheme from main.css
Added color-scheme: dark to dark.css
Added color-scheme: light to light.css
* Fixed theme switch bug with sammy.js
Adapts code to work with sammy.js
* Icons update
* Reworked theme switcher
* Fix updating attributes
---------
Authored-by: Sergey Efimov <efimov90@gmail.com>