Merge pull request #5085 from rabbitmq/amqp-stream-consumer-file-handle-leak-fix
This commit is contained in:
commit
bebe9654a0
|
|
@ -312,14 +312,20 @@ begin_stream(#stream_client{name = QName, readers = Readers0} = State0,
|
|||
|
||||
cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0,
|
||||
name = QName} = State) ->
|
||||
Readers = maps:remove(ConsumerTag, Readers0),
|
||||
rabbit_core_metrics:consumer_deleted(self(), ConsumerTag, QName),
|
||||
rabbit_event:notify(consumer_deleted, [{consumer_tag, ConsumerTag},
|
||||
{channel, self()},
|
||||
{queue, QName},
|
||||
{user_who_performed_action, ActingUser}]),
|
||||
maybe_send_reply(self(), OkMsg),
|
||||
{ok, State#stream_client{readers = Readers}}.
|
||||
case maps:take(ConsumerTag, Readers0) of
|
||||
{#stream{log = Log}, Readers} ->
|
||||
ok = close_log(Log),
|
||||
rabbit_core_metrics:consumer_deleted(self(), ConsumerTag, QName),
|
||||
rabbit_event:notify(consumer_deleted,
|
||||
[{consumer_tag, ConsumerTag},
|
||||
{channel, self()},
|
||||
{queue, QName},
|
||||
{user_who_performed_action, ActingUser}]),
|
||||
maybe_send_reply(self(), OkMsg),
|
||||
{ok, State#stream_client{readers = Readers}};
|
||||
error ->
|
||||
{ok, State}
|
||||
end.
|
||||
|
||||
credit(CTag, Credit, Drain, #stream_client{readers = Readers0,
|
||||
name = Name,
|
||||
|
|
@ -1019,3 +1025,7 @@ set_leader_pid(Pid, QName) ->
|
|||
_ ->
|
||||
ok
|
||||
end.
|
||||
|
||||
close_log(undefined) -> ok;
|
||||
close_log(Log) ->
|
||||
osiris_log:close(Log).
|
||||
|
|
|
|||
Loading…
Reference in New Issue