This is 10-fold improvement on the last version, still not right yet
This commit is contained in:
		
							parent
							
								
									b5f6bc33ff
								
							
						
					
					
						commit
						10c0fc56ad
					
				| 
						 | 
				
			
			@ -187,11 +187,12 @@ route(#exchange{name = Name, type = topic}, RoutingKey) ->
 | 
			
		|||
 | 
			
		||||
% This matches for the direct exchanges and tries to short-cut the routing table
 | 
			
		||||
% for the default queue if that is what the user supplied
 | 
			
		||||
route(#exchange{name = Name = #resource{name = <<>>}, type = direct}, RoutingKey) ->
 | 
			
		||||
    case route_internal(Name, RoutingKey) of
 | 
			
		||||
        [] -> route_internal(Name, RoutingKey, fun(X,Y) -> X == Y end);
 | 
			
		||||
        Other -> Other
 | 
			
		||||
    end;
 | 
			
		||||
 | 
			
		||||
% route(#exchange{name = Name = #resource{name = <<>>}, type = direct}, RoutingKey) ->
 | 
			
		||||
%     case route_internal(Name, RoutingKey) of
 | 
			
		||||
%         [] -> route_internal(Name, RoutingKey, fun(X,Y) -> X == Y end);
 | 
			
		||||
%         Other -> Other
 | 
			
		||||
%     end;
 | 
			
		||||
 | 
			
		||||
route(#exchange{name = Name, type = Type}, RoutingKey) ->
 | 
			
		||||
    route_internal(Name, RoutingKey, fun(X,Y) -> X == Y end).
 | 
			
		||||
| 
						 | 
				
			
			@ -199,26 +200,32 @@ route(#exchange{name = Name, type = Type}, RoutingKey) ->
 | 
			
		|||
% This returns a list of QPids to route to.
 | 
			
		||||
% Maybe this should be handled by a cursor instead.
 | 
			
		||||
% This routes directly to queues, avoiding any lookup of routes
 | 
			
		||||
route_internal(#resource{name = <<>>, virtual_host = VHostPath}, RoutingKey) ->
 | 
			
		||||
    Query = qlc:q([QPid || #amqqueue{name = Queue, pid = QPid} <- mnesia:table(amqqueue),
 | 
			
		||||
                           Queue == rabbit_misc:r(VHostPath, queue, RoutingKey)]),
 | 
			
		||||
    mnesia:activity(async_dirty, fun() -> qlc:e(Query) end).
 | 
			
		||||
 | 
			
		||||
% route_internal(#resource{name = <<>>, virtual_host = VHostPath}, RoutingKey) ->
 | 
			
		||||
%     Query = qlc:q([QPid || #amqqueue{name = Queue, pid = QPid} <- mnesia:table(amqqueue),
 | 
			
		||||
%                            Queue == rabbit_misc:r(VHostPath, queue, RoutingKey)]),
 | 
			
		||||
%     mnesia:activity(async_dirty, fun() -> qlc:e(Query) end).
 | 
			
		||||
 | 
			
		||||
% This returns a list of QPids to route to.
 | 
			
		||||
% Maybe this should be handled by a cursor instead.
 | 
			
		||||
route_internal(Exchange, RoutingKey, MatchFun) ->
 | 
			
		||||
    Query = qlc:q([QPid || #route{binding = #binding{exchange_name = ExchangeName,
 | 
			
		||||
                                                     queue_name = QueueName,
 | 
			
		||||
    Query = qlc:q([QName || #route{binding = #binding{exchange_name = ExchangeName,
 | 
			
		||||
                                                     queue_name = QName,
 | 
			
		||||
                                                     key = BindingKey}} <- mnesia:table(route),
 | 
			
		||||
                           #amqqueue{name = Queue, pid = QPid} <- mnesia:table(amqqueue),
 | 
			
		||||
                           ExchangeName == Exchange,
 | 
			
		||||
                           QueueName == Queue,
 | 
			
		||||
                           MatchFun(BindingKey, RoutingKey)],
 | 
			
		||||
                           % This is put in as a quick and dirty way to preventing double routing
 | 
			
		||||
                           % but it is probably very expensive to maintain
 | 
			
		||||
                           {unique, true}),
 | 
			
		||||
    mnesia:activity(async_dirty, fun() -> qlc:e(Query) end).
 | 
			
		||||
    
 | 
			
		||||
                           MatchFun(BindingKey, RoutingKey)]),
 | 
			
		||||
    Fun = fun() -> qlc:e(Query) end,    
 | 
			
		||||
    {Time1,L} = timer:tc(mnesia,activity,[async_dirty,Fun]),
 | 
			
		||||
    Set = sets:from_list(L),
 | 
			
		||||
    Fun2 = fun() ->
 | 
			
		||||
            sets:fold(
 | 
			
		||||
                fun(Key, Acc) -> [#amqqueue{pid = QPid}] = mnesia:read({amqqueue, Key}),                                 
 | 
			
		||||
                                 [QPid] ++ Acc end, 
 | 
			
		||||
                [], Set) end,
 | 
			
		||||
    {Time2, QPids} = timer:tc(mnesia, activity, [async_dirty,Fun2]),    
 | 
			
		||||
    io:format("Time 1 -> ~p~n",[Time1]),
 | 
			
		||||
    io:format("Time 2 -> ~p~n",[Time2]),
 | 
			
		||||
    QPids.
 | 
			
		||||
    
 | 
			
		||||
% Should all of the route and binding management not be refactored to it's own module
 | 
			
		||||
% Especially seeing as unbind will have to be implemented for 0.91 ?
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue