Handle mc_amqp 3.13 `msg` record in 4.x

The `msg` record was used in 3.13. This commit makes 4.x understand
this record for backward compatibility, specifically for the rare case where:
1. a 3.13 node internally parsed a message from a stream via
```
Message = mc:init(mc_amqp, amqp10_framing:decode_bin(Bin), #{})
```
2. published this Message to a queue
3. RabbitMQ got upgraded to 4.x

(This commit can be reverted in some future RabbitMQ version once it's
safe to assume that these upgraded messages have been consumed.)

The changes were manually tested as described in Jira RMQ-1525.
This commit is contained in:
David Ansari 2025-02-27 10:16:34 +01:00
parent cdc042a2fd
commit 91f5ce2544
1 changed files with 76 additions and 6 deletions

View File

@ -50,6 +50,29 @@
Val :: term()}].
-type opt(T) :: T | undefined.
%% This representation was used in v3.13.7. 4.x understands this record for
%% backward compatibility, specifically for the rare case where:
%% 1. a 3.13 node internally parsed a message from a stream via
%% ```
%% Message = mc:init(mc_amqp, amqp10_framing:decode_bin(Bin), #{})
%% ```
%% 2. published this Message to a queue
%% 3. RabbitMQ got upgraded to 4.x
%%
%% This record along with all its conversions in this module can therefore
%% be deleted in some future RabbitMQ version once it's safe to assume that
%% these upgraded messages have all been consumed.
-record(msg,
{
header :: opt(#'v1_0.header'{}),
delivery_annotations = []:: list(),
message_annotations = [] :: list(),
properties :: opt(#'v1_0.properties'{}),
application_properties = [] :: list(),
data = [] :: amqp10_data(),
footer = [] :: list()
}).
%% This representation is used when the message was originally sent with
%% a protocol other than AMQP and the message was not read from a stream.
-record(msg_body_decoded,
@ -97,7 +120,7 @@
body_code :: body_descriptor_code()
}).
-opaque state() :: #msg_body_decoded{} | #msg_body_encoded{} | #v1{}.
-opaque state() :: #msg{} | #msg_body_decoded{} | #msg_body_encoded{} | #v1{}.
-export_type([state/0]).
@ -128,6 +151,8 @@ convert_from(?MODULE, Sections, _Env) when is_list(Sections) ->
convert_from(_SourceProto, _, _Env) ->
not_implemented.
convert_to(?MODULE, Msg = #msg{}, _Env) ->
convert_from_3_13_msg(Msg);
convert_to(?MODULE, Msg, _Env) ->
Msg;
convert_to(TargetProto, Msg, Env) ->
@ -139,7 +164,22 @@ size(#v1{message_annotations = MA,
[] -> 0;
_ -> ?MESSAGE_ANNOTATIONS_GUESS_SIZE
end,
{MetaSize, byte_size(Body)}.
{MetaSize, byte_size(Body)};
%% Copied from v3.13.7.
%% This might be called in rabbit_fifo_v3 and must therefore not be modified
%% to ensure determinism of quorum queues version 3.
size(#msg{data = Body}) ->
BodySize = if is_list(Body) ->
lists:foldl(
fun(#'v1_0.data'{content = Data}, Acc) ->
iolist_size(Data) + Acc;
(#'v1_0.amqp_sequence'{content = _}, Acc) ->
Acc
end, 0, Body);
is_record(Body, 'v1_0.amqp_value') ->
0
end,
{_MetaSize = 0, BodySize}.
x_header(Key, Msg) ->
message_annotation(Key, Msg, undefined).
@ -151,6 +191,10 @@ property(_Prop, #msg_body_encoded{properties = undefined}) ->
undefined;
property(Prop, #msg_body_encoded{properties = Props}) ->
property0(Prop, Props);
property(_Prop, #msg{properties = undefined}) ->
undefined;
property(Prop, #msg{properties = Props}) ->
property0(Prop, Props);
property(_Prop, #v1{bare_and_footer_properties_pos = ?OMITTED_SECTION}) ->
undefined;
property(Prop, #v1{bare_and_footer = Bin,
@ -298,7 +342,9 @@ protocol_state(#v1{message_annotations = MA0,
ttl = Ttl}, Anns),
MA = protocol_state_message_annotations(MA0, Anns),
Sections = to_sections(Header, MA, []),
[encode(Sections), BareAndFooter].
[encode(Sections), BareAndFooter];
protocol_state(#msg{} = Msg, Anns) ->
protocol_state(convert_from_3_13_msg(Msg), Anns).
prepare(read, Msg) ->
Msg;
@ -322,7 +368,9 @@ prepare(store, #msg_body_encoded{
bare_and_footer_application_properties_pos = AppPropsPos,
bare_and_footer_body_pos = BodyPos,
body_code = BodyCode
}.
};
prepare(store, Msg = #msg{}) ->
Msg.
%% internal
@ -379,7 +427,9 @@ msg_to_sections(#v1{message_annotations = MAC,
Sections = amqp10_framing:decode_bin(Bin),
Sections ++ [{amqp_encoded_body_and_footer, BodyAndFooterBin}]
end,
to_sections(undefined, MAC, Tail).
to_sections(undefined, MAC, Tail);
msg_to_sections(#msg{} = Msg) ->
msg_to_sections(convert_from_3_13_msg(Msg)).
to_sections(H, MAC, P, APC, Tail) ->
S0 = case APC of
@ -410,6 +460,20 @@ to_sections(H, MAC, Tail) ->
[H | S]
end.
convert_from_3_13_msg(#msg{header = H,
delivery_annotations = _,
message_annotations = MAC,
properties = P,
application_properties = APC,
data = Data,
footer = FC}) ->
#msg_body_decoded{header = H,
message_annotations = MAC,
properties = P,
application_properties = APC,
data = Data,
footer = FC}.
-spec protocol_state_message_annotations(amqp_annotations(), mc:annotations()) ->
amqp_annotations().
protocol_state_message_annotations(MA, Anns) ->
@ -482,11 +546,14 @@ message_annotation(Key, State, Default)
message_annotations(#msg_body_decoded{message_annotations = L}) -> L;
message_annotations(#msg_body_encoded{message_annotations = L}) -> L;
message_annotations(#v1{message_annotations = L}) -> L.
message_annotations(#v1{message_annotations = L}) -> L;
message_annotations(#msg{message_annotations = L}) -> L.
message_annotations_as_simple_map(#msg_body_encoded{message_annotations = Content}) ->
message_annotations_as_simple_map0(Content);
message_annotations_as_simple_map(#v1{message_annotations = Content}) ->
message_annotations_as_simple_map0(Content);
message_annotations_as_simple_map(#msg{message_annotations = Content}) ->
message_annotations_as_simple_map0(Content).
message_annotations_as_simple_map0(Content) ->
@ -501,6 +568,9 @@ message_annotations_as_simple_map0(Content) ->
application_properties_as_simple_map(
#msg_body_encoded{application_properties = Content}, L) ->
application_properties_as_simple_map0(Content, L);
application_properties_as_simple_map(
#msg{application_properties = Content}, L) ->
application_properties_as_simple_map0(Content, L);
application_properties_as_simple_map(
#v1{bare_and_footer_application_properties_pos = ?OMITTED_SECTION}, L) ->
L;