diff --git a/deps/rabbit/src/mc_amqp.erl b/deps/rabbit/src/mc_amqp.erl index 526b1d0299..9c795ebe6c 100644 --- a/deps/rabbit/src/mc_amqp.erl +++ b/deps/rabbit/src/mc_amqp.erl @@ -385,13 +385,16 @@ is_empty(#'v1_0.footer'{content = []}) -> is_empty(_) -> false. -message_annotation(_Key, #msg_body_encoded{message_annotations = []}, - Default) -> - Default; -message_annotation(Key, #msg_body_encoded{message_annotations = Content}, - Default) +message_annotation(Key, State, Default) when is_binary(Key) -> - mc_util:amqp_map_get(Key, Content, Default). + MA = case State of + #msg_body_decoded{message_annotations = MA0} -> MA0; + #msg_body_encoded{message_annotations = MA0} -> MA0 + end, + case MA of + [] -> Default; + _ -> mc_util:amqp_map_get(Key, MA, Default) + end. message_annotations_as_simple_map(#msg_body_encoded{message_annotations = []}) -> []; diff --git a/deps/rabbit/src/mc_amqpl.erl b/deps/rabbit/src/mc_amqpl.erl index 72fa326d14..197e40b568 100644 --- a/deps/rabbit/src/mc_amqpl.erl +++ b/deps/rabbit/src/mc_amqpl.erl @@ -443,29 +443,34 @@ protocol_state(#content{properties = #'P_basic'{headers = H00, Priority = case Priority0 of undefined -> case Anns of - %% This branch is hit when a message with priority was originally - %% published with AMQP to a classic or quorum queue because the - %% AMQP header isn't stored on disk. - #{?ANN_PRIORITY := P} -> P; - _ -> undefined + #{?ANN_PRIORITY := P} -> + %% This branch is hit when a message with priority was originally + %% published with AMQP to a classic or quorum queue because the + %% AMQP header isn't stored on disk. + P; + _ -> + undefined end; _ -> Priority0 end, - DeliveryMode = case DeliveryMode0 of - undefined -> - %% This branch is hit when a message was originally published with - %% AMQP to a classic or quorum queue because the AMQP header isn't - %% stored on disk. - case Anns of - #{?ANN_DURABLE := false} -> 1; - _ -> 2 - end; - _ -> - DeliveryMode0 - end, + DelMode = case DeliveryMode0 of + undefined -> + case Anns of + #{?ANN_DURABLE := false} -> + %% Leave it undefined which is equivalent to 1. + undefined; + _ -> + %% This branch is hit when a durable message was originally published + %% with AMQP to a classic or quorum queue because the AMQP header isn't + %% stored on disk. + 2 + end; + _ -> + DeliveryMode0 + end, B = B0#'P_basic'{headers = Headers, - delivery_mode = DeliveryMode, + delivery_mode = DelMode, priority = Priority, expiration = Expiration, timestamp = Timestamp}, diff --git a/deps/rabbit/test/rabbit_message_interceptor_SUITE.erl b/deps/rabbit/test/rabbit_message_interceptor_SUITE.erl index 9ab578bd10..9641501468 100644 --- a/deps/rabbit/test/rabbit_message_interceptor_SUITE.erl +++ b/deps/rabbit/test/rabbit_message_interceptor_SUITE.erl @@ -66,10 +66,10 @@ headers_no_overwrite(Config) -> headers(Overwrite, Config) -> Server = atom_to_binary(rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename)), Payload = QName = atom_to_binary(?FUNCTION_NAME), - NowSecs = os:system_time(second), - NowMs = os:system_time(millisecond), Ch = rabbit_ct_client_helpers:open_channel(Config), #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName}), + NowSecs = os:system_time(second), + NowMs = os:system_time(millisecond), amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload}), AssertHeaders =