Merge pull request #14026 from rabbitmq/issue-13785_followup
MQTT: send acks before disconnecting consumer
This commit is contained in:
commit
74317c76f9
|
@ -2031,16 +2031,17 @@ handle_queue_event({queue_event, QName, Evt},
|
|||
State = handle_queue_actions(Actions, State1),
|
||||
{ok, State};
|
||||
{eol, Actions} ->
|
||||
try
|
||||
State1 = handle_queue_actions(Actions ++ [{queue_down, QName}], State0),
|
||||
{ConfirmPktIds, U} = rabbit_mqtt_confirms:remove_queue(QName, U0),
|
||||
QStates = rabbit_queue_type:remove(QName, QStates0),
|
||||
State = State1#state{queue_states = QStates,
|
||||
unacked_client_pubs = U},
|
||||
send_puback(ConfirmPktIds, ?RC_SUCCESS, State),
|
||||
{ok, State}
|
||||
State1 = handle_queue_actions(Actions, State0),
|
||||
{ConfirmPktIds, U} = rabbit_mqtt_confirms:remove_queue(QName, U0),
|
||||
QStates = rabbit_queue_type:remove(QName, QStates0),
|
||||
State = State1#state{queue_states = QStates,
|
||||
unacked_client_pubs = U},
|
||||
send_puback(ConfirmPktIds, ?RC_SUCCESS, State),
|
||||
try handle_queue_down(QName, State) of
|
||||
State2 ->
|
||||
{ok, State2}
|
||||
catch throw:consuming_queue_down ->
|
||||
{error, consuming_queue_down, State0}
|
||||
{error, consuming_queue_down, State}
|
||||
end;
|
||||
{protocol_error, _Type, _Reason, _ReasonArgs} = Error ->
|
||||
{error, Error, State0}
|
||||
|
|
Loading…
Reference in New Issue