Commit Graph

54 Commits

Author SHA1 Message Date
Loïc Hoguin db50739ad8
CQ: Fix flakes in the store file scan test
We don't expect random bytes to be there in the current
version of the message store, as we overwrite empty spaces
with zeroes when moving messages around.

We also don't expect messages to be falsely flagged when
the broker is running, because it checks message
validity against the index. Therefore, make sure message bodies
in the tests don't contain the byte value 255.
2024-11-14 15:04:49 +01:00
Loïc Hoguin 639e905aea
CQ: Fix shared store scanner missing messages
It was still possible, although rare, for message store
files to lose message data when the following conditions were
met:

 * the message data contains the byte value 255
   (255 is used as an OK marker after a message)
 * the message is located after a 0-filled hole in the file
 * the length of the data is at least 4096 bytes and,
   if we misread it (as detailed below), we encounter
   a 255 byte where we expect the OK marker

How the code could previously misread the length can
be explained as follows:

A message is stored in the following format:

  <<Len:64, MsgIdAndMsg:Len/unit:8, 255>>

MsgId is always 16 bytes long, so Len is always at least 16,
even when the message data Msg is empty (which in practice it
never is).

Now if we have a zero filled hole just before this message,
we may end up with this:

  <<0, Len:64, MsgIdAndMsg:Len/unit:8, 255>>

When we are scanning we test bytes to see whether a message
starts there: we look for a Len that gives us the byte
255 right after MsgIdAndMsg.

Len of value 4096 looks like this in binary:

  <<0:48, 16, 0>>

The problem is that if we have leading zeroes, the bytes may look like this:

  <<0, 0:48, 16, 0>>

If we take the first 64 bits we get a potential length of 16.
We then look at the byte after the next 16 bytes: if it is 255,
we think this is a message, skip ahead by that many bytes, and
mistakenly miss the real message.
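
As an illustrative Erlang shell session (not from the code base), the
misread can be reproduced like this:

  %% A 0-filled hole byte followed by a real length of 4096.
  1> Bin = <<0, 4096:64>>.
  <<0,0,0,0,0,0,0,16,0>>
  %% Reading 64 bits starting one byte too early yields 16 instead of 4096.
  2> <<MisreadLen:64, _Rest/binary>> = Bin, MisreadLen.
  16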

Solving this by changing the file format would be simple enough,
but we don't have that luxury. A different solution
was found, which is to combine file scanning with checking that
the message exists in the message store index (populated from
queues at startup, and kept up to date over the lifetime of
the store). Then we know for sure that the message above
doesn't exist, because its MsgId won't be found in the index.
If it is found, the file number and offset will not match,
and the check will fail.
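
A hedged sketch of that check (the function name and index table layout
here are illustrative, not the actual rabbit_msg_store code):

  %% Only keep a scanned entry if the index agrees that this MsgId lives
  %% at exactly this file and offset; otherwise it is hole/leftover data.
  scan_entry_valid(MsgId, File, Offset, IndexTable) ->
      case ets:lookup(IndexTable, MsgId) of
          [{MsgId, File, Offset, _RefCount}] -> true;
          _ -> false
      end.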

There remains a small chance that we get it wrong during dirty
recovery. Only a better file format would improve that.
2024-10-07 13:23:12 +02:00
Loïc Hoguin 30a8de3287
rabbit tests: Delete some temporary files to reduce log sizes 2024-09-30 12:35:41 +02:00
Loïc Hoguin 49bedfc17e
Remove most of the fd related FHC code
Stats were not removed, including management UI stats
relating to FDs.

Web-MQTT and Web-STOMP configuration relating to FHC
were not removed.

The file_handle_cache itself must be kept until we
remove CQv1.
2024-06-24 12:07:51 +02:00
Loïc Hoguin 1ca46f1c63
CQ: Remove rabbit_memory_monitor and RAM durations
CQs have not used RAM durations for some time, following
the introduction of v2.
2024-06-20 15:19:51 +02:00
Loïc Hoguin 41ce4da5ca
CQ: Remove ability to change shared store index module
It will always use the ETS index. This change lets us
do optimisations that would otherwise not be possible,
including 81b2c39834953d9e1bd28938b7a6e472498fdf13.

A small functional change is included in this commit:
we now always use ets:update_counter to update the
ref_count, instead of a mix of update_{counter,fields}.

When upgrading to 4.0, the index will be rebuilt for
all users that were using a custom index module.
2024-06-14 11:52:03 +02:00
Diana Parra Corbacho 3bbda5bdba Remove classic mirror queues 2024-06-04 13:00:31 +02:00
Loïc Hoguin ecf46002e0
Remove availability of CQv1
We reject CQv1 in rabbit.schema as well.

Most of the v1 code is still around as it is needed
for conversion to v2. It will be removed at a later
time when conversion is no longer supported.

We don't shard the CQ property suite anymore:
there's only 1 case remaining.
2024-05-13 13:06:07 +02:00
Loïc Hoguin fcd011f9cf
CQ: Fix shared store crashes
Crashes could happen because compaction would wrongly write
or truncate over valid messages: when looking for messages
in the files it could encounter leftover data that looked
like a message, which caused compaction to miss the real
messages hidden within.

To avoid this we ensure that there can't be leftover data
as a result of compaction. We get this guarantee by blanking
out the holes in the file before we start copying messages
closer to the start of the file. This requires a few
more writes, but we know that the only data in the files at any
point is valid messages.
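
A hedged sketch of that blanking step (function and variable names are
illustrative, not the actual compaction code):

  %% Zero-fill every gap between valid messages before any of them are
  %% moved, so a later scan cannot mistake leftover bytes for a message.
  blank_holes(Fd, Messages, FileSize) ->
      Sorted = lists:keysort(1, Messages),       %% [{Offset, Size}]
      blank_holes1(Fd, Sorted, 0, FileSize).

  blank_holes1(Fd, [{Offset, Size} | Rest], Pos, FileSize) ->
      ok = blank(Fd, Pos, Offset - Pos),
      blank_holes1(Fd, Rest, Offset + Size, FileSize);
  blank_holes1(Fd, [], Pos, FileSize) ->
      blank(Fd, Pos, FileSize - Pos).

  blank(_Fd, _Pos, 0) -> ok;
  blank(Fd, Pos, Len) -> file:pwrite(Fd, Pos, <<0:Len/unit:8>>).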

Note that it's possible that some of the messages in the files
are no longer referenced; that's OK. We filter them out after
scanning the file.

This was also a good time to merge two almost identical scan
functions, and be more explicit about what messages should be
dropped after scanning the file (the messages no longer in the
ets index and the fan-out messages that ended up re-written in
a more recent file).
2024-04-26 13:03:34 +02:00
Ayanda Dube 3a16d4d03e remove deprecated queue_explicit_gc_run_operation_threshold config 2024-03-28 15:32:36 +00:00
Loïc Hoguin 74b9811a5c
Tests for shared message store file scanner 2024-03-11 13:17:16 +01:00
Michael Klishin f414c2d512
More missed license header updates #9969 2024-02-05 11:53:50 -05:00
Arnaud Cogoluègnes e22fcd70fe
Make MC conversion function return ok or error 2023-09-18 18:32:59 +02:00
Karl Nilsson 119f034406
Message Containers (#5077)
This PR implements an approach for a "protocol (data format) agnostic core" where the format of the message isn't converted at the point of reception.

Currently all messages not originating from AMQP 0.9.1 are converted into an AMQP 0.9.1-flavoured basic_message record before being sent to a queue. If the messages are then consumed by the originating protocol they are converted back from AMQP 0.9.1. For some protocols such as MQTT 3.1 this isn't too expensive, as MQTT is mostly a fairly easily mapped subset of AMQP 0.9.1, but for others such as AMQP 1.0 the conversions are awkward and in some cases lossy, even when consuming from the originating protocol.

This PR instead wraps all incoming messages in their originating form into a generic, extensible message container type (mc). The container module exposes an API to get common message details such as size and various properties (ttl, priority etc) directly from the source data type. Each protocol needs to implement the mc behaviour such that when a message originating from one protocol is consumed by another protocol, we convert it to the target protocol at that point.

The message container also contains annotations, dead letter records and other metadata we need to record during the lifetime of a message. The original protocol message is never modified unless it is consumed.

This includes conversion modules to and from amqp, amqpl (AMQP 0.9.1) and mqtt.
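
As a rough illustration, a protocol module plugging into mc might look like
the hedged sketch below. The callback names follow those mentioned in this PR
(convert_from, size, x_header/2, property/2, protocol_state/2); the module
name, arities and return values are illustrative assumptions, not the actual
mc API:

```erlang
-module(mc_myproto). %% hypothetical protocol module, for illustration only
-compile({no_auto_import, [size/1]}).

-export([convert_from/2, convert_to/2, size/1, x_header/2,
         property/2, protocol_state/2]).

%% Wrap the incoming payload in its native form; no conversion happens
%% at the point of reception.
convert_from(mc_myproto, Payload) when is_binary(Payload) ->
    {myproto_msg, Payload}.

%% Convert to a target protocol only when a consumer of that protocol
%% actually receives the message.
convert_to(mc_amqpl, {myproto_msg, Payload}) ->
    {amqpl_msg, Payload}.

%% Answer common questions (size, headers, properties) directly from the
%% source data type.
size({myproto_msg, Payload}) ->
    byte_size(Payload).

x_header(_Name, {myproto_msg, _Payload}) ->
    undefined.

property(_Name, {myproto_msg, _Payload}) ->
    undefined.

%% Produce the wire-level representation for this protocol, taking the
%% container annotations into account.
protocol_state({myproto_msg, Payload}, _Annotations) ->
    Payload.
```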


COMMIT HISTORY:

* Refactor away from using the delivery{} record

In many places including exchange types. This should make it
easier to move towards using a message container type instead of
basic_message.

Add mc module and move direct replies outside of exchange

Lots of changes incl classic queues

Implement stream support incl amqp conversions

simplify mc state record

move mc.erl

mc dlx stuff

recent history exchange

Make tracking work

But doesn't take a protocol agnostic approach as we just convert
everything into AMQP legacy and back. Might be good enough for now.

Tracing as a whole may want a bit of a re-vamp at some point.

tidy

make quorum queue peek work by legacy conversion

dead lettering fixes

dead lettering fixes

CMQ fixes

rabbit_trace type fixes

fixes

fix

Fix classic queue props

test assertion fix

feature flag and backwards compat

Enable message_container feature flag in some SUITEs

Dialyzer fixes

fixes

fix

test fixes

Various

Manually update a gazelle generated file

until a gazelle enhancement can be made
https://github.com/rabbitmq/rules_erlang/issues/185

Add message_containers_SUITE to bazel

and regen bazel files with gazelle from rules_erlang@main

Simplify essential property access

Such as durable, ttl and priority by extracting them into annotations
at message container init time.

Move type

to remove dependency on amqp10 stuff in mc.erl

mostly because I don't know how to make bazel do the right thing

add more stuff

Refine routing header stuff

wip

Cosmetics

Do not use "maybe" as type name as "maybe" is a keyword since OTP 25
which makes Erlang LS complain.

* Dedup death queue names

* Fix function clause crashes

Fix failing tests in the MQTT shared_SUITE:
A classic queue message ID can be undefined as set in
fbe79ff47b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl (L1048)

Fix failing tests in the MQTT shared_SUITE-mixed:
When feature flag message_containers is disabled, the
message is not an #mc{} record, but a #basic_message{} record.

* Fix is_utf8_no_null crash

Prior to this commit, the function crashed if invalid UTF-8 was
provided, e.g.:
```
1> rabbit_misc:is_valid_shortstr(<<"😇"/utf16>>).
** exception error: no function clause matching rabbit_misc:is_utf8_no_null(<<216,61,222,7>>) (rabbit_misc.erl, line 1481)
```

* Implement mqtt mc behaviour

For now via amqp translation.

This is still work in progress, but the following SUITEs pass:
```
make -C deps/rabbitmq_mqtt ct-shared t=[mqtt,v5,cluster_size_1] FULL=1
make -C deps/rabbitmq_mqtt ct-v5 t=[mqtt,cluster_size_1] FULL=1
```

* Shorten mc file names

Module name length matters because for each persistent message the #mc{}
record is persisted to disk.

```
1> iolist_size(term_to_iovec({mc, rabbit_mc_amqp_legacy})).
30
2> iolist_size(term_to_iovec({mc, mc_amqpl})).
17
```

This commit renames the mc modules:
```
ag -l rabbit_mc_amqp_legacy | xargs sed -i 's/rabbit_mc_amqp_legacy/mc_amqpl/g'
ag -l rabbit_mc_amqp | xargs sed -i 's/rabbit_mc_amqp/mc_amqp/g'
ag -l rabbit_mqtt_mc | xargs sed -i 's/rabbit_mqtt_mc/mc_mqtt/g'
```

* mc: make deaths an annotation + fixes

* Fix mc_mqtt protocol_state callback

* Fix test will_delay_node_restart

```
make -C deps/rabbitmq_mqtt ct-v5 t=[mqtt,cluster_size_3]:will_delay_node_restart FULL=1
```

* Bazel run gazelle

* mix format rabbitmqctl.ex

* Ensure ttl annotation is reflected in amqp legacy protocol state

* Fix id access in message store

* Fix rabbit_message_interceptor_SUITE

* dialyzer fixes

* Fix rabbit:rabbit_message_interceptor_SUITE-mixed

set_annotation/3 should not result in duplicate keys

* Fix MQTT shared_SUITE-mixed

Up to 3.12 non-MQTT publishes were always QoS 1 regardless of delivery_mode.
75a953ce28/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl (L2075-L2076)
From now on, non-MQTT publishes are QoS 1 if durable.
This makes more sense.

The MQTT plugin must send a #basic_message{} to an old node that does
not understand message containers.

* Field content of 'v1_0.data' can be binary

Fix
```
bazel test //deps/rabbitmq_mqtt:shared_SUITE-mixed \
    --test_env FOCUS="-group [mqtt,v4,cluster_size_1] -case trace" \
    -t- --test_sharding_strategy=disabled
```

* Remove route/2 and implement route/3 for all exchange types.

This removes the route/2 callback from rabbit_exchange_type and
makes route/3 mandatory instead. This is a breaking change and
will require all implementations of exchange types to update their
code; however, this is necessary anyway for them to correctly handle
the mc type.

stream filtering fixes

* Translate directly from MQTT to AMQP 0.9.1

* handle undecoded properties in mc_compat

amqpl: put clause in right order

recover death details from amqp data

* Replace callback init_amqp with convert_from

* Fix return value of lists:keyfind/3

* Translate directly from AMQP 0.9.1 to MQTT

* Fix MQTT payload size

MQTT payload can be a list when converted from AMQP 0.9.1 for example

First conversions tests

Plus some other conversion related fixes.

bazel

bazel

translate amqp 1.0 null to undefined

mc: property/2 and correlation_id/message_id return type-tagged values.

To ensure we can support a variety of types better.

The type tags are AMQP 1.0 flavoured.

fix death recovery

mc_mqtt: impl new api

Add callbacks to allow protocols to compact data before storage

And make it readable again when things need to be queried repeatedly.

bazel fix

* more decoding

* tracking mixed versions compat

* mc: flip default of `durable` annotation to save some data.

Assuming most messages are durable, and that in-memory messages suffer less
from persistence overhead, it makes sense for a non-existent `durable`
annotation to mean durable=true.
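
A hedged illustration of the flipped default (the annotation container is
assumed to be a map here):

```erlang
%% A missing `durable` annotation now means durable = true, so the most
%% common case stores nothing.
is_durable(Annotations) when is_map(Annotations) ->
    maps:get(durable, Annotations, true).
```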

* mc conversion tests and tidy up

* mc make x_header unstrict again

* amqpl: death record fixes

* bazel

* amqp -> amqpl conversion test

* Fix crash in mc_amqp:size/1

Body can be a single amqp-value section (instead of
being a list) as shown by test
```
make -C deps/rabbitmq_amqp1_0/ ct-system t=java
```
on branch native-amqp.

* Fix crash in lists:flatten/1

Data can be a single amqp-value section (instead of
being a list) as shown by test
```
make -C deps/rabbitmq_amqp1_0 ct-system t=dotnet:roundtrip_to_amqp_091
```
on branch native-amqp.

* Fix crash in rabbit_writer

Running test
```
make -C deps/rabbitmq_amqp1_0 ct-system t=dotnet:roundtrip_to_amqp_091
```
on branch native-amqp resulted in the following crash:
```
crasher:
  initial call: rabbit_writer:enter_mainloop/2
  pid: <0.711.0>
  registered_name: []
  exception error: bad argument
    in function  size/1
       called as size([<<0>>,<<"Sw">>,[<<160,2>>,<<"hi">>]])
       *** argument 1: not tuple or binary
    in call from rabbit_binary_generator:build_content_frames/7 (rabbit_binary_generator.erl, line 89)
    in call from rabbit_binary_generator:build_simple_content_frames/4 (rabbit_binary_generator.erl, line 61)
    in call from rabbit_writer:assemble_frames/5 (rabbit_writer.erl, line 334)
    in call from rabbit_writer:internal_send_command_async/3 (rabbit_writer.erl, line 365)
    in call from rabbit_writer:handle_message/2 (rabbit_writer.erl, line 265)
    in call from rabbit_writer:handle_message/3 (rabbit_writer.erl, line 232)
    in call from rabbit_writer:mainloop1/2 (rabbit_writer.erl, line 223)
```
because #content.payload_fragments_rev is currently supposed to
be a flat list of binaries instead of being an iolist.

This commit fixes this crash inefficiently by calling
iolist_to_binary/1. A better solution would be to allow AMQP legacy's #content.payload_fragments_rev
to be an iolist.

* Add accidentally deleted line back

* mc: optimise mc_amqp internal format

By removing the outer records for message and delivery annotations,
as well as application properties and footers.

* mc: optimise mc_amqp map_add by using upsert

* mc: refactoring and bug fixes

* mc_SUITE routingheader assertions

* mc remove serialize/1 callback as only used by amqp

* mc_amqp: avoid returning a nested list from protocol_state

* test and bug fix

* move infer_type to mc_util

* mc fixes and additional assertions

* Support headers exchange routing for MQTT messages

When a headers exchange is bound to the MQTT topic exchange, routing
will be performed based on both the MQTT topic (by the topic exchange) and
MQTT User Property (by the headers exchange).

This combines the best of both the MQTT 5.0 and AMQP 0.9.1 worlds and
enables powerful routing topologies.

When the User Property contains the same name multiple times, only the
last name (and value) will be considered by the headers exchange.

* Fix crash when sending from stream to amqpl

When publishing a message via the stream protocol and consuming it via
AMQP 0.9.1, the following crash occurred prior to this commit:
```
crasher:
  initial call: rabbit_channel:init/1
  pid: <0.818.0>
  registered_name: []
  exception exit: {{badmatch,undefined},
                   [{rabbit_channel,handle_deliver0,4,
                                    [{file,"rabbit_channel.erl"},
                                     {line,2728}]},
                    {lists,foldl,3,[{file,"lists.erl"},{line,1594}]},
                    {rabbit_channel,handle_cast,2,
                                    [{file,"rabbit_channel.erl"},
                                     {line,728}]},
                    {gen_server2,handle_msg,2,
                                 [{file,"gen_server2.erl"},{line,1056}]},
                    {proc_lib,wake_up,3,
                              [{file,"proc_lib.erl"},{line,251}]}]}
```

This commit first gives `mc:init/3` the chance to set exchange and
routing_keys annotations.
If not set, `rabbit_stream_queue` will set these annotations assuming
the message was originally published via the stream protocol.

* Support consistent hash exchange routing for MQTT 5.0

When a consistent hash exchange is bound to the MQTT topic exchange,
MQTT 5.0 messages can be routed to queues consistently based on the
Correlation-Data in the PUBLISH packet.

* Convert MQTT 5.0 User Property

* to AMQP 0.9.1 headers
* from AMQP 0.9.1 headers
* to AMQP 1.0 application properties and message annotations
* from AMQP 1.0 application properties and message annotations

* Make use of Annotations in mc_mqtt:protocol_state/2

mc_mqtt:protocol_state/2 takes Annotations as a parameter.
It's cleaner to make use of these Annotations when computing the
protocol state instead of relying on the caller (rabbitmq_mqtt_processor)
to compute the protocol state.

* Enforce AMQP 0.9.1 field name length limit

The AMQP 0.9.1 spec prohibits field names longer than 128 characters.
Therefore, when converting AMQP 1.0 message annotations, application
properties or MQTT 5.0 User Property to AMQP 0.9.1 headers, drop any
names longer than 128 characters.
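
A hedged sketch of that conversion guard (the header representation shown
is simplified, not the actual conversion code):

```erlang
%% Keep only names within the AMQP 0.9.1 limit of 128 characters when
%% converting annotations / User Property to AMQP 0.9.1 headers.
to_091_headers(Props) ->
    [{Name, longstr, Value}
     || {Name, Value} <- Props,
        is_binary(Name),
        byte_size(Name) =< 128].
```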

* Fix type specs

Apply feedback from Michael Davis

Co-authored-by: Michael Davis <mcarsondavis@gmail.com>

* Add mc_mqtt unit test suite

Implement mc_mqtt:x_header/2

* Translate indicator that payload is UTF-8 encoded

when converting between MQTT 5.0 and AMQP 1.0

* Translate single amqp-value section from AMQP 1.0 to MQTT

Convert to a text representation, if possible, and indicate to the MQTT
client that the payload is UTF-8 encoded. This way, the MQTT client will
be able to parse the payload.

If conversion to a text representation is not possible, encode the payload
using the AMQP 1.0 type system and indicate the encoding via the Content-Type
message/vnd.rabbitmq.amqp.

This Content-Type is not registered.
Type "message" makes sense since it's a message.
Vendor tree "vnd.rabbitmq.amqp" makes sense since merely subtype "amqp" is not
registered.

* Fix payload conversion

* Translate Response Topic between MQTT and AMQP

Translate MQTT 5.0 Response Topic to AMQP 1.0 reply-to address and vice
versa.

The Response Topic must be a UTF-8 encoded string.

This commit re-uses the already defined RabbitMQ target addresses:
```
"/topic/"     RK        Publish to amq.topic with routing key RK
"/exchange/"  X "/" RK  Publish to exchange X with routing key RK
```

By default, the MQTT topic exchange is configured to be amq.topic, using
the 1st target address.

When an operator modifies the mqtt.exchange, the 2nd target address is
used.

* Apply PR feedback

and fix formatting

Co-authored-by: Michael Davis <mcarsondavis@gmail.com>

* tidy up

* Add MQTT message_containers test

* consistent hash exchange: avoid amqp legacy conversion

When hashing on a header value.

* Avoid converting to amqp legacy when using exchange federation

* Fix test flake

* test and dialyzer fixes

* dialyzer fix

* Add MQTT protocol interoperability tests

Test receiving from and sending to MQTT 5.0 and
* AMQP 0.9.1
* AMQP 1.0
* STOMP
* Streams

* Regenerate portions of deps/rabbit/app.bzl with gazelle

I'm not exactly sure how this happened, but gazelle seems to have been
run with an older version of the rules_erlang gazelle extension at
some point. This caused generation of a structure that is no longer
used. This commit updates the structure to the current pattern.

* mc: refactoring

* mc_amqpl: handle delivery annotations

Just in case they are included.

Also use iolist_to_iovec to create a flat list of binaries when
converting from amqp with an amqp-encoded payload.

---------

Co-authored-by: David Ansari <david.ansari@gmx.de>
Co-authored-by: Michael Davis <mcarsondavis@gmail.com>
Co-authored-by: Rin Kuryloski <kuryloskip@vmware.com>
2023-08-31 11:27:13 +01:00
Loïc Hoguin 985f4e8a68
CQ shared store write optimisations (#8507)
* CQ: Don't use FHC for writes in shared store

* CQ: Send confirms when flushing to disk in shared store

Before they would only be sent periodically or when
rolling over to a new file.

* CQ: Fast-confirm when flushing data to disk

We know the messages are on disk or were acked, so there is no
need to do set intersections/subtractions in this scenario.

* Fix a Dialyzer warning

* Faster confirms for unwritten messages

Instead of having the message store send a message to the queue
with the confirms for messages ignored due to the flying
optimisation, we have the queue handle the confirms directly
when removing the messages.

This avoids sending potentially 1 Erlang message per 1 AMQP
message to the queue.

* Refactor rabbit_msg_file:pread into rabbit_msg_store

Also make use of the opened file for multi-reads instead
of opening/reading/closing each time.

* CQ: Make sure we keep the updated CState when using read_many

* CQ shared store: Run compaction on older file candidates

The way I initially did this the maybe_gc would be triggered
based on candidates from 15s ago, but run against candidates
from just now. This is sub-optimal because when messages are
consumed rapidly, just-now candidates are likely to be in a
file about to be deleted, and we don't want to run compaction
on those.

Instead, when sending the maybe_gc we also send the candidates
we had at the time. Then 15s later we check if the file still
exists. If it's gone, great! No compaction to do.

* CQ: Add a few todos for later
2023-06-20 20:04:17 +02:00
Loïc Hoguin ef7c68a9cc
CQ shared store: rework the flying optimisation
Instead of doing a complicated +1/-1 we do an update_counter
on an integer value using 2^n values. We always know exactly
which state we are in when looking at the ets table. We can
also avoid some ets operations as a result, although the
performance improvements are minimal.
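
A hedged sketch of the idea (flag values and table layout are illustrative,
not the actual flying-optimisation code):

  %% Each event gets its own power of two, so a single counter value tells
  %% us exactly which combination of events has been seen.
  -define(FLYING_WRITE,  1).  %% 2^0: queue asked the store to write
  -define(FLYING_REMOVE, 2).  %% 2^1: queue asked the store to remove

  note_write(Tab, MsgId) ->
      ets:update_counter(Tab, MsgId, ?FLYING_WRITE, {MsgId, 0}).

  note_remove(Tab, MsgId) ->
      case ets:update_counter(Tab, MsgId, ?FLYING_REMOVE, {MsgId, 0}) of
          3 -> write_then_remove;  %% both seen: the write can be skipped
          _ -> remove_only
      end.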
2023-05-30 11:19:46 +02:00
Loïc Hoguin 4e4e6e401a
CQ: Remove mechanism for closing FHC FDs in queues
We no longer use FHC there and don't keep FDs open
after reading.
2023-05-30 11:19:45 +02:00
Michal Kuratczyk f8a3643d5d
Remove "lazy" from Management and lazy-specific tests 2023-05-18 13:59:50 +02:00
Rin Kuryloski c61d16c971 Include the queue type in the queue_deleted rabbit_event
This is useful for understanding if a deleted queue was matching any
policies given the more selective policies introduced in #7601.

Does not apply to bulk deletion of transient queues on node down.
2023-03-17 11:50:14 +01:00
Loïc Hoguin e330f683b6
CQ: Fix performance regression after moving to v2 sets
sets:from_list must also be told to use v2, otherwise
it will use v1.
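
For reference, both sets constructors need the option to produce the v2
(map-based) representation:

  %% Without {version, 2} these default to the old v1 representation.
  Empty    = sets:new([{version, 2}]),
  FromList = sets:from_list([a, b, c], [{version, 2}]).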
2023-01-31 15:37:47 +01:00
David Ansari 8a2a82e19b Remove feature flag no_queue_name_in_classic_queue_client
as it was unnecessary to introduce it in the first place.

Remove the queue name from all queue type clients and pass the queue
name to the queue type callbacks that need it.

We have to leave feature flag classic_queue_type_delivery_support
required because we removed the monitor registry
1fd4a6d353/deps/rabbit/src/rabbit_queue_type.erl (L322-L325)

Implements review from Karl:
"rather than changing the message format we could amend the queue type
callbacks involved with the stateful operation to also take the queue
name record as an argument. This way we don't need to maintain the extra
queue name (which uses memory for known but obscurely technical reasons
with how maps work) in the queue type state (as it is used in the queue
type state map as the key)"
2023-01-24 17:32:59 +00:00
David Ansari af68fb4484 Decrease memory usage of queue_type state
Prior to this commit, 1 MQTT publisher publishing to 1 million target
classic queues required around 680 MB of process memory.

After this commit, it requires around 290 MB of process memory.

This commit requires feature flag classic_queue_type_delivery_support
and introduces a new one called no_queue_name_in_classic_queue_client.

Instead of storing the binary queue name 4 times, this commit now stores
it only once.

The monitor_registry is removed since only classic queue clients monitor
their classic queue server processes.

The classic queue client does not store the queue name anymore. Instead
the queue name is included in messages handled by the classic queue
client.

Storing the queue name in the record ctx was unnecessary.

More potential future memory optimisations:
* When routing to destination queues, looking up the queue record,
  delivering to queue: Use streaming / batching instead of fetching all
  at once
* Only fetch ETS columns that are necessary instead of whole queue
  records
* Do not hold the same vhost binary in memory many times. Instead,
  maintain a mapping.
* Remove unnecessary tuple fields.
2023-01-24 17:29:07 +00:00
Michael Klishin ec4f1dba7d
(c) year bump: 2022 => 2023 2023-01-01 23:17:36 -05:00
Luke Bakken 7fe159edef
Yolo-replace format strings
Replaces `~s` and `~p` with their unicode-friendly counterparts.

```
git ls-files *.erl | xargs sed -i.ORIG -e s/~s>/~ts/g -e s/~p>/~tp/g
```
2022-10-10 10:32:03 +04:00
Loïc Hoguin f59020191b
CQ: Enable broken checks in backing_queue_SUITE again 2022-09-27 12:00:09 +02:00
Loïc Hoguin 723cc54705
CQ: Some cleanup 2022-09-27 12:00:09 +02:00
Loïc Hoguin 3683ab9a6e
CQ: Use v2 sets instead of gb_sets for confirms
For the following flags I see an improvement of
30k/s to 34k/s on my machine:

-x 1 -y 1 -A 1000 -q 1000 -c 1000 -s 1000 -f persistent
-u cqv2 --queue-args=x-queue-version=2
2022-09-27 12:00:08 +02:00
Loïc Hoguin 38f335e83b
Rework CQ stats code to be obvious and efficient 2022-09-27 12:00:08 +02:00
Loïc Hoguin 341e908bbf
CQ: Merge lazy/default behavior into a unified mode
We also no longer reduce memory usage (except for an explicit GC
that I am pondering removing).
2022-09-27 12:00:03 +02:00
Michael Klishin c38a3d697d
Bump (c) year 2022-03-21 01:21:56 +04:00
Loïc Hoguin 7198d4720b
Make backing_queue_SUITE fast on macOS
This very small patch requires an extended explanation. The patch
swaps two lines in a rabbit_variable_queue setup: one which sets
the memory hint to 0, which results in reduce_memory_usage
always flushing to disk and fsyncing; and another which publishes a
lot of messages to the queue that are manipulated further after
that point to get the queue into the exact right state
for the relevant tests.

The problem with calling reduce_memory_usage after every single
message has been published is not writing to disk (v2 tests do
not suffer from performance issues in that regard) but rather
that rabbit_queue_index will always flush its journal (containing
the one message), which results in opening the segment file,
appending to it, and closing it. The file handling is done
by file_handle_cache which, in this case, will always fsync
the data before closing the file. It is this one fsync
per message that makes the relevant tests very slow.

By swapping the lines, meaning we publish all messages first
and then set the memory hint to 0, we end up with a single
reduce_memory_usage call that results in one fsync at the
end. (There may be other fsyncs as part of normal operations.)
We still get the same result because all messages will have
been flushed to disk, only this time in far fewer operations.

This doesn't seem to have been causing problems on CI which
already runs the tests very fast but should help macOS and
possibly other development environments.
2022-03-18 13:26:57 +01:00
Luke Bakken c352525e0c
Rename `variable_queue_default_version` to `classic_queue_default_version` 2022-01-25 11:23:23 +01:00
Loïc Hoguin 44fd112e6d
Implement resuming v2->v1 conversion during dirty recovery 2022-01-25 11:23:16 +01:00
Loïc Hoguin 92d95bf5c7
Fix unused vars warning in classic_queue_SUITE 2022-01-25 11:23:14 +01:00
Loïc Hoguin 469788a820
Ensure index files with holes get removed
On dirty recovery the count in the segment file was already
accurate. It was not accurate otherwise as it assumed that
all messages would be written to the index, which is not
the case in the current implementation.
2022-01-25 11:23:14 +01:00
Loïc Hoguin 467309418c
Reenable some checks in backing_queue_SUITE 2022-01-25 11:23:13 +01:00
Loïc Hoguin 9f15f86252
CQ version switch via policies + proper test for this 2022-01-25 11:23:10 +01:00
Loïc Hoguin 3fc1eb14de
Fix remaining tests for CQ v1 2022-01-25 11:23:09 +01:00
Loïc Hoguin c4672b6f2c
Test both indexes 2022-01-25 11:23:09 +01:00
Loïc Hoguin 6dfe6a7be8
Test both CQ v1 and v2 2022-01-25 11:23:09 +01:00
Loïc Hoguin ad67f787ab
Reenable embed 0/1024 groups and fix embed 0 recovery 2022-01-25 11:23:06 +01:00
Loïc Hoguin 2473ff7328
Reenable some tests that were commented out 2022-01-25 11:23:05 +01:00
Loïc Hoguin b0b9b46313
Fix remaining tests 2022-01-25 11:23:04 +01:00
Loïc Hoguin c02de4d252
Some cleanup and fix most tests
Still need to improve recovery and do some sort of check in the
store so we know the file isn't corrupted.
2022-01-25 11:23:04 +01:00
Loïc Hoguin fc9846d01d
Fix obvious mistakes in previous commit 2022-01-25 11:23:04 +01:00
Loïc Hoguin 33fada8847
Track delivers per-queue rather than per-message
Because queues deliver messages sequentially, we do not need to
keep track of delivers per message; we just need to keep track
of the highest message that was delivered, via its seq_id().
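
A hedged sketch of the idea (record and field names are illustrative):

  -record(qstate, {highest_delivered = -1 :: integer()}).

  %% Remember only the highest seq_id() delivered so far...
  record_delivered(SeqId, State = #qstate{highest_delivered = H}) ->
      State#qstate{highest_delivered = max(SeqId, H)}.

  %% ...and derive the per-message delivered status from it.
  is_delivered(SeqId, #qstate{highest_delivered = H}) ->
      SeqId =< H.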

This allows us to avoid updating the index and storing data
unnecessarily, and can help simplify the code (not seen in this
WIP commit because the code was left there or commented out
for the time being).

Includes a few small bug fixes.
2022-01-25 11:23:03 +01:00
Loïc Hoguin cf080b9937
Add file_handle_cache FD reservations 2022-01-25 11:23:02 +01:00
Loïc Hoguin 0102191e2b
Rename to rabbit_classic_queue_index_v2 2022-01-25 11:23:00 +01:00
Loïc Hoguin 0f431876f2
No longer tests with different embed settings
Since messages are no longer embedded those settings are ignored.
2022-01-25 11:22:58 +01:00
Loïc Hoguin 98f64f2fa8
Replace classic queue index with a modern implementation 2022-01-25 11:22:56 +01:00