parent
8ccdd7b6f7
commit
68e8ae8673
|
|
@ -199,7 +199,6 @@ apply({register_consumer,
|
|||
Owner,
|
||||
SubscriptionId},
|
||||
#?MODULE{groups = StreamGroups0} = State) ->
|
||||
%% FIXME monitor virtual hosts as well?
|
||||
rabbit_log:debug("New consumer ~p ~p in group ~p, partition index "
|
||||
"is ~p",
|
||||
[ConnectionPid,
|
||||
|
|
@ -444,7 +443,10 @@ handle_connection_down(Pid,
|
|||
undefined -> {S0, Eff0};
|
||||
#group{consumers = Consumers} ->
|
||||
%% iterate over the consumers of the group
|
||||
%% and unregister the ones from this PID
|
||||
%% and unregister the ones from this PID.
|
||||
%% It may not be optimal, computing the new active consumer
|
||||
%% from the purged group and notifying the remaining consumers
|
||||
%% appropriately should avoid unwanted notifications and even rebalancing.
|
||||
lists:foldl(fun (#consumer{pid = P,
|
||||
subscription_id =
|
||||
SubId},
|
||||
|
|
|
|||
|
|
@ -818,7 +818,7 @@ open(info,
|
|||
log = Log1},
|
||||
|
||||
Conn1 =
|
||||
maybe_notify_consumer(Transport,
|
||||
maybe_send_consumer_update(Transport,
|
||||
Connection0,
|
||||
SubId,
|
||||
Active,
|
||||
|
|
@ -2774,9 +2774,9 @@ maybe_register_consumer(VirtualHost,
|
|||
SubscriptionId),
|
||||
Active.
|
||||
|
||||
maybe_notify_consumer(_, Connection, _, _, false = _Sac, _) ->
|
||||
maybe_send_consumer_update(_, Connection, _, _, false = _Sac, _) ->
|
||||
Connection;
|
||||
maybe_notify_consumer(Transport,
|
||||
maybe_send_consumer_update(Transport,
|
||||
#stream_connection{socket = S,
|
||||
correlation_id_sequence = CorrIdSeq,
|
||||
outstanding_requests =
|
||||
|
|
|
|||
Loading…
Reference in New Issue