Fix stream consumer state change

References #3753
This commit is contained in:
Arnaud Cogoluègnes 2021-12-03 13:42:48 +01:00
parent 2fe558ab50
commit 70598325a9
No known key found for this signature in database
GPG Key ID: D5C8C4DFAD43AFA8
1 changed files with 20 additions and 8 deletions

View File

@ -659,7 +659,10 @@ close(Transport,
osiris_log:close(L)
end
end
|| #consumer{log = Log, configuration = #consumer_configuration{properties = Properties}} = Consumer
|| #consumer{log = Log,
configuration =
#consumer_configuration{properties = Properties}} =
Consumer
<- maps:values(Consumers)],
Transport:shutdown(S, write),
Transport:close(S).
@ -788,7 +791,7 @@ open(info,
case Consumers0 of
#{SubId := Consumer0} ->
%% FIXME check consumer is SAC, to avoid changing a regular consumer
#consumer{log = Log0} = Consumer0,
#consumer{log = Log0, configuration = Conf0} = Consumer0,
Log1 =
case {Active, Log0} of
{false, undefined} ->
@ -802,7 +805,11 @@ open(info,
_ ->
Log0
end,
Consumer1 = Consumer0#consumer{configuration = #consumer_configuration{active = Active}, log = Log1},
Consumer1 =
Consumer0#consumer{configuration =
Conf0#consumer_configuration{active =
Active},
log = Log1},
Conn1 =
maybe_notify_consumer(Transport,
@ -1063,8 +1070,7 @@ open(cast,
Consumers1 =
lists:foldl(fun(SubscriptionId, ConsumersAcc) ->
#{SubscriptionId := Consumer} = ConsumersAcc,
#consumer{credit = Credit,
log = Log} =
#consumer{credit = Credit, log = Log} =
Consumer,
Consumer1 =
case {Credit, Log} of
@ -2450,6 +2456,7 @@ handle_frame_post_auth(Transport,
stream =
Stream}} =
Consumer,
OffsetSpec =
case ResponseOffsetSpec of
none ->
@ -2465,6 +2472,7 @@ handle_frame_post_auth(Transport,
#resource{name = Stream,
kind = queue,
virtual_host = VirtualHost},
Segment =
init_reader(ConnTransport,
LocalMemberPid,
@ -2711,9 +2719,13 @@ maybe_notify_consumer(Transport,
maybe_unregister_consumer(_, _, false = _Sac) ->
ok;
maybe_unregister_consumer(VirtualHost,
#consumer{configuration = #consumer_configuration{stream = Stream,
properties = Properties,
subscription_id = SubscriptionId}},
#consumer{configuration =
#consumer_configuration{stream = Stream,
properties =
Properties,
subscription_id
=
SubscriptionId}},
true = _Sac) ->
ConsumerName = consumer_name(Properties),
rabbit_stream_sac_coordinator:unregister_consumer(VirtualHost,