diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index 7f0b6a0783..04a74b460c 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -312,14 +312,20 @@ begin_stream(#stream_client{name = QName, readers = Readers0} = State0, cancel(_Q, ConsumerTag, OkMsg, ActingUser, #stream_client{readers = Readers0, name = QName} = State) -> - Readers = maps:remove(ConsumerTag, Readers0), - rabbit_core_metrics:consumer_deleted(self(), ConsumerTag, QName), - rabbit_event:notify(consumer_deleted, [{consumer_tag, ConsumerTag}, - {channel, self()}, - {queue, QName}, - {user_who_performed_action, ActingUser}]), - maybe_send_reply(self(), OkMsg), - {ok, State#stream_client{readers = Readers}}. + case maps:take(ConsumerTag, Readers0) of + {#stream{log = Log}, Readers} -> + ok = close_log(Log), + rabbit_core_metrics:consumer_deleted(self(), ConsumerTag, QName), + rabbit_event:notify(consumer_deleted, + [{consumer_tag, ConsumerTag}, + {channel, self()}, + {queue, QName}, + {user_who_performed_action, ActingUser}]), + maybe_send_reply(self(), OkMsg), + {ok, State#stream_client{readers = Readers}}; + error -> + {ok, State} + end. credit(CTag, Credit, Drain, #stream_client{readers = Readers0, name = Name, @@ -1019,3 +1025,7 @@ set_leader_pid(Pid, QName) -> _ -> ok end. + +close_log(undefined) -> ok; +close_log(Log) -> + osiris_log:close(Log).