Intercept outgoing just before conversion

Intercept outgoing message just before conversion to target protocol as
this will give most flexibility to 3rd party plugins.
This commit is contained in:
David Ansari 2025-04-22 10:41:56 +02:00 committed by David Ansari
parent f447e84e93
commit 77e73deede
3 changed files with 8 additions and 8 deletions

View File

@ -2180,9 +2180,9 @@ handle_deliver(ConsumerTag, AckRequired,
delivery_tag = {binary, Dtag},
message_format = ?UINT(?MESSAGE_FORMAT),
settled = SendSettled},
Mc1 = mc:convert(mc_amqp, Mc0),
Mc2 = mc:set_annotation(redelivered, Redelivered, Mc1),
Mc = rabbit_msg_interceptor:intercept_outgoing(Mc2, MsgIcptCtx),
Mc1 = rabbit_msg_interceptor:intercept_outgoing(Mc0, MsgIcptCtx),
Mc2 = mc:convert(mc_amqp, Mc1),
Mc = mc:set_annotation(redelivered, Redelivered, Mc2),
Sections = mc:protocol_state(Mc),
validate_message_size(Sections, MaxMessageSize),
Frames = transfer_frames(Transfer, Sections, MaxFrameSize),

View File

@ -2645,8 +2645,8 @@ handle_basic_get(WriterPid, DeliveryTag, NoAck, MessageCount,
{noreply, record_sent(get, QueueType, DeliveryTag, not(NoAck), Msg0, State)}.
outgoing_content(Mc, MsgIcptCtx) ->
Mc1 = mc:convert(mc_amqpl, Mc),
Mc2 = rabbit_msg_interceptor:intercept_outgoing(Mc1, MsgIcptCtx),
Mc1 = rabbit_msg_interceptor:intercept_outgoing(Mc, MsgIcptCtx),
Mc2 = mc:convert(mc_amqpl, Mc1),
mc:protocol_state(Mc2).
init_tick_timer(State = #ch{tick_timer = undefined}) ->

View File

@ -2073,15 +2073,15 @@ deliver_to_client(Msgs, Ack, State) ->
deliver_one_to_client(Msg, Ack, S)
end, State, Msgs).
deliver_one_to_client({QNameOrType, QPid, QMsgId, _Redelivered, Mc} = Delivery,
deliver_one_to_client({QNameOrType, QPid, QMsgId, _Redelivered, Mc0} = Delivery,
AckRequired,
#state{cfg = #cfg{msg_interceptor_ctx = MsgIcptCtx}} = State0) ->
SubscriberQoS = case AckRequired of
true -> ?QOS_1;
false -> ?QOS_0
end,
McMqtt0 = mc:convert(mc_mqtt, Mc, mc_env()),
McMqtt = rabbit_msg_interceptor:intercept_outgoing(McMqtt0, MsgIcptCtx),
Mc = rabbit_msg_interceptor:intercept_outgoing(Mc0, MsgIcptCtx),
McMqtt = mc:convert(mc_mqtt, Mc, mc_env()),
MqttMsg = #mqtt_msg{qos = PublisherQos} = mc:protocol_state(McMqtt),
QoS = effective_qos(PublisherQos, SubscriberQoS),
{SettleOp, State1} = maybe_publish_to_client(MqttMsg, Delivery, QoS, State0),