Fix stream SAC computation and notification

Start index from the end because of lists:foldr.
Make sure to notify only the passive newcomer when there's
no change in SAC.

References #3753
This commit is contained in:
Arnaud Cogoluègnes 2021-11-25 10:38:59 +01:00
parent a607f842d1
commit 11ef789c38
No known key found for this signature in database
GPG Key ID: D5C8C4DFAD43AFA8
1 changed files with 28 additions and 10 deletions

View File

@ -165,8 +165,10 @@ handle_call({register_consumer,
subscription_id = SubscriptionId,
active = false},
Group1 = add_to_group(Consumer0, Group0),
rabbit_log:debug("Consumer added to group: ~p", [Group1]),
Group2 = compute_active_consumer(Group1),
rabbit_log:debug("Consumers in group: ~p", [Group2]),
rabbit_log:debug("Consumers in group after active consumer computation: ~p",
[Group2]),
StreamGroups2 =
update_groups(VirtualHost,
Stream,
@ -198,7 +200,11 @@ handle_call({unregister_consumer,
rabbit_log:debug("Unregistering consumer ~p from group",
[Consumer]),
G1 = remove_from_group(Consumer, Group0),
rabbit_log:debug("Consumer removed from group: ~p",
[G1]),
G2 = compute_active_consumer(G1),
rabbit_log:debug("Consumers in group after active consumer computation: ~p",
[G2]),
NewActive =
case lookup_active_consumer(G2) of
{value, AC} ->
@ -268,15 +274,17 @@ compute_active_consumer(#group{partition_index = PartitionIndex,
consumers = Consumers0} =
Group) ->
ActiveConsumerIndex = PartitionIndex rem length(Consumers0),
rabbit_log:debug("Active consumer index = ~p rem ~p = ~p",
[PartitionIndex, length(Consumers0), ActiveConsumerIndex]),
{_, Consumers1} =
lists:foldr(fun (C0, {Index, Cs}) when Index == ActiveConsumerIndex ->
C1 = C0#consumer{active = true},
{Index + 1, [C1 | Cs]};
{Index - 1, [C1 | Cs]};
(C0, {Index, Cs}) ->
C1 = C0#consumer{active = false},
{Index + 1, [C1 | Cs]}
{Index - 1, [C1 | Cs]}
end,
{0, []}, Consumers0),
{length(Consumers0) - 1, []}, Consumers0),
Group#group{consumers = Consumers1}.
lookup_consumer(ConnectionPid, SubscriptionId,
@ -292,7 +300,7 @@ lookup_active_consumer(#group{consumers = Consumers}) ->
notify_consumers(_, _, #group{consumers = []}) ->
ok;
notify_consumers(_FormerActive,
notify_consumers(_,
#consumer{pid = ConnectionPid,
subscription_id = SubscriptionId} =
NewConsumer,
@ -301,7 +309,7 @@ notify_consumers(_FormerActive,
! {sac,
{{subscription_id, SubscriptionId}, {active, true},
{side_effects, []}}};
notify_consumers(_FormerActive,
notify_consumers(_,
#consumer{pid = ConnectionPid,
subscription_id = SubscriptionId} =
NewConsumer,
@ -310,7 +318,7 @@ notify_consumers(_FormerActive,
! {sac,
{{subscription_id, SubscriptionId}, {active, true},
{side_effects, []}}};
notify_consumers(_FormerActive,
notify_consumers(_,
#consumer{pid = ConnectionPid,
subscription_id = SubscriptionId},
#group{partition_index = -1, consumers = _}) ->
@ -318,7 +326,7 @@ notify_consumers(_FormerActive,
! {sac,
{{subscription_id, SubscriptionId}, {active, false},
{side_effects, []}}};
notify_consumers(undefined = _FormerActive,
notify_consumers(undefined,
#consumer{pid = ConnectionPid,
subscription_id = SubscriptionId},
_Group) ->
@ -328,7 +336,9 @@ notify_consumers(undefined = _FormerActive,
{side_effects, []}}};
notify_consumers(#consumer{pid = FormerConnPid,
subscription_id = FormerSubId},
#consumer{pid = NewConnPid, subscription_id = NewSubId},
#consumer{pid = NewConnPid,
subscription_id = NewSubId,
active = true},
_Group) ->
FormerConnPid
! {sac,
@ -337,7 +347,15 @@ notify_consumers(#consumer{pid = FormerConnPid,
[{message, NewConnPid,
{sac,
{{subscription_id, NewSubId}, {active, true},
{side_effects, []}}}}]}}}.
{side_effects, []}}}}]}}};
notify_consumers(_StillActive,
#consumer{pid = NewConnPid,
subscription_id = NewSubId,
active = false},
_Group) ->
NewConnPid
! {sac,
{{subscription_id, NewSubId}, {active, false}, {side_effects, []}}}.
update_groups(VirtualHost,
Stream,