Local shovels

This commit is contained in:
Diana Parra Corbacho 2025-06-23 16:16:25 +02:00
parent c8840785d9
commit 444b5644b2
11 changed files with 2497 additions and 73 deletions

View File

@ -7,8 +7,6 @@
-module(rabbit_amqp091_shovel).
-define(APP, rabbitmq_shovel).
-behaviour(rabbit_shovel_behaviour).
-include_lib("amqp_client/include/amqp_client.hrl").
@ -58,7 +56,7 @@ parse(_Name, {source, Source}) ->
CArgs = proplists:get_value(consumer_args, Source, []),
#{module => ?MODULE,
uris => proplists:get_value(uris, Source),
resource_decl => decl_fun({source, Source}),
resource_decl => rabbit_shovel_util:decl_fun(?MODULE, {source, Source}),
queue => Queue,
delete_after => proplists:get_value(delete_after, Source, never),
prefetch_count => Prefetch,
@ -74,7 +72,7 @@ parse(Name, {destination, Dest}) ->
PropsFun2 = add_timestamp_header_fun(ATH, PropsFun1),
#{module => ?MODULE,
uris => proplists:get_value(uris, Dest),
resource_decl => decl_fun({destination, Dest}),
resource_decl => rabbit_shovel_util:decl_fun(?MODULE, {destination, Dest}),
props_fun => PropsFun2,
fields_fun => PubFieldsFun,
add_forward_headers => AFH,
@ -544,47 +542,6 @@ props_fun_timestamp_header({M, F, Args}, SrcUri, DestUri, Props) ->
rabbit_shovel_util:add_timestamp_header(
apply(M, F, Args ++ [SrcUri, DestUri, Props])).
parse_declaration({[], Acc}) ->
Acc;
parse_declaration({[{Method, Props} | Rest], Acc}) when is_list(Props) ->
FieldNames = try rabbit_framing_amqp_0_9_1:method_fieldnames(Method)
catch exit:Reason -> fail(Reason)
end,
case proplists:get_keys(Props) -- FieldNames of
[] -> ok;
UnknownFields -> fail({unknown_fields, Method, UnknownFields})
end,
{Res, _Idx} = lists:foldl(
fun (K, {R, Idx}) ->
NewR = case proplists:get_value(K, Props) of
undefined -> R;
V -> setelement(Idx, R, V)
end,
{NewR, Idx + 1}
end, {rabbit_framing_amqp_0_9_1:method_record(Method), 2},
FieldNames),
parse_declaration({Rest, [Res | Acc]});
parse_declaration({[{Method, Props} | _Rest], _Acc}) ->
fail({expected_method_field_list, Method, Props});
parse_declaration({[Method | Rest], Acc}) ->
parse_declaration({[{Method, []} | Rest], Acc}).
decl_fun({source, Endpoint}) ->
case parse_declaration({proplists:get_value(declarations, Endpoint, []), []}) of
[] ->
case proplists:get_value(predeclared, application:get_env(?APP, topology, []), false) of
true -> case proplists:get_value(queue, Endpoint) of
<<>> -> fail({invalid_parameter_value, declarations, {require_non_empty}});
Queue -> {?MODULE, check_fun, [Queue]}
end;
false -> {?MODULE, decl_fun, []}
end;
Decl -> {?MODULE, decl_fun, [Decl]}
end;
decl_fun({destination, Endpoint}) ->
Decl = parse_declaration({proplists:get_value(declarations, Endpoint, []), []}),
{?MODULE, decl_fun, [Decl]}.
decl_fun(Decl, _Conn, Ch) ->
[begin
amqp_channel:call(Ch, M)

View File

@ -122,7 +122,7 @@ connect(Name, SndSettleMode, Uri, Postfix, Addr, Map, AttachFun) ->
{ok, Sess} = amqp10_client:begin_session(Conn),
link(Conn),
LinkName = begin
LinkName0 = gen_unique_name(Name, Postfix),
LinkName0 = rabbit_shovel_util:gen_unique_name(Name, Postfix),
rabbit_data_coercion:to_binary(LinkName0)
end,
% needs to be sync, i.e. awaits the 'attach' event as
@ -373,14 +373,3 @@ add_timestamp_header(_, Msg) -> Msg.
add_forward_headers(#{dest := #{cached_forward_headers := Anns}}, Msg) ->
amqp10_msg:set_message_annotations(Anns, Msg);
add_forward_headers(_, Msg) -> Msg.
gen_unique_name(Pre0, Post0) ->
Pre = to_binary(Pre0),
Post = to_binary(Post0),
Id = bin_to_hex(crypto:strong_rand_bytes(8)),
<<Pre/binary, <<"_">>/binary, Id/binary, <<"_">>/binary, Post/binary>>.
bin_to_hex(Bin) ->
<<<<if N >= 10 -> N -10 + $a;
true -> N + $0 end>>
|| <<N:4>> <= Bin>>.

View File

@ -0,0 +1,637 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(rabbit_local_shovel).
-behaviour(rabbit_shovel_behaviour).
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("amqp10_common/include/amqp10_types.hrl").
-include_lib("rabbit/include/mc.hrl").
-include("rabbit_shovel.hrl").
-export([
parse/2,
connect_source/1,
connect_dest/1,
init_source/1,
init_dest/1,
source_uri/1,
dest_uri/1,
source_protocol/1,
dest_protocol/1,
source_endpoint/1,
dest_endpoint/1,
close_dest/1,
close_source/1,
handle_source/2,
handle_dest/2,
ack/3,
nack/3,
forward/3,
status/1
]).
-export([
src_decl_exchange/4,
decl_queue/4,
dest_decl_queue/4,
check_queue/4,
dest_check_queue/4,
decl_fun/3,
check_fun/3
]).
-define(QUEUE, lqueue).
-record(pending_ack, {
delivery_tag,
msg_id
}).
parse(_Name, {source, Source}) ->
Prefetch = parse_parameter(prefetch_count, fun parse_non_negative_integer/1,
proplists:get_value(prefetch_count, Source,
?DEFAULT_PREFETCH)),
Queue = parse_parameter(queue, fun parse_binary/1,
proplists:get_value(queue, Source)),
CArgs = proplists:get_value(consumer_args, Source, []),
#{module => ?MODULE,
uris => proplists:get_value(uris, Source),
resource_decl => rabbit_shovel_util:decl_fun(?MODULE, {source, Source}),
queue => Queue,
delete_after => proplists:get_value(delete_after, Source, never),
prefetch_count => Prefetch,
consumer_args => CArgs};
parse(_Name, {destination, Dest}) ->
Exchange = parse_parameter(dest_exchange, fun parse_binary/1,
proplists:get_value(dest_exchange, Dest, none)),
RK = parse_parameter(dest_exchange_key, fun parse_binary/1,
proplists:get_value(dest_routing_key, Dest, none)),
#{module => ?MODULE,
uris => proplists:get_value(uris, Dest),
resource_decl => rabbit_shovel_util:decl_fun(?MODULE, {destination, Dest}),
exchange => Exchange,
routing_key => RK,
add_forward_headers => proplists:get_value(add_forward_headers, Dest, false),
add_timestamp_header => proplists:get_value(add_timestamp_header, Dest, false)}.
connect_source(State = #{source := Src = #{resource_decl := {M, F, MFArgs},
queue := QName0,
uris := [Uri | _]}}) ->
QState = rabbit_queue_type:init(),
{User, VHost} = get_user_vhost_from_amqp_param(Uri),
%% We handle the most recently declared queue to use anonymous functions
%% It's usually the channel that does it
MRDQ = apply(M, F, MFArgs ++ [VHost, User]),
QName = case QName0 of
<<>> -> MRDQ;
_ -> QName0
end,
State#{source => Src#{current => #{queue_states => QState,
next_tag => 1,
user => User,
vhost => VHost},
queue => QName},
unacked_message_q => ?QUEUE:new()}.
connect_dest(State = #{dest := Dest = #{resource_decl := {M, F, MFArgs},
uris := [Uri | _]
},
ack_mode := AckMode}) ->
%% Shall we get the user from an URI or something else?
{User, VHost} = get_user_vhost_from_amqp_param(Uri),
apply(M, F, MFArgs ++ [VHost, User]),
QState = rabbit_queue_type:init(),
case AckMode of
on_confirm ->
State#{dest => Dest#{current => #{queue_states => QState,
delivery_id => 1,
vhost => VHost},
unacked => #{}}};
_ ->
State#{dest => Dest#{current => #{queue_states => QState,
vhost => VHost},
unacked => #{}}}
end.
init_source(State = #{source := #{queue := QName0,
prefetch_count := Prefetch,
consumer_args := Args,
current := #{queue_states := QState0,
vhost := VHost} = Current} = Src,
name := Name,
ack_mode := AckMode}) ->
_Mode = case rabbit_feature_flags:is_enabled('rabbitmq_4.0.0') of
true ->
{credited, Prefetch};
false ->
{credited, credit_api_v1}
end,
QName = rabbit_misc:r(VHost, queue, QName0),
CTag = consumer_tag(Name),
case rabbit_amqqueue:with(
QName,
fun(Q) ->
SndSettled = case AckMode of
no_ack -> true;
on_publish -> false;
on_confirm -> false
end,
Spec = #{no_ack => SndSettled,
channel_pid => self(),
limiter_pid => none,
limiter_active => false,
mode => {simple_prefetch, Prefetch},
consumer_tag => CTag,
exclusive_consume => false,
args => Args,
ok_msg => undefined,
acting_user => ?SHOVEL_USER},
case remaining(Q, State) of
0 ->
{0, {error, autodelete}};
Remaining ->
{Remaining, rabbit_queue_type:consume(Q, Spec, QState0)}
end
end) of
{Remaining, {ok, QState}} ->
State#{source => Src#{current => Current#{queue_states => QState,
consumer_tag => CTag},
remaining => Remaining,
remaining_unacked => Remaining}};
{0, {error, autodelete}} ->
exit({shutdown, autodelete});
{_Remaining, {error, Reason}} ->
rabbit_log:error(
"Shovel '~ts' in vhost '~ts' failed to consume: ~ts",
[Name, VHost, Reason]),
exit({shutdown, failed_to_consume_from_source});
{unlimited, {error, not_implemented, Reason, ReasonArgs}} ->
rabbit_log:error(
"Shovel '~ts' in vhost '~ts' failed to consume: ~ts",
[Name, VHost, io_lib:format(Reason, ReasonArgs)]),
exit({shutdown, failed_to_consume_from_source});
{error, not_found} ->
exit({shutdown, missing_source_queue})
end.
init_dest(#{name := Name,
shovel_type := Type,
dest := #{add_forward_headers := AFH} = Dst} = State) ->
case AFH of
true ->
Props = #{<<"x-opt-shovelled-by">> => rabbit_nodes:cluster_name(),
<<"x-opt-shovel-type">> => rabbit_data_coercion:to_binary(Type),
<<"x-opt-shovel-name">> => rabbit_data_coercion:to_binary(Name)},
State#{dest => Dst#{cached_forward_headers => Props}};
false ->
State
end.
source_uri(_State) ->
"".
dest_uri(_State) ->
"".
source_protocol(_State) ->
local.
dest_protocol(_State) ->
local.
source_endpoint(#{source := #{queue := Queue,
exchange := SrcX,
routing_key := SrcXKey}}) ->
[{src_exchange, SrcX},
{src_exchange_key, SrcXKey},
{src_queue, Queue}];
source_endpoint(#{source := #{queue := Queue}}) ->
[{src_queue, Queue}];
source_endpoint(_Config) ->
[].
dest_endpoint(#{dest := #{exchange := SrcX,
routing_key := SrcXKey}}) ->
[{dest_exchange, SrcX},
{dest_exchange_key, SrcXKey}];
dest_endpoint(#{dest := #{queue := Queue}}) ->
[{dest_queue, Queue}];
dest_endpoint(_Config) ->
[].
close_dest(_State) ->
ok.
close_source(#{source := #{current := #{queue_states := QStates0,
consumer_tag := CTag,
user := User,
vhost := VHost},
queue := QName0}}) ->
QName = rabbit_misc:r(VHost, queue, QName0),
case rabbit_amqqueue:with(
QName,
fun(Q) ->
rabbit_queue_type:cancel(Q, #{consumer_tag => CTag,
reason => remove,
user => User#user.username}, QStates0)
end) of
{ok, _QStates} ->
ok;
{error, not_found} ->
ok;
{error, Reason} ->
rabbit_log:warning("Local shovel failed to remove consumer ~tp: ~tp",
[CTag, Reason]),
ok
end;
close_source(_) ->
%% No consumer tag, no consumer to cancel
ok.
handle_source(#'basic.ack'{delivery_tag = Seq, multiple = Multiple},
State = #{ack_mode := on_confirm}) ->
confirm_to_inbound(fun(Tag, Multi, StateX) ->
rabbit_shovel_behaviour:ack(Tag, Multi, StateX)
end, Seq, Multiple, State);
handle_source({queue_event, _, {Type, _, _}}, _State) when Type =:= confirm;
Type =:= reject_publish ->
not_handled;
handle_source({queue_event, QRef, Evt}, #{source := Source = #{current := Current = #{queue_states := QueueStates0}}} = State0) ->
case rabbit_queue_type:handle_event(QRef, Evt, QueueStates0) of
{ok, QState1, Actions} ->
State = State0#{source => Source#{current => Current#{queue_states => QState1}}},
handle_queue_actions(Actions, State);
{eol, Actions} ->
_ = handle_queue_actions(Actions, State0),
{stop, {inbound_link_or_channel_closure, queue_deleted}};
{protocol_error, _Type, Reason, ReasonArgs} ->
{stop, list_to_binary(io_lib:format(Reason, ReasonArgs))}
end;
handle_source({{'DOWN', #resource{name = Queue,
kind = queue,
virtual_host = VHost}}, _, _, _, _} ,
#{source := #{queue := Queue, current := #{vhost := VHost}}}) ->
{stop, {inbound_link_or_channel_closure, source_queue_down}};
handle_source(_Msg, _State) ->
not_handled.
handle_dest({queue_event, _QRef, {confirm, MsgSeqNos, _QPid}},
#{ack_mode := on_confirm} = State) ->
confirm_to_inbound(fun(Tag, Multi, StateX) ->
rabbit_shovel_behaviour:ack(Tag, Multi, StateX)
end, MsgSeqNos, false, State);
handle_dest({queue_event, _QRef, {reject_publish, Seq, _QPid}},
#{ack_mode := on_confirm} = State) ->
confirm_to_inbound(fun(Tag, Multi, StateX) ->
rabbit_shovel_behaviour:nack(Tag, Multi, StateX)
end, Seq, false, State);
handle_dest({{'DOWN', #resource{name = Queue,
kind = queue,
virtual_host = VHost}}, _, _, _, _} ,
#{dest := #{queue := Queue, current := #{vhost := VHost}}}) ->
{stop, {outbound_link_or_channel_closure, dest_queue_down}};
handle_dest(_Msg, State) ->
State.
ack(DeliveryTag, Multiple, State) ->
settle(complete, DeliveryTag, Multiple, State).
nack(DeliveryTag, Multiple, State) ->
settle(discard, DeliveryTag, Multiple, State).
forward(Tag, Msg0, #{dest := #{current := #{queue_states := QState} = Current,
unacked := Unacked} = Dest,
ack_mode := AckMode} = State0) ->
{Options, #{dest := #{current := Current1} = Dest1} = State} =
case AckMode of
on_confirm ->
DeliveryId = maps:get(delivery_id, Current),
Opts = #{correlation => DeliveryId},
{Opts, State0#{dest => Dest#{current => Current#{delivery_id => DeliveryId + 1}}}};
_ ->
{#{}, State0}
end,
Msg = set_annotations(Msg0, Dest),
QNames = route(Msg, Dest),
Queues = rabbit_amqqueue:lookup_many(QNames),
case rabbit_queue_type:deliver(Queues, Msg, Options, QState) of
{ok, QState1, Actions} ->
%% TODO handle credit?
State1 = State#{dest => Dest1#{current => Current1#{queue_states => QState1}}},
#{dest := Dst1} = State2 = rabbit_shovel_behaviour:incr_forwarded(State1),
State4 = rabbit_shovel_behaviour:decr_remaining_unacked(
case AckMode of
no_ack ->
rabbit_shovel_behaviour:decr_remaining(1, State2);
on_confirm ->
Correlation = maps:get(correlation, Options),
State2#{dest => Dst1#{unacked => Unacked#{Correlation => Tag}}};
on_publish ->
State3 = rabbit_shovel_behaviour:ack(Tag, false, State2),
rabbit_shovel_behaviour:decr_remaining(1, State3)
end),
handle_queue_actions(Actions, State4);
{error, Reason} ->
exit({shutdown, Reason})
end.
set_annotations(Msg, Dest) ->
add_routing(add_forward_headers(add_timestamp_header(Msg, Dest), Dest), Dest).
add_timestamp_header(Msg, #{add_timestamp_header := true}) ->
mc:set_annotation(<<"x-opt-shovelled-timestamp">>, os:system_time(milli_seconds), Msg);
add_timestamp_header(Msg, _) ->
Msg.
add_forward_headers(Msg, #{cached_forward_headers := Props}) ->
maps:fold(fun(K, V, Acc) ->
mc:set_annotation(K, V, Acc)
end, Msg, Props);
add_forward_headers(Msg, _D) ->
Msg.
add_routing(Msg0, Dest) ->
Msg = case maps:get(exchange, Dest, undefined) of
undefined -> Msg0;
Exchange -> mc:set_annotation(?ANN_EXCHANGE, Exchange, Msg0)
end,
case maps:get(routing_key, Dest, undefined) of
undefined -> Msg;
RK -> mc:set_annotation(?ANN_ROUTING_KEYS, [RK], Msg)
end.
status(_) ->
running.
%% Internal
parse_parameter(_, _, none) ->
none;
parse_parameter(Param, Fun, Value) ->
try
Fun(Value)
catch
_:{error, Err} ->
fail({invalid_parameter_value, Param, Err})
end.
parse_non_negative_integer(N) when is_integer(N) andalso N >= 0 ->
N;
parse_non_negative_integer(N) ->
fail({require_non_negative_integer, N}).
parse_binary(Binary) when is_binary(Binary) ->
Binary;
parse_binary(NotABinary) ->
fail({require_binary, NotABinary}).
consumer_tag(Name) ->
CTag0 = rabbit_shovel_util:gen_unique_name(Name, "receiver"),
rabbit_data_coercion:to_binary(CTag0).
-spec fail(term()) -> no_return().
fail(Reason) -> throw({error, Reason}).
handle_queue_actions(Actions, State) ->
lists:foldl(
fun({deliver, _CTag, AckRequired, Msgs}, S0) ->
handle_deliver(AckRequired, Msgs, S0);
(_, _) ->
not_handled
%% ({queue_down, QRef}, S0) ->
%% State;
%% ({block, QName}, S0) ->
%% State;
%% ({unblock, QName}, S0) ->
%% State
end, State, Actions).
handle_deliver(AckRequired, Msgs, State) when is_list(Msgs) ->
lists:foldl(fun({_QName, _QPid, MsgId, _Redelivered, Mc}, S0) ->
DeliveryTag = next_tag(S0),
S = record_pending(AckRequired, DeliveryTag, MsgId, increase_next_tag(S0)),
rabbit_shovel_behaviour:forward(DeliveryTag, Mc, S)
end, State, Msgs).
next_tag(#{source := #{current := #{next_tag := DeliveryTag}}}) ->
DeliveryTag.
increase_next_tag(#{source := Source = #{current := Current = #{next_tag := DeliveryTag}}} = State) ->
State#{source => Source#{current => Current#{next_tag => DeliveryTag + 1}}}.
record_pending(false, _DeliveryTag, _MsgId, State) ->
State;
record_pending(true, DeliveryTag, MsgId, #{unacked_message_q := UAMQ0} = State) ->
UAMQ = ?QUEUE:in(#pending_ack{delivery_tag = DeliveryTag,
msg_id = MsgId}, UAMQ0),
State#{unacked_message_q => UAMQ}.
remaining(_Q, #{source := #{delete_after := never}}) ->
unlimited;
remaining(Q, #{source := #{delete_after := 'queue-length'}}) ->
[{messages, Count}] = rabbit_amqqueue:info(Q, [messages]),
Count;
remaining(_Q, #{source := #{delete_after := Count}}) ->
Count.
decl_fun(Decl, VHost, User) ->
lists:foldr(
fun(Method, MRDQ) -> %% keep track of most recently declared queue
Reply = rabbit_channel:handle_method(
expand_shortcuts(Method, MRDQ),
none, #{}, none, VHost, User),
case {Method, Reply} of
{#'queue.declare'{}, {ok, QName, _, _}} ->
QName#resource.name;
_ ->
MRDQ
end
end, <<>>, Decl).
expand_shortcuts(#'queue.bind' {queue = Q, routing_key = K} = M, MRDQ) ->
M#'queue.bind' {queue = expand_queue_name_shortcut(Q, MRDQ),
routing_key = expand_routing_key_shortcut(Q, K, MRDQ)};
expand_shortcuts(#'queue.unbind' {queue = Q, routing_key = K} = M, MRDQ) ->
M#'queue.unbind' {queue = expand_queue_name_shortcut(Q, MRDQ),
routing_key = expand_routing_key_shortcut(Q, K, MRDQ)};
expand_shortcuts(M, _State) ->
M.
expand_queue_name_shortcut(<<>>, <<>>) ->
exit({shutdown, {not_found, "no previously declared queue"}});
expand_queue_name_shortcut(<<>>, MRDQ) ->
MRDQ;
expand_queue_name_shortcut(QueueNameBin, _) ->
QueueNameBin.
expand_routing_key_shortcut(<<>>, <<>>, <<>>) ->
exit({shutdown, {not_found, "no previously declared queue"}});
expand_routing_key_shortcut(<<>>, <<>>, MRDQ) ->
MRDQ;
expand_routing_key_shortcut(_QueueNameBin, RoutingKey, _) ->
RoutingKey.
%% TODO A missing queue stops the shovel but because the error reason
%% the failed status is not stored. Would not be it more useful to
%% report it??? This is a rabbit_shovel_worker issues, last terminate
%% clause
check_fun(QName, VHost, User) ->
Method = #'queue.declare'{queue = QName,
passive = true},
decl_fun([Method], VHost, User).
src_decl_exchange(SrcX, SrcXKey, VHost, User) ->
Methods = [#'queue.bind'{routing_key = SrcXKey,
exchange = SrcX},
#'queue.declare'{exclusive = true}],
decl_fun(Methods, VHost, User).
dest_decl_queue(none, _, _, _) ->
ok;
dest_decl_queue(QName, QArgs, VHost, User) ->
decl_queue(QName, QArgs, VHost, User).
decl_queue(QName, QArgs, VHost, User) ->
Args = rabbit_misc:to_amqp_table(QArgs),
Method = #'queue.declare'{queue = QName,
durable = true,
arguments = Args},
decl_fun([Method], VHost, User).
dest_check_queue(none, _, _, _) ->
ok;
dest_check_queue(QName, QArgs, VHost, User) ->
check_queue(QName, QArgs, VHost, User).
check_queue(QName, _QArgs, VHost, User) ->
Method = #'queue.declare'{queue = QName,
passive = true},
decl_fun([Method], VHost, User).
get_user_vhost_from_amqp_param(Uri) ->
{ok, AmqpParam} = amqp_uri:parse(Uri),
rabbit_log:warning("AMQP PARAM ~p", [AmqpParam]),
{Username, Password, VHost} =
case AmqpParam of
#amqp_params_direct{username = U,
password = P,
virtual_host = V} ->
{U, P, V};
#amqp_params_network{username = U,
password = P,
virtual_host = V} ->
{U, P, V}
end,
case rabbit_access_control:check_user_login(Username, [{password, Password}]) of
{ok, User} ->
try
rabbit_access_control:check_vhost_access(User, VHost, undefined, #{}) of
ok ->
{User, VHost}
catch
exit:#amqp_error{name = not_allowed} ->
exit({shutdown, {access_refused, Username}})
end;
{refused, Username, _Msg, _Module} ->
rabbit_log:error("Local shovel user ~ts was refused access"),
exit({shutdown, {access_refused, Username}})
end.
settle(Op, DeliveryTag, Multiple, #{unacked_message_q := UAMQ0,
source := #{queue := Queue,
current := Current = #{queue_states := QState0,
consumer_tag := CTag,
vhost := VHost}} = Src} = State) ->
{Acked, UAMQ} = collect_acks(UAMQ0, DeliveryTag, Multiple),
QRef = rabbit_misc:r(VHost, queue, Queue),
MsgIds = [Ack#pending_ack.msg_id || Ack <- Acked],
case rabbit_queue_type:settle(QRef, Op, CTag, MsgIds, QState0) of
{ok, QState1, Actions} ->
QState = handle_queue_actions(Actions, QState1),
State#{source => Src#{current => Current#{queue_states => QState}},
unacked_message_q => UAMQ};
{'protocol_error', Type, Reason, Args} ->
rabbit_log:error("Shovel failed to settle ~p acknowledgments with ~tp: ~tp",
[Op, Type, io_lib:format(Reason, Args)]),
exit({shutdown, {ack_failed, Reason}})
end.
%% From rabbit_channel
%% Records a client-sent acknowledgement. Handles both single delivery acks
%% and multi-acks.
%%
%% Returns a tuple of acknowledged pending acks and remaining pending acks.
%% Sorts each group in the youngest-first order (descending by delivery tag).
%% The special case for 0 comes from the AMQP 0-9-1 spec: if the multiple field is set to 1 (true),
%% and the delivery tag is 0, this indicates acknowledgement of all outstanding messages (by a client).
collect_acks(UAMQ, 0, true) ->
{lists:reverse(?QUEUE:to_list(UAMQ)), ?QUEUE:new()};
collect_acks(UAMQ, DeliveryTag, Multiple) ->
collect_acks([], [], UAMQ, DeliveryTag, Multiple).
collect_acks(AcknowledgedAcc, RemainingAcc, UAMQ, DeliveryTag, Multiple) ->
case ?QUEUE:out(UAMQ) of
{{value, UnackedMsg = #pending_ack{delivery_tag = CurrentDT}},
UAMQTail} ->
if CurrentDT == DeliveryTag ->
{[UnackedMsg | AcknowledgedAcc],
case RemainingAcc of
[] -> UAMQTail;
_ -> ?QUEUE:join(
?QUEUE:from_list(lists:reverse(RemainingAcc)),
UAMQTail)
end};
Multiple ->
collect_acks([UnackedMsg | AcknowledgedAcc], RemainingAcc,
UAMQTail, DeliveryTag, Multiple);
true ->
collect_acks(AcknowledgedAcc, [UnackedMsg | RemainingAcc],
UAMQTail, DeliveryTag, Multiple)
end;
{empty, UAMQTail} ->
{AcknowledgedAcc, UAMQTail}
end.
route(_Msg, #{queue := Queue,
current := #{vhost := VHost}}) when Queue =/= none ->
QName = rabbit_misc:r(VHost, queue, Queue),
[QName];
route(Msg, #{current := #{vhost := VHost}}) ->
ExchangeName = rabbit_misc:r(VHost, exchange, mc:exchange(Msg)),
Exchange = rabbit_exchange:lookup_or_die(ExchangeName),
rabbit_exchange:route(Exchange, Msg, #{return_binding_keys => true}).
remove_delivery_tags(Seq, false, Unacked, 0) ->
{maps:remove(Seq, Unacked), 1};
remove_delivery_tags(Seq, true, Unacked, Count) ->
case maps:size(Unacked) of
0 -> {Unacked, Count};
_ ->
maps:fold(fun(K, _V, {Acc, Cnt}) when K =< Seq ->
{maps:remove(K, Acc), Cnt + 1};
(_K, _V, Acc) -> Acc
end, {Unacked, 0}, Unacked)
end.
confirm_to_inbound(ConfirmFun, SeqNos, Multiple, State)
when is_list(SeqNos) ->
lists:foldl(fun(Seq, State0) ->
confirm_to_inbound(ConfirmFun, Seq, Multiple, State0)
end, State, SeqNos);
confirm_to_inbound(ConfirmFun, Seq, Multiple,
State0 = #{dest := #{unacked := Unacked} = Dst}) ->
#{Seq := InTag} = Unacked,
State = ConfirmFun(InTag, Multiple, State0),
{Unacked1, Removed} = remove_delivery_tags(Seq, Multiple, Unacked, 0),
rabbit_shovel_behaviour:decr_remaining(Removed,
State#{dest =>
Dst#{unacked => Unacked1}}).

View File

@ -13,7 +13,8 @@
-include("rabbit_shovel.hrl").
resolve_module(amqp091) -> rabbit_amqp091_shovel;
resolve_module(amqp10) -> rabbit_amqp10_shovel.
resolve_module(amqp10) -> rabbit_amqp10_shovel;
resolve_module(local) -> rabbit_local_shovel.
is_legacy(Config) ->
not proplists:is_defined(source, Config).

View File

@ -86,13 +86,15 @@ internal_owner(Def) ->
validate_src(Def) ->
case protocols(Def) of
{amqp091, _} -> validate_amqp091_src(Def);
{amqp10, _} -> []
{amqp10, _} -> [];
{local, _} -> validate_local_src(Def)
end.
validate_dest(Def) ->
case protocols(Def) of
{_, amqp091} -> validate_amqp091_dest(Def);
{_, amqp10} -> []
{_, amqp10} -> [];
{_, local} -> validate_local_dest(Def)
end.
validate_amqp091_src(Def) ->
@ -108,6 +110,19 @@ validate_amqp091_src(Def) ->
ok
end].
validate_local_src(Def) ->
[case pget2(<<"src-exchange">>, <<"src-queue">>, Def) of
zero -> {error, "Must specify 'src-exchange' or 'src-queue'", []};
one -> ok;
both -> {error, "Cannot specify 'src-exchange' and 'src-queue'", []}
end,
case {pget(<<"src-delete-after">>, Def, pget(<<"delete-after">>, Def)), pget(<<"ack-mode">>, Def)} of
{N, <<"no-ack">>} when is_integer(N) ->
{error, "Cannot specify 'no-ack' and numerical 'delete-after'", []};
_ ->
ok
end].
obfuscate_uris_in_definition(Def) ->
SrcURIs = get_uris(<<"src-uri">>, Def),
ObfuscatedSrcURIsDef = pset(<<"src-uri">>, obfuscate_uris(SrcURIs), Def),
@ -125,6 +140,13 @@ validate_amqp091_dest(Def) ->
both -> {error, "Cannot specify 'dest-exchange' and 'dest-queue'", []}
end].
validate_local_dest(Def) ->
[case pget2(<<"dest-exchange">>, <<"dest-queue">>, Def) of
zero -> ok;
one -> ok;
both -> {error, "Cannot specify 'dest-exchange' and 'dest-queue'", []}
end].
shovel_validation() ->
[{<<"internal">>, fun rabbit_parameter_validation:boolean/2, optional},
{<<"internal_owner">>, fun validate_internal_owner/2, optional},
@ -132,17 +154,30 @@ shovel_validation() ->
{<<"ack-mode">>, rabbit_parameter_validation:enum(
['no-ack', 'on-publish', 'on-confirm']), optional},
{<<"src-protocol">>,
rabbit_parameter_validation:enum(['amqp10', 'amqp091']), optional},
rabbit_parameter_validation:enum(['amqp10', 'amqp091', 'local']), optional},
{<<"dest-protocol">>,
rabbit_parameter_validation:enum(['amqp10', 'amqp091']), optional}
rabbit_parameter_validation:enum(['amqp10', 'amqp091', 'local']), optional}
].
src_validation(Def, User) ->
case protocols(Def) of
{amqp091, _} -> amqp091_src_validation(Def, User);
{amqp10, _} -> amqp10_src_validation(Def, User)
{amqp10, _} -> amqp10_src_validation(Def, User);
{local, _} -> local_src_validation(Def, User)
end.
local_src_validation(_Def, User) ->
[
{<<"src-uri">>, validate_uri_fun(User), mandatory},
{<<"src-exchange">>, fun rabbit_parameter_validation:binary/2, optional},
{<<"src-exchange-key">>, fun rabbit_parameter_validation:binary/2, optional},
{<<"src-queue">>, fun rabbit_parameter_validation:binary/2, optional},
{<<"src-queue-args">>, fun validate_queue_args/2, optional},
{<<"src-consumer-args">>, fun validate_consumer_args/2, optional},
{<<"src-prefetch-count">>, fun rabbit_parameter_validation:number/2, optional},
{<<"src-delete-after">>, fun validate_delete_after/2, optional},
{<<"src-predeclared">>, fun rabbit_parameter_validation:boolean/2, optional}
].
amqp10_src_validation(_Def, User) ->
[
@ -173,7 +208,8 @@ dest_validation(Def0, User) ->
Def = rabbit_data_coercion:to_proplist(Def0),
case protocols(Def) of
{_, amqp091} -> amqp091_dest_validation(Def, User);
{_, amqp10} -> amqp10_dest_validation(Def, User)
{_, amqp10} -> amqp10_dest_validation(Def, User);
{_, local} -> local_dest_validation(Def, User)
end.
amqp10_dest_validation(_Def, User) ->
@ -209,6 +245,17 @@ amqp091_dest_validation(_Def, User) ->
{<<"dest-predeclared">>, fun rabbit_parameter_validation:boolean/2, optional}
].
local_dest_validation(_Def, User) ->
[{<<"dest-uri">>, validate_uri_fun(User), mandatory},
{<<"dest-exchange">>, fun rabbit_parameter_validation:binary/2,optional},
{<<"dest-exchange-key">>,fun rabbit_parameter_validation:binary/2,optional},
{<<"dest-queue">>, fun rabbit_parameter_validation:amqp091_queue_name/2,optional},
{<<"dest-queue-args">>, fun validate_queue_args/2, optional},
{<<"dest-add-forward-headers">>, fun rabbit_parameter_validation:boolean/2,optional},
{<<"dest-add-timestamp-header">>, fun rabbit_parameter_validation:boolean/2,optional},
{<<"dest-predeclared">>, fun rabbit_parameter_validation:boolean/2, optional}
].
validate_uri_fun(User) ->
fun (Name, Term) -> validate_uri(Name, Term, User) end.
@ -322,7 +369,8 @@ parse({VHost, Name}, ClusterName, Def) ->
parse_source(Def) ->
case protocols(Def) of
{amqp10, _} -> parse_amqp10_source(Def);
{amqp091, _} -> parse_amqp091_source(Def)
{amqp091, _} -> parse_amqp091_source(Def);
{local, _} -> parse_local_source(Def)
end.
parse_dest(VHostName, ClusterName, Def, SourceHeaders) ->
@ -330,7 +378,9 @@ parse_dest(VHostName, ClusterName, Def, SourceHeaders) ->
{_, amqp10} ->
parse_amqp10_dest(VHostName, ClusterName, Def, SourceHeaders);
{_, amqp091} ->
parse_amqp091_dest(VHostName, ClusterName, Def, SourceHeaders)
parse_amqp091_dest(VHostName, ClusterName, Def, SourceHeaders);
{_, local} ->
parse_local_dest(VHostName, ClusterName, Def, SourceHeaders)
end.
parse_amqp10_dest({_VHost, _Name}, _ClusterName, Def, SourceHeaders) ->
@ -407,6 +457,35 @@ parse_amqp091_dest({VHost, Name}, ClusterName, Def, SourceHeaders) ->
AddTimestampHeader]}
}, Details).
parse_local_dest({_VHost, _Name}, _ClusterName, Def, _SourceHeaders) ->
Mod = rabbit_local_shovel,
DestURIs = deobfuscated_uris(<<"dest-uri">>, Def),
DestX = pget(<<"dest-exchange">>, Def, none),
DestXKey = pget(<<"dest-exchange-key">>, Def, none),
DestQ = pget(<<"dest-queue">>, Def, none),
DestQArgs = pget(<<"dest-queue-args">>, Def, #{}),
GlobalPredeclared = proplists:get_value(predeclared, application:get_env(?APP, topology, []), false),
Predeclared = pget(<<"dest-predeclared">>, Def, GlobalPredeclared),
DestDeclFun = case Predeclared of
true -> {Mod, dest_check_queue, [DestQ, DestQArgs]};
false -> {Mod, dest_decl_queue, [DestQ, DestQArgs]}
end,
AddHeaders = pget(<<"dest-add-forward-headers">>, Def, false),
AddTimestampHeader = pget(<<"dest-add-timestamp-header">>, Def, false),
%% Details are only used for status report in rabbitmqctl, as vhost is not
%% available to query the runtime parameters.
Details = maps:from_list([{K, V} || {K, V} <- [{exchange, DestX},
{routing_key, DestXKey},
{queue, DestQ}],
V =/= none]),
maps:merge(#{module => rabbit_local_shovel,
uris => DestURIs,
resource_decl => DestDeclFun,
add_forward_headers => AddHeaders,
add_timestamp_header => AddTimestampHeader
}, Details).
fields_fun(X, Key, _SrcURI, _DestURI, P0) ->
P1 = case X of
none -> P0;
@ -496,6 +575,49 @@ parse_amqp091_source(Def) ->
consumer_args => SrcCArgs
}, Details), DestHeaders}.
parse_local_source(Def) ->
%% TODO add exchange source back
Mod = rabbit_local_shovel,
SrcURIs = deobfuscated_uris(<<"src-uri">>, Def),
SrcX = pget(<<"src-exchange">>,Def, none),
SrcXKey = pget(<<"src-exchange-key">>, Def, <<>>),
SrcQ = pget(<<"src-queue">>, Def, none),
SrcQArgs = pget(<<"src-queue-args">>, Def, #{}),
SrcCArgs = rabbit_misc:to_amqp_table(pget(<<"src-consumer-args">>, Def, [])),
GlobalPredeclared = proplists:get_value(predeclared, application:get_env(?APP, topology, []), false),
Predeclared = pget(<<"src-predeclared">>, Def, GlobalPredeclared),
{SrcDeclFun, Queue, DestHeaders} =
case SrcQ of
none -> {{Mod, src_decl_exchange, [SrcX, SrcXKey]}, <<>>,
[{<<"src-exchange">>, SrcX},
{<<"src-exchange-key">>, SrcXKey}]};
_ -> case Predeclared of
false ->
{{Mod, decl_queue, [SrcQ, SrcQArgs]},
SrcQ, [{<<"src-queue">>, SrcQ}]};
true ->
{{Mod, check_queue, [SrcQ, SrcQArgs]},
SrcQ, [{<<"src-queue">>, SrcQ}]}
end
end,
DeleteAfter = pget(<<"src-delete-after">>, Def,
pget(<<"delete-after">>, Def, <<"never">>)),
PrefetchCount = pget(<<"src-prefetch-count">>, Def,
pget(<<"prefetch-count">>, Def, 1000)),
%% Details are only used for status report in rabbitmqctl, as vhost is not
%% available to query the runtime parameters.
Details = maps:from_list([{K, V} || {K, V} <- [{exchange, SrcX},
{routing_key, SrcXKey}],
V =/= none]),
{maps:merge(#{module => Mod,
uris => SrcURIs,
resource_decl => SrcDeclFun,
queue => Queue,
delete_after => opt_b2a(DeleteAfter),
prefetch_count => PrefetchCount,
consumer_args => SrcCArgs
}, Details), DestHeaders}.
src_decl_exchange(SrcX, SrcXKey, _Conn, Ch) ->
Ms = [#'queue.declare'{exclusive = true},
#'queue.bind'{routing_key = SrcXKey,

View File

@ -11,12 +11,15 @@
add_timestamp_header/1,
delete_shovel/3,
restart_shovel/2,
get_shovel_parameter/1]).
get_shovel_parameter/1,
gen_unique_name/2,
decl_fun/2]).
-include_lib("rabbit_common/include/rabbit_framing.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("kernel/include/logger.hrl").
-define(APP, rabbitmq_shovel).
-define(ROUTING_HEADER, <<"x-shovelled">>).
-define(TIMESTAMP_HEADER, <<"x-shovelled-timestamp">>).
@ -99,3 +102,58 @@ get_shovel_parameter({VHost, ShovelName}) ->
rabbit_runtime_parameters:lookup(VHost, <<"shovel">>, ShovelName);
get_shovel_parameter(ShovelName) ->
rabbit_runtime_parameters:lookup(<<"/">>, <<"shovel">>, ShovelName).
gen_unique_name(Pre0, Post0) ->
Pre = rabbit_data_coercion:to_binary(Pre0),
Post = rabbit_data_coercion:to_binary(Post0),
Id = bin_to_hex(crypto:strong_rand_bytes(8)),
<<Pre/binary, <<"_">>/binary, Id/binary, <<"_">>/binary, Post/binary>>.
bin_to_hex(Bin) ->
<<<<if N >= 10 -> N -10 + $a;
true -> N + $0 end>>
|| <<N:4>> <= Bin>>.
decl_fun(Mod, {source, Endpoint}) ->
case parse_declaration({proplists:get_value(declarations, Endpoint, []), []}) of
[] ->
case proplists:get_value(predeclared, application:get_env(?APP, topology, []), false) of
true -> case proplists:get_value(queue, Endpoint) of
<<>> -> fail({invalid_parameter_value, declarations, {require_non_empty}});
Queue -> {Mod, check_fun, [Queue]}
end;
false -> {Mod, decl_fun, []}
end;
Decl -> {Mod, decl_fun, [Decl]}
end;
decl_fun(Mod, {destination, Endpoint}) ->
Decl = parse_declaration({proplists:get_value(declarations, Endpoint, []), []}),
{Mod, decl_fun, [Decl]}.
parse_declaration({[], Acc}) ->
Acc;
parse_declaration({[{Method, Props} | Rest], Acc}) when is_list(Props) ->
FieldNames = try rabbit_framing_amqp_0_9_1:method_fieldnames(Method)
catch exit:Reason -> fail(Reason)
end,
case proplists:get_keys(Props) -- FieldNames of
[] -> ok;
UnknownFields -> fail({unknown_fields, Method, UnknownFields})
end,
{Res, _Idx} = lists:foldl(
fun (K, {R, Idx}) ->
NewR = case proplists:get_value(K, Props) of
undefined -> R;
V -> setelement(Idx, R, V)
end,
{NewR, Idx + 1}
end, {rabbit_framing_amqp_0_9_1:method_record(Method), 2},
FieldNames),
parse_declaration({Rest, [Res | Acc]});
parse_declaration({[{Method, Props} | _Rest], _Acc}) ->
fail({expected_method_field_list, Method, Props});
parse_declaration({[Method | Rest], Acc}) ->
parse_declaration({[{Method, []} | Rest], Acc}).
-spec fail(term()) -> no_return().
fail(Reason) -> throw({error, Reason}).

View File

@ -52,6 +52,9 @@ init([Type, Name, Config0]) ->
Config0;
dynamic ->
ClusterName = rabbit_nodes:cluster_name(),
%% TODO It could handle errors while parsing
%% (i.e. missing predeclared queues) and stop nicely
%% without long stacktraces
{ok, Conf} = rabbit_shovel_parameters:parse(Name,
ClusterName,
Config0),
@ -103,10 +106,14 @@ handle_cast(init_shovel, State = #state{config = Config}) ->
[human_readable_name(maps:get(name, Config2))]),
State1 = State#state{config = Config2},
ok = report_running(State1),
{noreply, State1}.
{noreply, State1};
handle_cast(Msg, State) ->
handle_msg(Msg, State).
handle_info(Msg, State) ->
handle_msg(Msg, State).
handle_info(Msg, State = #state{config = Config, name = Name}) ->
handle_msg(Msg, State = #state{config = Config, name = Name}) ->
case rabbit_shovel_behaviour:handle_source(Msg, Config) of
not_handled ->
case rabbit_shovel_behaviour:handle_dest(Msg, Config) of

View File

@ -122,7 +122,6 @@ amqp10_destination(Config, AckMode) ->
receive
{amqp10_msg, Receiver, InMsg} ->
ct:pal("GOT ~p", [InMsg]),
[<<42>>] = amqp10_msg:body(InMsg),
Ts = Timestamp * 1000,
?assertMatch(

View File

@ -0,0 +1,517 @@
%% This Source Code Form is subject to the terms of the Mozilla Public
%% License, v. 2.0. If a copy of the MPL was not distributed with this
%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
%%
%% Copyright (c) 2007-2025 Broadcom. All Rights Reserved. The term Broadcom refers to Broadcom Inc. and/or its subsidiaries. All rights reserved.
%%
-module(local_SUITE).
-include_lib("amqp_client/include/amqp_client.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
-include_lib("rabbitmq_ct_helpers/include/rabbit_assert.hrl").
-compile(export_all).
-define(EXCHANGE, <<"test_exchange">>).
-define(TO_SHOVEL, <<"to_the_shovel">>).
-define(FROM_SHOVEL, <<"from_the_shovel">>).
-define(UNSHOVELLED, <<"unshovelled">>).
-define(SHOVELLED, <<"shovelled">>).
-define(TIMEOUT, 1000).
all() ->
[
{group, tests}
].
groups() ->
[
{tests, [], [
local_destination_no_ack,
local_destination_on_publish,
local_destination_on_confirm,
local_destination_forward_headers_amqp10,
local_destination_forward_headers_amqp091,
local_destination_no_forward_headers_amqp10,
local_destination_timestamp_header_amqp10,
local_destination_timestamp_header_amqp091,
local_destination_no_timestamp_header_amqp10,
local_source_no_ack,
local_source_on_publish,
local_source_on_confirm,
local_source_anonymous_queue
]}
].
%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------
init_per_suite(Config) ->
{ok, _} = application:ensure_all_started(amqp10_client),
rabbit_ct_helpers:log_environment(),
Config1 = rabbit_ct_helpers:set_config(Config, [
{rmq_nodename_suffix, ?MODULE}
]),
rabbit_ct_helpers:run_setup_steps(Config1,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps() ++
[fun stop_shovel_plugin/1]).
end_per_suite(Config) ->
application:stop(amqp10_client),
rabbit_ct_helpers:run_teardown_steps(Config,
rabbit_ct_client_helpers:teardown_steps() ++
rabbit_ct_broker_helpers:teardown_steps()).
init_per_group(_, Config) ->
Config.
end_per_group(_, Config) ->
Config.
init_per_testcase(Testcase, Config) ->
rabbit_ct_helpers:testcase_started(Config, Testcase).
end_per_testcase(Testcase, Config) ->
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, [[<<"source-queue">>, <<"dest-queue">>]]),
rabbit_ct_helpers:testcase_finished(Config, Testcase).
stop_shovel_plugin(Config) ->
ok = rabbit_ct_broker_helpers:rpc(Config, 0,
application, stop, [rabbitmq_shovel]),
Config.
%% -------------------------------------------------------------------
%% Testcases.
%% -------------------------------------------------------------------
%% TODO test errors when queue has not been predeclared, it just crashes
%% with a case_clause right now
local_destination_no_ack(Config) ->
local_destination(Config, no_ack).
local_destination_on_publish(Config) ->
local_destination(Config, on_publish).
local_destination_on_confirm(Config) ->
local_destination(Config, on_confirm).
local_destination(Config, AckMode) ->
TargetQ = <<"dest-queue">>,
ok = setup_local_destination_shovel(Config, TargetQ, AckMode, []),
{Conn, Receiver} = attach_receiver(Config, TargetQ),
Chan = rabbit_ct_client_helpers:open_channel(Config, 0),
Timestamp = erlang:system_time(millisecond),
Msg = #amqp_msg{payload = <<42>>,
props = #'P_basic'{delivery_mode = 2,
headers = [{<<"header1">>, long, 1},
{<<"header2">>, longstr, <<"h2">>}],
content_encoding = ?UNSHOVELLED,
content_type = ?UNSHOVELLED,
correlation_id = ?UNSHOVELLED,
%% needs to be guest here
user_id = <<"guest">>,
message_id = ?UNSHOVELLED,
reply_to = ?UNSHOVELLED,
timestamp = Timestamp,
type = ?UNSHOVELLED
}},
publish(Chan, Msg, ?EXCHANGE, ?TO_SHOVEL),
receive
{amqp10_msg, Receiver, InMsg} ->
[<<42>>] = amqp10_msg:body(InMsg),
#{content_type := ?UNSHOVELLED,
content_encoding := ?UNSHOVELLED,
correlation_id := ?UNSHOVELLED,
user_id := <<"guest">>,
message_id := ?UNSHOVELLED,
reply_to := ?UNSHOVELLED
} = amqp10_msg:properties(InMsg),
#{<<"header1">> := 1,
<<"header2">> := <<"h2">>
} = amqp10_msg:application_properties(InMsg),
#{<<"x-basic-type">> := ?UNSHOVELLED
} = amqp10_msg:message_annotations(InMsg),
#{durable := true} = amqp10_msg:headers(InMsg),
ok
after ?TIMEOUT ->
throw(timeout_waiting_for_deliver1)
end,
?awaitMatch([[_, <<"1">>, <<"0">>],
[<<"dest-queue">>, <<"1">>, <<"0">>]],
lists:sort(
rabbit_ct_broker_helpers:rabbitmqctl_list(
Config, 0,
["list_queues", "name", "consumers", "messages", "--no-table-headers"])),
30000),
[{test_shovel, static, {running, _Info}, _Metrics, _Time}] =
rabbit_ct_broker_helpers:rpc(Config, 0,
rabbit_shovel_status, status, []),
detach_receiver(Conn, Receiver),
rabbit_ct_client_helpers:close_channel(Chan).
local_destination_forward_headers_amqp10(Config) ->
TargetQ = <<"dest-queue">>,
ok = setup_local_destination_shovel(Config, TargetQ, on_publish,
[{add_forward_headers, true}]),
{Conn, Receiver} = attach_receiver(Config, TargetQ),
Chan = rabbit_ct_client_helpers:open_channel(Config, 0),
Msg = #amqp_msg{props = #'P_basic'{}},
publish(Chan, Msg, ?EXCHANGE, ?TO_SHOVEL),
[NodeA] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Node = atom_to_binary(NodeA),
receive
{amqp10_msg, Receiver, InMsg} ->
?assertMatch(#{<<"x-opt-shovelled-by">> := Node,
<<"x-opt-shovel-type">> := <<"static">>,
<<"x-opt-shovel-name">> := <<"test_shovel">>},
amqp10_msg:message_annotations(InMsg))
after ?TIMEOUT ->
throw(timeout_waiting_for_deliver1)
end,
detach_receiver(Conn, Receiver),
rabbit_ct_client_helpers:close_channel(Chan).
local_destination_forward_headers_amqp091(Config) ->
%% Check that we can consume with 0.9.1 or 1.0 and no properties are
%% lost in translation
TargetQ = <<"dest-queue">>,
ok = setup_local_destination_shovel(Config, TargetQ, on_publish,
[{add_forward_headers, true}]),
Chan = rabbit_ct_client_helpers:open_channel(Config, 0),
CTag = consume(Chan, TargetQ, true),
Msg = #amqp_msg{props = #'P_basic'{}},
publish(Chan, Msg, ?EXCHANGE, ?TO_SHOVEL),
[NodeA] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
Node = atom_to_binary(NodeA),
ExpectedHeaders = lists:sort(
[{<<"x-opt-shovelled-by">>, longstr, Node},
{<<"x-opt-shovel-type">>, longstr, <<"static">>},
{<<"x-opt-shovel-name">>, longstr, <<"test_shovel">>}]),
receive
{#'basic.deliver'{consumer_tag = CTag},
#amqp_msg{props = #'P_basic'{headers = Headers}}} ->
?assertMatch(ExpectedHeaders,
lists:sort(Headers))
after ?TIMEOUT -> throw(timeout_waiting_for_deliver1)
end,
rabbit_ct_client_helpers:close_channel(Chan).
local_destination_no_forward_headers_amqp10(Config) ->
TargetQ = <<"dest-queue">>,
ok = setup_local_destination_shovel(Config, TargetQ, on_publish,
[{add_forward_headers, false}]),
{Conn, Receiver} = attach_receiver(Config, TargetQ),
Chan = rabbit_ct_client_helpers:open_channel(Config, 0),
Msg = #amqp_msg{props = #'P_basic'{}},
publish(Chan, Msg, ?EXCHANGE, ?TO_SHOVEL),
receive
{amqp10_msg, Receiver, InMsg} ->
Anns = amqp10_msg:message_annotations(InMsg),
?assertNot(maps:is_key(<<"x-opt-shovelled-by">>, Anns)),
?assertNot(maps:is_key(<<"x-opt-shovel-type">>, Anns)),
?assertNot(maps:is_key(<<"x-opt-shovel-name">>, Anns)),
ok
after ?TIMEOUT ->
throw(timeout_waiting_for_deliver1)
end,
detach_receiver(Conn, Receiver),
rabbit_ct_client_helpers:close_channel(Chan).
local_destination_timestamp_header_amqp10(Config) ->
TargetQ = <<"dest-queue">>,
ok = setup_local_destination_shovel(Config, TargetQ, on_publish,
[{add_timestamp_header, true}]),
{Conn, Receiver} = attach_receiver(Config, TargetQ),
Chan = rabbit_ct_client_helpers:open_channel(Config, 0),
Msg = #amqp_msg{props = #'P_basic'{}},
publish(Chan, Msg, ?EXCHANGE, ?TO_SHOVEL),
receive
{amqp10_msg, Receiver, InMsg} ->
?assertMatch(#{<<"x-opt-shovelled-timestamp">> := _},
amqp10_msg:message_annotations(InMsg))
after ?TIMEOUT ->
throw(timeout_waiting_for_deliver1)
end,
detach_receiver(Conn, Receiver),
rabbit_ct_client_helpers:close_channel(Chan).
local_destination_timestamp_header_amqp091(Config) ->
TargetQ = <<"dest-queue">>,
ok = setup_local_destination_shovel(Config, TargetQ, on_publish,
[{add_timestamp_header, true}]),
Chan = rabbit_ct_client_helpers:open_channel(Config, 0),
CTag = consume(Chan, TargetQ, true),
Msg = #amqp_msg{props = #'P_basic'{}},
publish(Chan, Msg, ?EXCHANGE, ?TO_SHOVEL),
receive
{#'basic.deliver'{consumer_tag = CTag},
#amqp_msg{props = #'P_basic'{headers = Headers}}} ->
?assertMatch([{<<"x-opt-shovelled-timestamp">>, long, _}],
Headers)
after ?TIMEOUT -> throw(timeout_waiting_for_deliver1)
end,
rabbit_ct_client_helpers:close_channel(Chan).
local_destination_no_timestamp_header_amqp10(Config) ->
TargetQ = <<"dest-queue">>,
ok = setup_local_destination_shovel(Config, TargetQ, on_publish,
[{add_timestamp_header, false}]),
{Conn, Receiver} = attach_receiver(Config, TargetQ),
Chan = rabbit_ct_client_helpers:open_channel(Config, 0),
Msg = #amqp_msg{props = #'P_basic'{}},
publish(Chan, Msg, ?EXCHANGE, ?TO_SHOVEL),
receive
{amqp10_msg, Receiver, InMsg} ->
Anns = amqp10_msg:message_annotations(InMsg),
?assertNot(maps:is_key(<<"x-opt-shovelled-timestamp">>, Anns))
after ?TIMEOUT ->
throw(timeout_waiting_for_deliver1)
end,
detach_receiver(Conn, Receiver),
rabbit_ct_client_helpers:close_channel(Chan).
local_source_no_ack(Config) ->
local_source(Config, no_ack).
local_source_on_publish(Config) ->
local_source(Config, on_publish).
local_source_on_confirm(Config) ->
local_source(Config, on_confirm).
local_source(Config, AckMode) ->
SourceQ = <<"source-queue">>,
DestQ = <<"dest-queue">>,
ok = setup_local_source_shovel(Config, SourceQ, DestQ, AckMode),
Chan = rabbit_ct_client_helpers:open_channel(Config, 0),
CTag = consume(Chan, DestQ, AckMode =:= no_ack),
Msg = #amqp_msg{payload = <<42>>,
props = #'P_basic'{delivery_mode = 2,
content_type = ?UNSHOVELLED}},
% publish to source
publish(Chan, Msg, <<>>, SourceQ),
receive
{#'basic.deliver'{consumer_tag = CTag, delivery_tag = AckTag},
#amqp_msg{payload = <<42>>,
props = #'P_basic'{headers = [{<<"x-shovelled">>, _, _},
{<<"x-shovelled-timestamp">>,
long, _}]}}} ->
case AckMode of
no_ack -> ok;
_ -> ok = amqp_channel:call(
Chan, #'basic.ack'{delivery_tag = AckTag})
end
after ?TIMEOUT -> throw(timeout_waiting_for_deliver1)
end,
QueuesAndConsumers = lists:sort([[<<"source-queue">>,<<"1">>,<<"0">>],
[<<"dest-queue">>,<<"1">>,<<"0">>]]),
?awaitMatch(QueuesAndConsumers,
lists:sort(
rabbit_ct_broker_helpers:rabbitmqctl_list(
Config, 0,
["list_queues", "name", "consumers", "messages", "--no-table-headers"])),
30000),
[{test_shovel, static, {running, _Info}, _Metrics, _Time}] =
rabbit_ct_broker_helpers:rpc(Config, 0,
rabbit_shovel_status, status, []),
rabbit_ct_client_helpers:close_channel(Chan).
local_source_anonymous_queue(Config) ->
DestQ = <<"dest-queue">>,
ok = setup_local_server_named_shovel(Config, DestQ, no_ack),
Chan = rabbit_ct_client_helpers:open_channel(Config, 0),
CTag = consume(Chan, DestQ, true),
Msg = #amqp_msg{payload = <<42>>,
props = #'P_basic'{delivery_mode = 2,
content_type = ?UNSHOVELLED}},
% publish to source
publish(Chan, Msg, <<"amq.fanout">>, <<>>),
receive
{#'basic.deliver'{consumer_tag = CTag},
#amqp_msg{payload = <<42>>,
props = #'P_basic'{}}} ->
ok
after ?TIMEOUT -> throw(timeout_waiting_for_deliver1)
end,
rabbit_ct_client_helpers:close_channel(Chan).
%%
%% Internal
%%
setup_local_source_shovel(Config, SourceQueue, DestQueue, AckMode) ->
Hostname = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
Shovel = [{test_shovel,
[{source,
[{uris, [rabbit_misc:format("amqp://~ts:~b/%2f?heartbeat=5",
[Hostname, Port])]},
{protocol, local},
{queue, SourceQueue},
{declarations,
[{'queue.declare', [{queue, SourceQueue}, auto_delete]}]}
]
},
{destination,
[{uris, [rabbit_misc:format("amqp://~ts:~b/%2f?heartbeat=5",
[Hostname, Port])]},
{declarations,
[{'queue.declare', [{queue, DestQueue}, auto_delete]}]},
{publish_fields, [{exchange, <<>>},
{routing_key, DestQueue}]},
{publish_properties, [{delivery_mode, 2},
{content_type, ?SHOVELLED}]},
{add_forward_headers, true},
{add_timestamp_header, true}]},
{queue, <<>>},
{ack_mode, AckMode}
]}],
ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, setup_shovel,
[Shovel]).
setup_local_destination_shovel(Config, Queue, AckMode, Dest) ->
Hostname = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
Shovel = [{test_shovel,
[{source,
[{uris, [rabbit_misc:format("amqp://~ts:~b/%2f?heartbeat=5",
[Hostname, Port])]},
{declarations,
[{'queue.declare', [exclusive, auto_delete]},
{'exchange.declare', [{exchange, ?EXCHANGE}, auto_delete]},
{'queue.bind', [{queue, <<>>}, {exchange, ?EXCHANGE},
{routing_key, ?TO_SHOVEL}]}]},
{queue, <<>>}]},
{destination,
[{protocol, local},
{declarations,
[{'queue.declare', [{queue, Queue}, auto_delete]}]},
{uris, [rabbit_misc:format("amqp://~ts:~b",
[Hostname, Port])]},
{dest_exchange, <<>>},
{dest_routing_key, Queue}] ++ Dest
},
{ack_mode, AckMode}]}],
ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, setup_shovel,
[Shovel]).
setup_local_server_named_shovel(Config, DestQueue, AckMode) ->
Hostname = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
Shovel = [{test_shovel,
[{source,
[{uris, [rabbit_misc:format("amqp://~ts:~b/%2f?heartbeat=5",
[Hostname, Port])]},
{protocol, local},
{queue, <<>>},
{declarations,
['queue.declare',
{'queue.bind', [
{exchange, <<"amq.fanout">>},
{queue, <<>>}
]}]}
]
},
{destination,
[{protocol, local},
{declarations,
[{'queue.declare', [{queue, DestQueue}, auto_delete]}]},
{uris, [rabbit_misc:format("amqp://~ts:~b",
[Hostname, Port])]},
{dest_exchange, <<>>},
{dest_routing_key, DestQueue}]},
{ack_mode, AckMode}
]}],
ok = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, setup_shovel,
[Shovel]).
setup_shovel(ShovelConfig) ->
_ = application:stop(rabbitmq_shovel),
application:set_env(rabbitmq_shovel, shovels, ShovelConfig, infinity),
ok = application:start(rabbitmq_shovel),
await_running_shovel(test_shovel).
await_running_shovel(Name) ->
case [N || {N, _, {running, _}, _, _}
<- rabbit_shovel_status:status(),
N =:= Name] of
[_] -> ok;
_ -> timer:sleep(100),
await_running_shovel(Name)
end.
consume(Chan, Queue, NoAck) ->
#'basic.consume_ok'{consumer_tag = CTag} =
amqp_channel:subscribe(Chan, #'basic.consume'{queue = Queue,
no_ack = NoAck,
exclusive = false},
self()),
receive
#'basic.consume_ok'{consumer_tag = CTag} -> ok
after ?TIMEOUT -> throw(timeout_waiting_for_consume_ok)
end,
CTag.
publish(Chan, Msg, Exchange, RoutingKey) ->
ok = amqp_channel:call(Chan, #'basic.publish'{exchange = Exchange,
routing_key = RoutingKey},
Msg).
delete_queues(Qs) when is_list(Qs) ->
(catch lists:foreach(fun delete_testcase_queue/1, Qs)).
delete_testcase_queue(Name) ->
QName = rabbit_misc:r(<<"/">>, queue, Name),
case rabbit_amqqueue:lookup(QName) of
{ok, Q} ->
{ok, _} = rabbit_amqqueue:delete(Q, false, false, <<"dummy">>);
_ ->
ok
end.
attach_receiver(Config, TargetQ) ->
Hostname = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
{ok, Conn} = amqp10_client:open_connection(Hostname, Port),
{ok, Sess} = amqp10_client:begin_session(Conn),
{ok, Receiver} = amqp10_client:attach_receiver_link(Sess,
<<"amqp-destination-receiver">>,
TargetQ, settled, unsettled_state),
ok = amqp10_client:flow_link_credit(Receiver, 5, never),
{Conn, Receiver}.
detach_receiver(Conn, Receiver) ->
amqp10_client:detach_link(Receiver),
amqp10_client:close_connection(Conn).

File diff suppressed because it is too large Load Diff

View File

@ -13,7 +13,9 @@
shovels_from_status/0, shovels_from_status/1,
get_shovel_status/2, get_shovel_status/3,
restart_shovel/2,
await/1, await/2, clear_param/2, clear_param/3, make_uri/2]).
await/1, await/2, clear_param/2, clear_param/3, make_uri/2,
make_uri/3, make_uri/5,
await_shovel1/4, await_no_shovel/2]).
make_uri(Config, Node) ->
Hostname = ?config(rmq_hostname, Config),
@ -21,6 +23,18 @@ make_uri(Config, Node) ->
list_to_binary(lists:flatten(io_lib:format("amqp://~ts:~b",
[Hostname, Port]))).
make_uri(Config, Node, VHost) ->
Hostname = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_amqp),
list_to_binary(lists:flatten(io_lib:format("amqp://~ts:~b/~ts",
[Hostname, Port, VHost]))).
make_uri(Config, Node, User, Password, VHost) ->
Hostname = ?config(rmq_hostname, Config),
Port = rabbit_ct_broker_helpers:get_node_config(Config, Node, tcp_port_amqp),
list_to_binary(lists:flatten(io_lib:format("amqp://~ts:~ts@~ts:~b/~ts",
[User, Password, Hostname, Port, VHost]))).
set_param(Config, Name, Value) ->
set_param_nowait(Config, 0, 0, Name, Value),
await_shovel(Config, 0, Name).
@ -53,13 +67,26 @@ await_shovel(Config, Node, Name, ExpectedState) ->
rabbit_ct_broker_helpers:rpc(Config, Node,
?MODULE, await_shovel1, [Config, Name, ExpectedState]).
await_shovel1(_Config, Name, ExpectedState) ->
await_shovel1(Config, Name, ExpectedState) ->
await_shovel1(Config, Name, ExpectedState, 30_000).
await_shovel1(_Config, Name, ExpectedState, Timeout) ->
Ret = await(fun() ->
Status = shovels_from_status(ExpectedState),
lists:member(Name, Status)
end, 30_000),
end, Timeout),
Ret.
await_no_shovel(Config, Name) ->
try
rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, await_shovel1,
[Config, Name, running, 10_000]),
throw(unexpected_success)
catch
_:{exception, {await_timeout, false}, _} ->
ok
end.
shovels_from_status() ->
shovels_from_status(running).