Added vhost to binding spec
This commit is contained in:
		
							parent
							
								
									dcfe7ed8ca
								
							
						
					
					
						commit
						8834453890
					
				|  | @ -45,12 +45,12 @@ | ||||||
| %% This constant field seems to be required because the underlying storage is | %% This constant field seems to be required because the underlying storage is | ||||||
| %% ets, which stores key value pairs | %% ets, which stores key value pairs | ||||||
| 
 | 
 | ||||||
| %% The spec field is made up of an {Exchange, Binding, Queue} | %% The binding field is made up of an {Exchange, Binding, Queue} | ||||||
| -record(forwards_binding, {spec, value = const}). | -record(forwards_binding, {binding, value = const}). | ||||||
| %% The spec field is made up of an {Queue, Binding, Exchange} | %% The binding field is made up of an {Queue, Binding, Exchange} | ||||||
| -record(reverse_binding, {spec, value = const}). | -record(reverse_binding, {binding, value = const}). | ||||||
| 
 | 
 | ||||||
| -record(binding, {exchange, key, queue}). | -record(binding, {virtual_host, exchange_name, key, queue_name}). | ||||||
| 
 | 
 | ||||||
| -record(listener, {node, protocol, host, port}). | -record(listener, {node, protocol, host, port}). | ||||||
| 
 | 
 | ||||||
|  |  | ||||||
|  | @ -167,14 +167,17 @@ recover_queue(Q) -> | ||||||
|     ok. |     ok. | ||||||
| 
 | 
 | ||||||
| default_binding_spec(#resource{virtual_host = VHostPath, name = Name}) -> | default_binding_spec(#resource{virtual_host = VHostPath, name = Name}) -> | ||||||
|     exit(default_binding_spec). |     #binding{exchange_name = <<"">>, | ||||||
|  |              key = Name, | ||||||
|  |              queue_name = Name, | ||||||
|  |              virtual_host = VHostPath}. | ||||||
|     % #binding_spec{exchange_name = rabbit_misc:r(VHostPath,exchange,<<"">>), |     % #binding_spec{exchange_name = rabbit_misc:r(VHostPath,exchange,<<"">>), | ||||||
|     %                   routing_key = Name, |     %                   routing_key = Name, | ||||||
|     %                   arguments = []}. |     %                   arguments = []}. | ||||||
| 
 | 
 | ||||||
| recover_bindings(Q = #amqqueue{name = QueueName}) -> | recover_bindings(Q = #amqqueue{name = QueueName}) -> | ||||||
|     exit(recover_bindings). |     io:format("Q was ~p~n",[Q]), | ||||||
|     % ok = rabbit_exchange:add_binding(default_binding_spec(QueueName), Q), |     ok = rabbit_exchange:add_binding(default_binding_spec(QueueName)). | ||||||
|     %     lists:foreach(fun (B) -> |     %     lists:foreach(fun (B) -> | ||||||
|     %                           ok = rabbit_exchange:add_binding(B, Q) |     %                           ok = rabbit_exchange:add_binding(B, Q) | ||||||
|     %                   end, Specs), |     %                   end, Specs), | ||||||
|  |  | ||||||
|  | @ -221,11 +221,13 @@ delivery_key_for_type(_Type, Name, RoutingKey) -> | ||||||
| % Don't really like this double lookup | % Don't really like this double lookup | ||||||
| % It seems very clunky | % It seems very clunky | ||||||
| % Can this get refactored to to avoid the duplication of the lookup/1 function? | % Can this get refactored to to avoid the duplication of the lookup/1 function? | ||||||
| call_with_exchange_and_queue(Exchange, Queue, Fun) -> | call_with_exchange_and_queue(#binding{virtual_host = VHost, exchange_name = Exchange,  | ||||||
|     case mnesia:wread({exchange, Exchange}) of |                                       queue_name = Queue}, Fun) -> | ||||||
|  |     io:format("Reading (~p) and (~p) ~n",[Exchange,Queue]), | ||||||
|  |     case mnesia:wread({exchange, rabbit_misc:r(VHost, exchange, Exchange)}) of | ||||||
|         [] -> {error, exchange_not_found}; |         [] -> {error, exchange_not_found}; | ||||||
|         [X] ->  |         [X] ->  | ||||||
|             case mnesia:wread({amqqueue, Queue}) of |             case mnesia:wread({amqqueue, rabbit_misc:r(VHost, amqqueue, Queue)}) of | ||||||
|                 [] -> {error, queue_not_found}; |                 [] -> {error, queue_not_found}; | ||||||
|                 [Q] ->  |                 [Q] ->  | ||||||
|                     Fun(X,Q) |                     Fun(X,Q) | ||||||
|  | @ -237,13 +239,13 @@ make_handler(BindingSpec, #amqqueue{name = QueueName, pid = QPid}) -> | ||||||
|     exit(make_handler). |     exit(make_handler). | ||||||
|     %#handler{binding_spec = BindingSpec, queue = QueueName, qpid = QPid}. |     %#handler{binding_spec = BindingSpec, queue = QueueName, qpid = QPid}. | ||||||
| 
 | 
 | ||||||
| add_binding(#binding{exchange = Exchange, key = Key, queue = Queue}) -> | add_binding(Binding) -> | ||||||
|     call_with_exchange_and_queue( |     call_with_exchange_and_queue( | ||||||
|               Exchange, Queue, |               Binding, | ||||||
|               fun (X,Q) -> if Q#amqqueue.durable and not(X#exchange.durable) -> |               fun (X,Q) -> if Q#amqqueue.durable and not(X#exchange.durable) -> | ||||||
|                                  {error, durability_settings_incompatible}; |                                  {error, durability_settings_incompatible}; | ||||||
|                             true -> |                             true -> | ||||||
|                                  internal_add_binding(X, Key, Q) |                                  internal_add_binding(Binding) | ||||||
|                          end |                          end | ||||||
|               end). |               end). | ||||||
| 
 | 
 | ||||||
|  | @ -278,13 +280,17 @@ handler_qpids(Handlers) -> | ||||||
|     exit(handler_qpids). |     exit(handler_qpids). | ||||||
|     %sets:from_list([QPid || #handler{qpid = QPid} <- Handlers]). |     %sets:from_list([QPid || #handler{qpid = QPid} <- Handlers]). | ||||||
| 
 | 
 | ||||||
| %% Must run within a transaction. | reverse_binding(#binding{virtual_host = VHost, exchange_name = Exchange,  | ||||||
| internal_add_binding(#exchange{name = ExchangeName, type = Type}, |                          key = Key, queue_name = Queue}) -> | ||||||
|                      RoutingKey, Queue) -> |     {binding, VHost, Queue, Key, Exchange}. | ||||||
|     ok. |  | ||||||
|     %BindingKey = delivery_key_for_type(Type, ExchangeName, RoutingKey), |  | ||||||
|     %ok = add_handler_to_binding(BindingKey, Handler). |  | ||||||
| 
 | 
 | ||||||
|  | %% Must run within a transaction. | ||||||
|  | internal_add_binding(Binding) -> | ||||||
|  |     Forwards = #forwards_binding{ binding = Binding }, | ||||||
|  |     Reverse = #reverse_binding{ binding = reverse_binding(Binding) }, | ||||||
|  |     ok = mnesia:write(Forwards), | ||||||
|  |     ok = mnesia:write(Reverse). | ||||||
|  |      | ||||||
| %% Must run within a transaction. | %% Must run within a transaction. | ||||||
| internal_delete_binding(#exchange{name = ExchangeName, type = Type}, RoutingKey, Handler) -> | internal_delete_binding(#exchange{name = ExchangeName, type = Type}, RoutingKey, Handler) -> | ||||||
|     BindingKey = delivery_key_for_type(Type, ExchangeName, RoutingKey), |     BindingKey = delivery_key_for_type(Type, ExchangeName, RoutingKey), | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue