Fix MQTT -> Stream

and preserve original delivery_mode field
i.e. leave it undefined if it was sent as undefined
This commit is contained in:
David Ansari 2024-04-16 13:06:39 +02:00
parent 312d2af806
commit 2ee246c3b4
3 changed files with 34 additions and 26 deletions

View File

@ -385,13 +385,16 @@ is_empty(#'v1_0.footer'{content = []}) ->
is_empty(_) ->
false.
message_annotation(_Key, #msg_body_encoded{message_annotations = []},
Default) ->
Default;
message_annotation(Key, #msg_body_encoded{message_annotations = Content},
Default)
message_annotation(Key, State, Default)
when is_binary(Key) ->
mc_util:amqp_map_get(Key, Content, Default).
MA = case State of
#msg_body_decoded{message_annotations = MA0} -> MA0;
#msg_body_encoded{message_annotations = MA0} -> MA0
end,
case MA of
[] -> Default;
_ -> mc_util:amqp_map_get(Key, MA, Default)
end.
message_annotations_as_simple_map(#msg_body_encoded{message_annotations = []}) ->
[];

View File

@ -443,29 +443,34 @@ protocol_state(#content{properties = #'P_basic'{headers = H00,
Priority = case Priority0 of
undefined ->
case Anns of
%% This branch is hit when a message with priority was originally
%% published with AMQP to a classic or quorum queue because the
%% AMQP header isn't stored on disk.
#{?ANN_PRIORITY := P} -> P;
_ -> undefined
#{?ANN_PRIORITY := P} ->
%% This branch is hit when a message with priority was originally
%% published with AMQP to a classic or quorum queue because the
%% AMQP header isn't stored on disk.
P;
_ ->
undefined
end;
_ ->
Priority0
end,
DeliveryMode = case DeliveryMode0 of
undefined ->
%% This branch is hit when a message was originally published with
%% AMQP to a classic or quorum queue because the AMQP header isn't
%% stored on disk.
case Anns of
#{?ANN_DURABLE := false} -> 1;
_ -> 2
end;
_ ->
DeliveryMode0
end,
DelMode = case DeliveryMode0 of
undefined ->
case Anns of
#{?ANN_DURABLE := false} ->
%% Leave it undefined which is equivalent to 1.
undefined;
_ ->
%% This branch is hit when a durable message was originally published
%% with AMQP to a classic or quorum queue because the AMQP header isn't
%% stored on disk.
2
end;
_ ->
DeliveryMode0
end,
B = B0#'P_basic'{headers = Headers,
delivery_mode = DeliveryMode,
delivery_mode = DelMode,
priority = Priority,
expiration = Expiration,
timestamp = Timestamp},

View File

@ -66,10 +66,10 @@ headers_no_overwrite(Config) ->
headers(Overwrite, Config) ->
Server = atom_to_binary(rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename)),
Payload = QName = atom_to_binary(?FUNCTION_NAME),
NowSecs = os:system_time(second),
NowMs = os:system_time(millisecond),
Ch = rabbit_ct_client_helpers:open_channel(Config),
#'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}),
NowSecs = os:system_time(second),
NowMs = os:system_time(millisecond),
amqp_channel:call(Ch, #'basic.publish'{routing_key = QName},
#amqp_msg{payload = Payload}),
AssertHeaders =