Split out direct and topic routing
This commit is contained in:
		
							parent
							
								
									10c0fc56ad
								
							
						
					
					
						commit
						4391947d60
					
				|  | @ -185,48 +185,43 @@ simple_publish(Mandatory, Immediate, | ||||||
| route(#exchange{name = Name, type = topic}, RoutingKey) -> | route(#exchange{name = Name, type = topic}, RoutingKey) -> | ||||||
|     route_internal(Name, RoutingKey, fun topic_matches/2); |     route_internal(Name, RoutingKey, fun topic_matches/2); | ||||||
| 
 | 
 | ||||||
| % This matches for the direct exchanges and tries to short-cut the routing table |  | ||||||
| % for the default queue if that is what the user supplied |  | ||||||
| 
 |  | ||||||
| % route(#exchange{name = Name = #resource{name = <<>>}, type = direct}, RoutingKey) -> |  | ||||||
| %     case route_internal(Name, RoutingKey) of |  | ||||||
| %         [] -> route_internal(Name, RoutingKey, fun(X,Y) -> X == Y end); |  | ||||||
| %         Other -> Other |  | ||||||
| %     end; |  | ||||||
| 
 |  | ||||||
| route(#exchange{name = Name, type = Type}, RoutingKey) -> | route(#exchange{name = Name, type = Type}, RoutingKey) -> | ||||||
|     route_internal(Name, RoutingKey, fun(X,Y) -> X == Y end). |     route_internal(Name, RoutingKey). | ||||||
| 
 | 
 | ||||||
| % This returns a list of QPids to route to. | % This returns a list of QPids to route to. | ||||||
| % Maybe this should be handled by a cursor instead. | % Maybe this should be handled by a cursor instead. | ||||||
| % This routes directly to queues, avoiding any lookup of routes | % This routes directly to queues, avoiding any lookup of routes | ||||||
| 
 | route_internal(#resource{name = Name, virtual_host = VHostPath}, RoutingKey) -> | ||||||
| % route_internal(#resource{name = <<>>, virtual_host = VHostPath}, RoutingKey) -> |     MatchHead = #route{binding = #binding{exchange_name = '$1', | ||||||
| %     Query = qlc:q([QPid || #amqqueue{name = Queue, pid = QPid} <- mnesia:table(amqqueue), |                                       queue_name = '$2', | ||||||
| %                            Queue == rabbit_misc:r(VHostPath, queue, RoutingKey)]), |                                       key = '$3'}}, | ||||||
| %     mnesia:activity(async_dirty, fun() -> qlc:e(Query) end). |     %MatchHead = #person{name='$1', sex=male, age='$2', _='_'}, | ||||||
| 
 |     Guards = [{'==', '$1', Name}, {'==', '$3', RoutingKey}], | ||||||
|  |     lookup_qpids( | ||||||
|  |         mnesia:activity(async_dirty, | ||||||
|  |                         fun() -> mnesia:select(route,[{MatchHead, Guards, ['$2']}]) | ||||||
|  |                         end)). | ||||||
|  |      | ||||||
| % This returns a list of QPids to route to. | % This returns a list of QPids to route to. | ||||||
| % Maybe this should be handled by a cursor instead. | % Maybe this should be handled by a cursor instead. | ||||||
| route_internal(Exchange, RoutingKey, MatchFun) -> | route_internal(Exchange, RoutingKey, MatchFun) -> | ||||||
|     Query = qlc:q([QName || #route{binding = #binding{exchange_name = ExchangeName, |     Query = qlc:q([QName || #route{binding = #binding{exchange_name = ExchangeName, | ||||||
|                                                      queue_name = QName, |                                                       queue_name = QName, | ||||||
|                                                      key = BindingKey}} <- mnesia:table(route), |                                                       key = BindingKey}} <- mnesia:table(route), | ||||||
|                            ExchangeName == Exchange, |                             ExchangeName == Exchange, | ||||||
|                            MatchFun(BindingKey, RoutingKey)]), |                             % This causes a full table scan (see bug 19336) | ||||||
|     Fun = fun() -> qlc:e(Query) end,     |                             MatchFun(BindingKey, RoutingKey)]), | ||||||
|     {Time1,L} = timer:tc(mnesia,activity,[async_dirty,Fun]), |     lookup_qpids(mnesia:activity(async_dirty, fun() -> qlc:e(Query) end)). | ||||||
|     Set = sets:from_list(L), | 
 | ||||||
|     Fun2 = fun() -> | lookup_qpids(Queues) -> | ||||||
|  |     Set = sets:from_list(Queues), | ||||||
|  |     Fun = fun() -> | ||||||
|             sets:fold( |             sets:fold( | ||||||
|                 fun(Key, Acc) -> [#amqqueue{pid = QPid}] = mnesia:read({amqqueue, Key}),                                  |                 fun(Key, Acc) -> [#amqqueue{pid = QPid}] = mnesia:read({amqqueue, Key}),                                  | ||||||
|                                  [QPid] ++ Acc end,  |                                  [QPid] ++ Acc end,  | ||||||
|                 [], Set) end, |                 [], Set) end, | ||||||
|     {Time2, QPids} = timer:tc(mnesia, activity, [async_dirty,Fun2]),     |     mnesia:activity(async_dirty,Fun). | ||||||
|     io:format("Time 1 -> ~p~n",[Time1]), |          | ||||||
|     io:format("Time 2 -> ~p~n",[Time2]), |  | ||||||
|     QPids. |  | ||||||
|      |  | ||||||
| % Should all of the route and binding management not be refactored to it's own module | % Should all of the route and binding management not be refactored to it's own module | ||||||
| % Especially seeing as unbind will have to be implemented for 0.91 ? | % Especially seeing as unbind will have to be implemented for 0.91 ? | ||||||
| delete_routes(Q = #amqqueue{name = Name}) -> | delete_routes(Q = #amqqueue{name = Name}) -> | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue