Support AMQP tracing

All incoming and outgoing AMQP frames excluding payload will be logged.

In addition, HTTP over AMQP management requests and response payloads
will be logged.

Each trace includes the module name, function name and module line which
outputs the trace.

Example log output:
```
[debug] <0.823.0> rabbit_amqp_reader:parse_frame_body/2 372
[debug] <0.823.0> channel 1 ->
[debug] <0.823.0>  {'v1_0.transfer',[{handle,{uint,0}},
[debug] <0.823.0>                    {delivery_id,{uint,4294967295}},
[debug] <0.823.0>                    {delivery_tag,{binary,<<>>}},
[debug] <0.823.0>                    {message_format,{uint,0}},
[debug] <0.823.0>                    {settled,true},
[debug] <0.823.0>                    {more,false},
[debug] <0.823.0>                    {rcv_settle_mode,undefined},
[debug] <0.823.0>                    {state,undefined},
[debug] <0.823.0>                    {resume,undefined},
[debug] <0.823.0>                    {aborted,undefined},
[debug] <0.823.0>                    {batchable,undefined}]}
[debug] <0.823.0>  followed by 100 bytes of payload
[debug] <0.823.0>
[debug] <0.828.0> rabbit_amqp_management:handle_request/5 69
[debug] <0.828.0> HTTP over AMQP request:
[debug] <0.828.0>  [{'v1_0.properties',
[debug] <0.828.0>       [{message_id,{binary,<<74,195,231,105,234,167,156,6>>}},
[debug] <0.828.0>        {user_id,undefined},
[debug] <0.828.0>        {to,{utf8,<<"/queues/q1">>}},
[debug] <0.828.0>        {subject,{utf8,<<"PUT">>}},
[debug] <0.828.0>        {reply_to,{utf8,<<"$me">>}},
[debug] <0.828.0>        {correlation_id,undefined},
[debug] <0.828.0>        {content_type,undefined},
[debug] <0.828.0>        {content_encoding,undefined},
[debug] <0.828.0>        {absolute_expiry_time,undefined},
[debug] <0.828.0>        {creation_time,undefined},
[debug] <0.828.0>        {group_id,undefined},
[debug] <0.828.0>        {group_sequence,undefined},
[debug] <0.828.0>        {reply_to_group_id,undefined}]},
[debug] <0.828.0>   {'v1_0.amqp_value',
[debug] <0.828.0>       [{content,
[debug] <0.828.0>            {map,
[debug] <0.828.0>                [{{utf8,<<"durable">>},true},
[debug] <0.828.0>                 {{utf8,<<"arguments">>},
[debug] <0.828.0>                  {map,
[debug] <0.828.0>                      [{{utf8,<<"x-queue-type">>},{utf8,<<"quorum">>}}]}}]}}]}]
[debug] <0.828.0> HTTP over AMQP response:
[debug] <0.828.0>  [{'v1_0.properties',
[debug] <0.828.0>       [{message_id,undefined},
[debug] <0.828.0>        {user_id,undefined},
[debug] <0.828.0>        {to,undefined},
[debug] <0.828.0>        {subject,{utf8,<<"201">>}},
[debug] <0.828.0>        {reply_to,undefined},
[debug] <0.828.0>        {correlation_id,{binary,<<74,195,231,105,234,167,156,6>>}},
[debug] <0.828.0>        {content_type,undefined},
[debug] <0.828.0>        {content_encoding,undefined},
[debug] <0.828.0>        {absolute_expiry_time,undefined},
[debug] <0.828.0>        {creation_time,undefined},
[debug] <0.828.0>        {group_id,undefined},
[debug] <0.828.0>        {group_sequence,undefined},
[debug] <0.828.0>        {reply_to_group_id,undefined}]},
[debug] <0.828.0>   {'v1_0.application_properties',
[debug] <0.828.0>       [{content,[{{utf8,<<"http:response">>},{utf8,<<"1.1">>}}]}]},
[debug] <0.828.0>   {'v1_0.amqp_value',
[debug] <0.828.0>       [{content,
[debug] <0.828.0>            {map,
[debug] <0.828.0>                [{{utf8,<<"leader">>},{utf8,<<"rabbit@VQD7JFK37T">>}},
[debug] <0.828.0>                 {{utf8,<<"replicas">>},
[debug] <0.828.0>                  {array,utf8,[{utf8,<<"rabbit@VQD7JFK37T">>}]}},
[debug] <0.828.0>                 {{utf8,<<"message_count">>},{ulong,0}},
[debug] <0.828.0>                 {{utf8,<<"consumer_count">>},{uint,0}},
[debug] <0.828.0>                 {{utf8,<<"name">>},{utf8,<<"q1">>}},
[debug] <0.828.0>                 {{utf8,<<"vhost">>},{utf8,<<"/">>}},
[debug] <0.828.0>                 {{utf8,<<"durable">>},{boolean,true}},
[debug] <0.828.0>                 {{utf8,<<"auto_delete">>},{boolean,false}},
[debug] <0.828.0>                 {{utf8,<<"exclusive">>},{boolean,false}},
[debug] <0.828.0>                 {{utf8,<<"type">>},{utf8,<<"quorum">>}},
[debug] <0.828.0>                 {{utf8,<<"arguments">>},
[debug] <0.828.0>                  {map,
[debug] <0.828.0>                      [{{utf8,<<"x-queue-type">>},{utf8,<<"quorum">>}}]}}]}}]}]
[debug] <0.828.0>
[debug] <0.826.0> rabbit_amqp_writer:assemble_frame/4 215
[debug] <0.826.0> channel 1 <-
[debug] <0.826.0>  {'v1_0.transfer',[{handle,{uint,1}},
[debug] <0.826.0>                    {delivery_id,{uint,0}},
[debug] <0.826.0>                    {delivery_tag,{binary,<<>>}},
[debug] <0.826.0>                    {message_format,{uint,0}},
[debug] <0.826.0>                    {settled,true},
[debug] <0.826.0>                    {more,undefined},
[debug] <0.826.0>                    {rcv_settle_mode,undefined},
[debug] <0.826.0>                    {state,undefined},
[debug] <0.826.0>                    {resume,undefined},
[debug] <0.826.0>                    {aborted,undefined},
[debug] <0.826.0>                    {batchable,undefined}]}
[debug] <0.826.0>  followed by 268 bytes of payload
[debug] <0.826.0>
```
This commit is contained in:
David Ansari 2024-06-05 12:13:58 +02:00
parent 1385f6ff4d
commit 5f659b5eb9
6 changed files with 22 additions and 35 deletions

View File

@ -240,10 +240,6 @@ ifdef CREDIT_FLOW_TRACING
RMQ_ERLC_OPTS += -DCREDIT_FLOW_TRACING=true
endif
ifdef DEBUG_FF
RMQ_ERLC_OPTS += -DDEBUG_QUORUM_QUEUE_FF=true
endif
ifdef TRACE_SUPERVISOR2
RMQ_ERLC_OPTS += -DTRACE_SUPERVISOR2=true
endif

View File

@ -1,25 +1,15 @@
%%-define(debug, true).
-ifdef(debug).
-define(DEBUG0(F), ?SAFE(rabbit_log:debug(F, []))).
-define(DEBUG(F, A), ?SAFE(rabbit_log:debug(F, A))).
%% To enable AMQP trace logging, uncomment below line:
%-define(TRACE_AMQP, true).
-ifdef(TRACE_AMQP).
-warning("AMQP tracing is enabled").
-define(TRACE(Format, Args),
rabbit_log:debug(
"~s:~s/~b ~b~n" ++ Format ++ "~n",
[?MODULE, ?FUNCTION_NAME, ?FUNCTION_ARITY, ?LINE] ++ Args)).
-else.
-define(DEBUG0(F), ok).
-define(DEBUG(F, A), ok).
-define(TRACE(_Format, _Args), ok).
-endif.
-define(pprint(F), rabbit_log:debug("~p~n",
[amqp10_framing:pprint(F)])).
-define(SAFE(F),
((fun() ->
try F
catch __T:__E:__ST ->
rabbit_log:debug("~p:~p thrown debugging~n~p~n",
[__T, __E, __ST])
end
end)())).
%% General consts
%% [2.8.19]

View File

@ -22,8 +22,6 @@
{iolist(), permission_caches()}.
handle_request(Request, Vhost, User, ConnectionPid, PermCaches0) ->
ReqSections = amqp10_framing:decode_bin(Request),
?DEBUG("~s Inbound request:~n ~tp",
[?MODULE, [amqp10_framing:pprint(Section) || Section <- ReqSections]]),
{#'v1_0.properties'{
message_id = MessageId,
@ -65,6 +63,9 @@ handle_request(Request, Vhost, User, ConnectionPid, PermCaches0) ->
]},
RespDataSect = #'v1_0.amqp_value'{content = RespBody},
RespSections = [RespProps, RespAppProps, RespDataSect],
?TRACE("HTTP over AMQP request:~n ~tp~nHTTP over AMQP response:~n ~tp",
[[amqp10_framing:pprint(Sect) || Sect <- ReqSections],
[amqp10_framing:pprint(Sect) || Sect <- RespSections]]),
IoList = [amqp10_framing:encode_bin(Sect) || Sect <- RespSections],
{IoList, PermCaches}.

View File

@ -369,8 +369,12 @@ parse_frame_body(Body, _Channel) ->
Performative = amqp10_framing:decode(DescribedPerformative),
if BytesParsed < BytesBody ->
Payload = binary_part(Body, BytesParsed, BytesBody - BytesParsed),
?TRACE("channel ~b ->~n ~tp~n followed by ~tb bytes of payload",
[_Channel, amqp10_framing:pprint(Performative), iolist_size(Payload)]),
{Performative, Payload};
BytesParsed =:= BytesBody ->
?TRACE("channel ~b ->~n ~tp",
[_Channel, amqp10_framing:pprint(Performative)]),
Performative
end.

View File

@ -1773,9 +1773,6 @@ incoming_mgmt_link_transfer(
delivery_tag = {binary, <<>>},
message_format = ?UINT(?MESSAGE_FORMAT),
settled = true},
?DEBUG("~s Outbound payload:~n ~tp~n",
[?MODULE, [amqp10_framing:pprint(Section) ||
Section <- amqp10_framing:decode_bin(iolist_to_binary(Response))]]),
validate_message_size(Response, OutgoingMaxMessageSize),
Frames = transfer_frames(Transfer, Response, MaxFrameSize),
PendingDelivery = #pending_management_delivery{frames = Frames},

View File

@ -173,21 +173,20 @@ assemble_frame(Channel, Performative) ->
assemble_frame(Channel, Performative, amqp10_framing).
assemble_frame(Channel, Performative, amqp10_framing) ->
?DEBUG("~s Channel ~tp <-~n~tp~n",
[?MODULE, Channel, amqp10_framing:pprint(Performative)]),
?TRACE("channel ~b <-~n ~tp",
[Channel, amqp10_framing:pprint(Performative)]),
PerfBin = amqp10_framing:encode_bin(Performative),
amqp10_binary_generator:build_frame(Channel, PerfBin);
assemble_frame(Channel, Performative, rabbit_amqp_sasl) ->
?DEBUG("~s Channel ~tp <-~n~tp~n",
[?MODULE, Channel, amqp10_framing:pprint(Performative)]),
?TRACE("channel ~b <-~n ~tp",
[Channel, amqp10_framing:pprint(Performative)]),
PerfBin = amqp10_framing:encode_bin(Performative),
amqp10_binary_generator:build_frame(Channel, ?AMQP_SASL_FRAME_TYPE, PerfBin).
%%TODO respect MaxFrame
assemble_frame(Channel, Performative, Payload, _MaxFrame) ->
?DEBUG("~s Channel ~tp <-~n~tp~n followed by ~tp bytes of payload~n",
[?MODULE, Channel, amqp10_framing:pprint(Performative),
iolist_size(Payload)]),
?TRACE("channel ~b <-~n ~tp~n followed by ~tb bytes of payload",
[Channel, amqp10_framing:pprint(Performative), iolist_size(Payload)]),
PerfIoData = amqp10_framing:encode_bin(Performative),
amqp10_binary_generator:build_frame(Channel, [PerfIoData, Payload]).