Introduce feature flag message_containers_store_amqp_v1

Prior to this commit test case
```
bazel test //deps/rabbit:amqp_client_SUITE-mixed -t- \
    --test_sharding_strategy=disabled --test_env \
    FOCUS="-group [cluster_size_3] -case quorum_queue_on_old_node"
```
was failing because `mc_amqp:size(#v1{})` was called on the old node
which doesn't understand the new AMQP on disk message format.

Even though the old 3.13 node never stored mc_amqp messages in classic
or quorum queues,
1. it will either need to understand the new mc_amqp message format, or
2. we should prevent the new format being sent to 3.13. nodes.

In this commit we decide for the 2nd solution.
In `mc:prepare(store, Msg)`, we convert the new mc_amqp format to
mc_amqpl which is guaranteed to be understood by the old node.
Note that `mc:prepare(store, Msg)` is not only stored before actual
storage, but already before the message is sent to the queue process
(which might be hosted by the old node).
The 2nd solution is easier to reason about over the 1st solution
because:
a) We don't have to backport code meant to be for 4.0 to 3.13, and
b) 3.13 is guaranteed to never store mc_amqp messages in classic or
   quorum queues, even in mixed version clusters.

The disadvantage of the 2nd solution is that messages are converted from
mc_amqp to mc_amqpl and back to mc_amqp if there is an AMQP sender and
AMQP receiver. However, this only happens while the new feature flag
is disabled during the rolling upgrade. In a certain sense, this is a
hybrid to how the AMQP 1.0 plugin worked in 3.13: Even though we don't
proxy via AMQP 0.9.1 anymore, we still convert to AMQP 0.9.1 (mc_amqpl)
messages when feature flag message_containers_store_amqp_v1 is disabled.
This commit is contained in:
David Ansari 2024-04-21 12:33:04 +02:00
parent fc7f458f7c
commit 0b51b8d39b
2 changed files with 15 additions and 0 deletions

View File

@ -440,6 +440,14 @@ last_death(BasicMsg) ->
mc_compat:last_death(BasicMsg).
-spec prepare(read | store, state()) -> state().
prepare(store, #?MODULE{protocol = mc_amqp} = State) ->
case rabbit_feature_flags:is_enabled(message_containers_store_amqp_v1) of
true ->
State#?MODULE{data = mc_amqp:prepare(store, State#?MODULE.data)};
false ->
State1 = convert(mc_amqpl, State),
State1#?MODULE{data = mc_amqpl:prepare(store, State1#?MODULE.data)}
end;
prepare(For, #?MODULE{protocol = Proto,
data = Data} = State) ->
State#?MODULE{data = Proto:prepare(For, Data)};

View File

@ -170,3 +170,10 @@
#{desc => "Credit API v2 between queue clients and queue processes",
stability => stable
}}).
-rabbit_feature_flag(
{message_containers_store_amqp_v1,
#{desc => "Support storing messages in message containers AMQP 1.0 disk format v1",
stability => stable,
depends_on => [message_containers]
}}).