This commit increases consumption throughput from a stream via AMQP 0.9.1
for 1 consumer by 83k msg/s or 55%,
for 4 consumers by 140k msg/s or 44%.
This commit tries to follow https://www.erlang.org/doc/efficiency_guide/binaryhandling.html
by reusing match contexts instead of creating new sub-binaries.
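As an illustration of that pattern (not the actual amqp10_binary_parser code; module and
function names below are made up), matching the tail of the binary directly in the function
head and passing it straight into the recursive call lets the compiler reuse one match
context instead of building a new sub-binary on every step:
```
%% Minimal sketch of the binary-handling pattern described above; the module
%% and function names are illustrative, not taken from amqp10_binary_parser.
-module(bin_parse_sketch).
-export([parse_u32s/1]).

%% Matching <<N:32, Rest/binary>> in the head and recursing on Rest allows the
%% delayed sub-binary optimisation: one match context is carried through the
%% whole loop instead of a new sub-binary per iteration.
parse_u32s(<<N:32, Rest/binary>>) ->
    [N | parse_u32s(Rest)];
parse_u32s(<<>>) ->
    [].
```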
The CPU and mmap() memory flame graphs show that,
when producing and consuming from a stream via AMQP 0.9.1,
the module amqp10_binary_parser requires:
before this commit: 10.1% of CPU time and 8.0% of mmap() system calls
after this commit: 2.6% of CPU time and 2.5% of mmap() system calls
Performance tests
Start rabbitmq-server without any plugins enabled and with 4 schedulers:
```
make run-broker PLUGINS="" RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS="+JPperf true +S 4"
```
Test 1
Perf test client:
```
-x 1 -y 2 -qa x-queue-type=stream -ad false -f persistent -u s1 --qos 10000 --multi-ack-every 1000 -z 30
```
master branch:
sending rate avg msg/s 143k - 146k
receiving rate avg msg/s 188k - 194k
PR:
sending rate avg msg/s 133k - 138k
receiving rate avg msg/s 266k - 276k
This shows that with AMQP 0.9.1 and a stream, prior to this commit the broker could not
deliver messages to consumers as fast as they were published.
After this commit, it can.
Test 2
First, produce a few million messages:
```
-x 1 -y 0 -qa x-queue-type=stream -ad false -f persistent -u s2
```
Then, consume them:
```
-x 0 -y 1 -qa x-queue-type=stream -ad false -f persistent -u s2 --qos 10000 --multi-ack-every 1000 -ca x-stream-offset=first -z 30
```
receiving rate avg msg/s
master branch:
147k - 156k
PR:
230k - 237k
Improvement: 83k / 55%
Test 3
```
-x 0 -y 4 -qa x-queue-type=stream -ad false -f persistent -u s2 --qos 10000 --multi-ack-every 1000 -ca x-stream-offset=first -z 30
```
receiving rate avg msg/s
master branch:
313k - 319k
PR:
450k - 461k
Improvement: 140k / 44%
List `MsgIds` has fewer elements than list `Settles`.
Therefore, put it on the left side of the `++` operator.
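A minimal sketch of the reasoning (the function name is hypothetical; the argument names
follow the prose above): `A ++ B` copies `A` and reuses `B` as-is, so its cost is
proportional to the length of the left operand.
```
-module(settle_sketch).
-export([combine/2]).

%% A ++ B copies A and links it to B unchanged, so the work is proportional
%% to length(A). MsgIds is the shorter list, so it goes on the left.
combine(MsgIds, Settles) ->
    MsgIds ++ Settles.
```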
The memory flame graph revealed that before this commit
5%-8% of all mmap() system calls happened in function
rabbit_fifo_client:settle/3.
After this commit only 1.6% - 1.8% of all mmap() system calls happen in
this function.
Note that we cannot do the same for discarded messages (`Discards`)
because the order in which messages will be dead lettered needs to be
preserved.
The default format of how the log level gets printed should be the full
name. For example, we want "debug" instead of "dbug".
This was also the default behaviour before commit aca638abbb.
Before this commit, when importing definitions with many bindings
and then enabling the feature flag direct_exchange_routing_v2 in
parallel, a deadlock occurred.
The process that did the migration was hanging in:
```
(rabbit@r1-server-2.r1-nodes.default)42> erlang:process_info(<0.1447.0>, [current_stacktrace]).
[{current_stacktrace,[{gen,do_call,4,
[{file,"gen.erl"},{line,214}]},
{gen_server2,call,3,[{file,"gen_server2.erl"},{line,342}]},
{rabbit_misc,execute_mnesia_transaction,1,
[{file,"rabbit_misc.erl"},{line,561}]},
{rabbit_core_ff,direct_exchange_routing_v2_migration,3,
[{file,"rabbit_core_ff.erl"},{line,213}]},
{rabbit_feature_flags,run_migration_fun,3,
[{file,"rabbit_feature_flags.erl"},{line,1621}]},
{rabbit_feature_flags,do_enable,1,
[{file,"rabbit_feature_flags.erl"},{line,1523}]},
{erpc,execute_call,4,[{file,"erpc.erl"},{line,392}]}]}]
```
while all 8 worker processes under process `definition_import_pool_sup`
were hanging in:
```
(rabbit@r1-server-2.r1-nodes.default)38> erlang:process_info(<0.592.0>, [current_stacktrace]).
[{current_stacktrace,[{global,random_sleep,1,
[{file,"global.erl"},{line,2505}]},
{global,set_lock,4,[{file,"global.erl"},{line,421}]},
{rabbit_feature_flags,is_enabled,2,
[{file,"rabbit_feature_flags.erl"},{line,581}]},
{rabbit_binding,sync_index_route,3,
[{file,"rabbit_binding.erl"},{line,469}]},
{rabbit_binding,add,4,
[{file,"rabbit_binding.erl"},{line,182}]},
{mnesia_tm,apply_fun,3,[{file,"mnesia_tm.erl"},{line,842}]},
{mnesia_tm,execute_transaction,5,
[{file,"mnesia_tm.erl"},{line,818}]},
{rabbit_misc,'-execute_mnesia_transaction/1-fun-0-',1,
[{file,"rabbit_misc.erl"},{line,565}]}]}]
```
So, all worker processes which imported the bindings were blocked
because rabbit_feature_flags:is_enabled/2 blocks while the feature
flag is in state `state_changing`.
At the same time, the process which enables the feature flag could not
proceed because it was waiting for a lock held by the worker processes.
In this commit, we resolve the deadlock by not blocking in
rabbit_feature_flags:is_enabled/2.
This should be safe because we still rely on Mnesia locking to get a
consistent migration:
1. The migration process sets both a read lock on the rabbit_route table and a
write lock on the rabbit_index_route table. Therefore, once it has acquired
both locks, it can populate the new rabbit_index_route table without any
binding-importing processes interfering.
2. Mnesia does a good job of avoiding deadlocks by restarting
transactions (as long as we don't set our own locks outside of Mnesia,
as was done prior to this commit).
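A minimal sketch of point 1 (illustrative only, not the actual migration code): both table
locks are taken inside a single Mnesia transaction, so Mnesia can restart the transaction
on lock conflicts instead of relying on an external lock.
```
-module(migration_sketch).
-export([migrate/0]).

%% Illustrative only: take a read lock on rabbit_route and a write lock on
%% rabbit_index_route inside one transaction, then populate the new table.
%% Mnesia restarts the transaction on lock conflicts, so no global lock is
%% needed around it.
migrate() ->
    mnesia:transaction(
      fun() ->
              mnesia:lock({table, rabbit_route}, read),
              mnesia:lock({table, rabbit_index_route}, write),
              %% ... walk rabbit_route and insert the corresponding entries
              %% into rabbit_index_route ...
              ok
      end).
```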
A queue (Q1) can have an extra_bcc queue (Q2).
Whenever a message is routed to Q1, it must also be routed to Q2.
Commit fc2d37ed1c
puts the logic to determine extra_bcc queues into
rabbit_exchange:route/2.
That is functionally correct because it ensures that messages being dead
lettered to target queues will also be routed to the target queues'
extra_bcc queues.
For every message being routed, that commit uses ets:lookup/2
just to check for an extra_bcc queue.
(Technically, that commit is not a regression: it does not slow
down the common case where a message is routed to a single target queue,
since before that commit rabbit_channel:deliver_to_queues/3
used two ets:lookup/2 calls.)
However, we can do better by avoiding the ets:lookup/2 call in the common
case where no extra_bcc queue is set.
One option is to use ets:lookup_element/3 to only fetch the queue
'options' field.
A better option (implemented in this commit) is to determine whether to
send to an extra_bcc queue in rabbit_channel and in the at-most-once
and at-least-once dead lettering modules, where the queue records
are already looked up.
This commit speeds up sending throughput by a few thousand messages per
second.
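A hypothetical sketch of that check (the option key extra_bcc follows the prose above; the
function and the shape of the options map are assumptions, not the actual code): once the
queue record is already at hand, deciding whether to add an extra BCC target is a map
lookup rather than an extra ets:lookup/2 per routed message.
```
-module(extra_bcc_sketch).
-export([extra_bcc/1]).

%% Opts is assumed to be the options map of an already-looked-up queue record.
extra_bcc(Opts) when is_map(Opts) ->
    case maps:get(extra_bcc, Opts, undefined) of
        undefined -> none;
        BCCName   -> {bcc, BCCName}
    end.
```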
A single active consumer must have a name, which is used as the reference
for storing offsets and as the name of the group the consumer belongs
to in case the stream is a partition of a super stream.
References #3753
A formerly active consumer can have in-flight credit
requests when it becomes inactive. This commit checks
the state of the consumer on credit requests and makes sure
not to dispatch messages if it is inactive.
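A hedged sketch of that idea (the consumer representation and function names are
assumptions, not the actual stream code): a credit request from an inactive consumer only
updates its credit, while an active consumer also gets messages dispatched.
```
-module(credit_sketch).
-export([handle_credit/2]).

%% Only dispatch messages for a credit request if the consumer is active;
%% an inactive (formerly active) consumer just has its credit recorded.
handle_credit(#{active := true} = Consumer, Credit) ->
    {dispatch, maps:update_with(credit, fun(C) -> C + Credit end, Credit, Consumer)};
handle_credit(Consumer, Credit) ->
    {no_dispatch, maps:update_with(credit, fun(C) -> C + Credit end, Credit, Consumer)}.
```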
The UI handles the case where the 2 fields are not present.
This can happen in a mixed-version cluster, where a node
of a previous version returns records without the fields.
The UI uses default values (active = true, activity status = up),
which is valid as the consumers of the node are "standalone"
consumers (not part of a group).
References #3753
For stream consumer group CLI commands. This is useful in case
new fields are needed in future versions. A newer-version node
can ask an older-version node for new fields. If
the older-version node returns a known value for unknown fields
instead of failing, the newer node can set appropriate
default values for these fields in the result of the CLI commands.
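A hypothetical sketch of that convention (function and atom names are assumptions): the
older node answers a sentinel value for fields it does not know rather than failing, and
the newer node can then substitute its own defaults.
```
-module(field_sketch).
-export([field_values/2]).

%% Known is the map of fields this (possibly older) node knows about; any
%% requested field that is missing is reported with a well-known sentinel
%% instead of causing the call to fail.
field_values(Known, RequestedFields) ->
    [{Field, maps:get(Field, Known, unknown_field)} || Field <- RequestedFields].
```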
References #3753
Stream consumers can be active or not with SAC, so these 2 fields
are added to the stream metrics. This is the same as with
regular consumers.
References #3753