WIP: Add attach properties
This commit is contained in:
		
							parent
							
								
									303cca184b
								
							
						
					
					
						commit
						9deede03e4
					
				|  | @ -41,6 +41,7 @@ | |||
|          attach_receiver_link/4, | ||||
|          attach_receiver_link/5, | ||||
|          attach_receiver_link/6, | ||||
|          attach_receiver_link/7, | ||||
|          attach_link/2, | ||||
|          detach_link/1, | ||||
|          send_msg/2, | ||||
|  | @ -67,6 +68,7 @@ | |||
| -type attach_role() :: amqp10_client_session:attach_role(). | ||||
| -type attach_args() :: amqp10_client_session:attach_args(). | ||||
| -type filter() :: amqp10_client_session:filter(). | ||||
| -type properties() :: amqp10_client_session:properties(). | ||||
| 
 | ||||
| -type connection_config() :: amqp10_client_connection:connection_config(). | ||||
| 
 | ||||
|  | @ -258,12 +260,25 @@ attach_receiver_link(Session, Name, Source, SettleMode, Durability) -> | |||
|                            snd_settle_mode(), terminus_durability(), filter()) -> | ||||
|     {ok, link_ref()}. | ||||
| attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter) -> | ||||
|     attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, #{}). | ||||
| 
 | ||||
| %% @doc Attaches a receiver link to a source. | ||||
| %% This is asynchronous and will notify completion of the attach request to the | ||||
| %% caller using an amqp10_event of the following format: | ||||
| %% {amqp10_event, {link, LinkRef, attached | {detached, Why}}} | ||||
| -spec attach_receiver_link(pid(), binary(), binary(), | ||||
|                            snd_settle_mode(), terminus_durability(), filter(), | ||||
|                            properties()) -> | ||||
|     {ok, link_ref()}. | ||||
| attach_receiver_link(Session, Name, Source, SettleMode, Durability, Filter, Properties) -> | ||||
|     io:format(user, "~nat=~s:~p:~p", [?MODULE_STRING, ?FUNCTION_NAME, ?LINE]), | ||||
|     AttachArgs = #{name => Name, | ||||
|                    role => {receiver, #{address => Source, | ||||
|                                         durable => Durability}, self()}, | ||||
|                    snd_settle_mode => SettleMode, | ||||
|                    rcv_settle_mode => first, | ||||
|                    filter => Filter}, | ||||
|                    filter => Filter, | ||||
|                    properties => Properties}, | ||||
|     amqp10_client_session:attach(Session, AttachArgs). | ||||
| 
 | ||||
| -spec attach_link(pid(), attach_args()) -> {ok, link_ref()}. | ||||
|  |  | |||
|  | @ -94,6 +94,7 @@ | |||
| 
 | ||||
| % http://www.amqp.org/specification/1.0/filters | ||||
| -type filter() :: #{binary() => binary() | map() | list(binary())}. | ||||
| -type properties() :: #{binary() => binary() | list(binary())}. | ||||
| 
 | ||||
| -type attach_args() :: #{name => binary(), | ||||
|                          role => attach_role(), | ||||
|  | @ -111,6 +112,7 @@ | |||
|               attach_role/0, | ||||
|               target_def/0, | ||||
|               source_def/0, | ||||
|               properties/0, | ||||
|               filter/0]). | ||||
| 
 | ||||
| -record(link, | ||||
|  | @ -653,6 +655,20 @@ make_target(#{role := {sender, #{address := Address} = Target}}) -> | |||
|     #'v1_0.target'{address = {utf8, Address}, | ||||
|                    durable = {uint, Durable}}. | ||||
| 
 | ||||
| make_properties(#{properties := Properties}) -> | ||||
|     translate_properties(Properties). | ||||
| 
 | ||||
| translate_properties(Properties) when is_map(Properties) andalso map_size(Properties) =< 0 -> | ||||
|     undefined; | ||||
| translate_properties(Properties) when is_map(Properties) -> | ||||
|     {map, maps:fold(fun translate_property/3, [], Properties)}. | ||||
| 
 | ||||
| translate_property(K, V, Acc) when is_binary(V) -> | ||||
|     [{{symbol, K}, {described, {symbol, K}, {utf8, V}}} | Acc]; | ||||
| translate_property(K, V, Acc) when is_list(V) -> | ||||
|     Values = lists:map(fun(Id) -> {utf8, Id} end, V), | ||||
|     [{{symbol, K}, {described, {symbol, K}, Values}} | Acc]. | ||||
| 
 | ||||
| translate_terminus_durability(none) -> 0; | ||||
| translate_terminus_durability(configuration) -> 1; | ||||
| translate_terminus_durability(unsettled_state) -> 2. | ||||
|  | @ -723,6 +739,7 @@ send_attach(Send, #{name := Name, role := Role} = Args, {FromPid, _}, | |||
| 
 | ||||
|     Source = make_source(Args), | ||||
|     Target = make_target(Args), | ||||
|     Properties = make_properties(Args), | ||||
| 
 | ||||
|     {LinkTarget, RoleAsBool} = case Role of | ||||
|                                    {receiver, _, Pid} -> | ||||
|  | @ -731,11 +748,13 @@ send_attach(Send, #{name := Name, role := Role} = Args, {FromPid, _}, | |||
|                                        {TargetAddr, false} | ||||
|                                end, | ||||
| 
 | ||||
|     io:format(user, "~nat=~s:~p:~p", [?MODULE_STRING, ?FUNCTION_NAME, ?LINE]), | ||||
|     % create attach performative | ||||
|     Attach = #'v1_0.attach'{name = {utf8, Name}, | ||||
|                             role = RoleAsBool, | ||||
|                             handle = {uint, OutHandle}, | ||||
|                             source = Source, | ||||
|                             properties = Properties, | ||||
|                             initial_delivery_count = | ||||
|                                 {uint, ?INITIAL_DELIVERY_COUNT}, | ||||
|                             snd_settle_mode = snd_settle_mode(Args), | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue