Handle duplicate packet IDs
"If a Client re-sends a particular Control Packet, then it MUST use the same Packet Identifier in subsequent re-sends of that packet." A client can re-send a PUBLISH packet with the same packet ID. If the MQTT connection process already received the original packet and sent it to destination queues, it will ignore this re-send. The packet ID will be acked to the publisher once a confirmation from all target queues is received. There should be no risk of "stuck" messages within the MQTT connection process because quorum and stream queue clients re-send the message and classic queues will send a monitor DOWN message in case they are down.
This commit is contained in:
		
							parent
							
								
									4c15299196
								
							
						
					
					
						commit
						319af3872e
					
				| 
						 | 
				
			
			@ -44,7 +44,7 @@ insert(SeqNo, QNames, #resource{kind = exchange} = XName,
 | 
			
		|||
                unconfirmed = U0} = State)
 | 
			
		||||
  when is_integer(SeqNo)
 | 
			
		||||
       andalso is_list(QNames)
 | 
			
		||||
       andalso is_map_key(SeqNo, U0) == false ->
 | 
			
		||||
       andalso not is_map_key(SeqNo, U0) ->
 | 
			
		||||
    U = U0#{SeqNo => {XName, maps:from_list([{Q, ok} || Q <- QNames])}},
 | 
			
		||||
    S = case S0 of
 | 
			
		||||
            undefined -> SeqNo;
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -15,7 +15,8 @@
 | 
			
		|||
         confirm/3,
 | 
			
		||||
         reject/2,
 | 
			
		||||
         remove_queue/2,
 | 
			
		||||
         size/1]).
 | 
			
		||||
         size/1,
 | 
			
		||||
         contains/2]).
 | 
			
		||||
 | 
			
		||||
-type queue_name() :: rabbit_amqqueue:name().
 | 
			
		||||
-opaque state() :: #{packet_id() => #{queue_name() => ok}}.
 | 
			
		||||
| 
						 | 
				
			
			@ -29,15 +30,17 @@ init() ->
 | 
			
		|||
size(State) ->
 | 
			
		||||
    maps:size(State).
 | 
			
		||||
 | 
			
		||||
-spec insert(packet_id(), [queue_name()], state()) ->
 | 
			
		||||
    {ok, state()} | {error, {duplicate_packet_id, packet_id()}}.
 | 
			
		||||
insert(PktId, _, State)
 | 
			
		||||
  when is_map_key(PktId, State) ->
 | 
			
		||||
    {error, {duplicate_packet_id, PktId}};
 | 
			
		||||
-spec contains(packet_id(), state()) -> boolean().
 | 
			
		||||
contains(PktId, State) ->
 | 
			
		||||
    maps:is_key(PktId, State).
 | 
			
		||||
 | 
			
		||||
-spec insert(packet_id(), [queue_name()], state()) -> state().
 | 
			
		||||
insert(PktId, QNames, State)
 | 
			
		||||
  when is_integer(PktId) andalso PktId > 0 ->
 | 
			
		||||
  when is_integer(PktId) andalso
 | 
			
		||||
       PktId > 0 andalso
 | 
			
		||||
       not is_map_key(PktId, State) ->
 | 
			
		||||
    QMap = maps:from_keys(QNames, ok),
 | 
			
		||||
    {ok, maps:put(PktId, QMap, State)}.
 | 
			
		||||
    maps:put(PktId, QMap, State).
 | 
			
		||||
 | 
			
		||||
-spec confirm([packet_id()], queue_name(), state()) ->
 | 
			
		||||
    {[packet_id()], state()}.
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -7,7 +7,6 @@
 | 
			
		|||
 | 
			
		||||
-module(rabbit_mqtt_processor).
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
-export([info/2, initial_state/2, initial_state/4,
 | 
			
		||||
         process_frame/2, serialise/2, send_will/1,
 | 
			
		||||
         terminate/2, handle_pre_hibernate/0,
 | 
			
		||||
| 
						 | 
				
			
			@ -115,34 +114,44 @@ process_request(?PUBLISH,
 | 
			
		|||
                                                  message_id = MessageId },
 | 
			
		||||
                   payload = Payload},
 | 
			
		||||
                PState = #proc_state{retainer_pid = RPid,
 | 
			
		||||
                                     amqp2mqtt_fun = Amqp2MqttFun}) ->
 | 
			
		||||
                                     amqp2mqtt_fun = Amqp2MqttFun,
 | 
			
		||||
                                     unacked_client_pubs = U}) ->
 | 
			
		||||
    rabbit_global_counters:messages_received(mqtt, 1),
 | 
			
		||||
    Publish = fun() ->
 | 
			
		||||
                      Msg = #mqtt_msg{retain     = Retain,
 | 
			
		||||
                                      qos        = Qos,
 | 
			
		||||
                                      topic      = Topic,
 | 
			
		||||
                                      dup        = Dup,
 | 
			
		||||
                                      message_id = MessageId,
 | 
			
		||||
                                      payload    = Payload},
 | 
			
		||||
                      case publish_to_queues(Msg, PState) of
 | 
			
		||||
                          {ok, _} = Ok ->
 | 
			
		||||
                              case Retain of
 | 
			
		||||
                                  false ->
 | 
			
		||||
                                      ok;
 | 
			
		||||
                                  true ->
 | 
			
		||||
                                      hand_off_to_retainer(RPid, Amqp2MqttFun, Topic, Msg)
 | 
			
		||||
                              end,
 | 
			
		||||
                              Ok;
 | 
			
		||||
                          Error ->
 | 
			
		||||
                              Error
 | 
			
		||||
                      end
 | 
			
		||||
              end,
 | 
			
		||||
    case Qos of
 | 
			
		||||
        N when N > ?QOS_0 ->
 | 
			
		||||
            rabbit_global_counters:messages_received_confirm(mqtt, 1);
 | 
			
		||||
            rabbit_global_counters:messages_received_confirm(mqtt, 1),
 | 
			
		||||
            case rabbit_mqtt_confirms:contains(MessageId, U) of
 | 
			
		||||
                false ->
 | 
			
		||||
                    publish_to_queues_with_checks(Topic, Publish, PState);
 | 
			
		||||
                true ->
 | 
			
		||||
                    %% Client re-sent this PUBLISH packet.
 | 
			
		||||
                    %% We already sent this message to target queues awaiting confirmations.
 | 
			
		||||
                    %% Hence, we ignore this re-send.
 | 
			
		||||
                    {ok, PState}
 | 
			
		||||
            end;
 | 
			
		||||
        _ ->
 | 
			
		||||
            ok
 | 
			
		||||
    end,
 | 
			
		||||
    publish_to_queues_with_checks(
 | 
			
		||||
      Topic,
 | 
			
		||||
      fun() ->
 | 
			
		||||
              Msg = #mqtt_msg{retain     = Retain,
 | 
			
		||||
                              qos        = Qos,
 | 
			
		||||
                              topic      = Topic,
 | 
			
		||||
                              dup        = Dup,
 | 
			
		||||
                              message_id = MessageId,
 | 
			
		||||
                              payload    = Payload},
 | 
			
		||||
              case publish_to_queues(Msg, PState) of
 | 
			
		||||
                  {ok, _} = Ok ->
 | 
			
		||||
                      case Retain of
 | 
			
		||||
                          false -> ok;
 | 
			
		||||
                          true  -> hand_off_to_retainer(RPid, Amqp2MqttFun, Topic, Msg)
 | 
			
		||||
                      end,
 | 
			
		||||
                      Ok;
 | 
			
		||||
                  Error ->
 | 
			
		||||
                      Error
 | 
			
		||||
              end
 | 
			
		||||
      end, PState);
 | 
			
		||||
            publish_to_queues_with_checks(Topic, Publish, PState)
 | 
			
		||||
    end;
 | 
			
		||||
 | 
			
		||||
process_request(?SUBSCRIBE,
 | 
			
		||||
                #mqtt_frame{
 | 
			
		||||
| 
						 | 
				
			
			@ -1156,7 +1165,7 @@ process_routing_confirm(#delivery{confirm = true,
 | 
			
		|||
                                  msg_seq_no = MsgId},
 | 
			
		||||
                        Qs, PState = #proc_state{unacked_client_pubs = U0}) ->
 | 
			
		||||
    QNames = lists:map(fun amqqueue:get_name/1, Qs),
 | 
			
		||||
    {ok, U} = rabbit_mqtt_confirms:insert(MsgId, QNames, U0),
 | 
			
		||||
    U = rabbit_mqtt_confirms:insert(MsgId, QNames, U0),
 | 
			
		||||
    PState#proc_state{unacked_client_pubs = U}.
 | 
			
		||||
 | 
			
		||||
send_puback(MsgIds, PState)
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -15,6 +15,7 @@
 | 
			
		|||
 | 
			
		||||
-import(rabbit_ct_broker_helpers, [rabbitmqctl_list/3,
 | 
			
		||||
                                   rpc_all/4]).
 | 
			
		||||
-import(rabbit_ct_helpers, [eventually/3]).
 | 
			
		||||
-import(util, [all_connection_pids/1,
 | 
			
		||||
               publish_qos1/4]).
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			@ -121,8 +122,7 @@ publish_to_all_queue_types(Config, QoS) ->
 | 
			
		|||
    bind(Ch, SQ, Topic),
 | 
			
		||||
 | 
			
		||||
    NumMsgs = 2000,
 | 
			
		||||
    NumMsgsBin = integer_to_binary(NumMsgs),
 | 
			
		||||
    {C, _} = connect(?FUNCTION_NAME, Config),
 | 
			
		||||
    {C, _} = connect(?FUNCTION_NAME, Config, [{retry_interval, 2}]),
 | 
			
		||||
    lists:foreach(fun(_N) ->
 | 
			
		||||
                          case QoS of
 | 
			
		||||
                              qos0 ->
 | 
			
		||||
| 
						 | 
				
			
			@ -132,15 +132,23 @@ publish_to_all_queue_types(Config, QoS) ->
 | 
			
		|||
                          end
 | 
			
		||||
                  end, lists:seq(1, NumMsgs)),
 | 
			
		||||
 | 
			
		||||
    Expected = lists:sort([[CQ,  NumMsgsBin],
 | 
			
		||||
                           [CMQ, NumMsgsBin],
 | 
			
		||||
                           [QQ,  NumMsgsBin],
 | 
			
		||||
                           [SQ,  NumMsgsBin]
 | 
			
		||||
                          ]),
 | 
			
		||||
    ?awaitMatch(Expected,
 | 
			
		||||
                lists:sort(rabbitmqctl_list(Config, 0, ["list_queues", "--no-table-headers",
 | 
			
		||||
                                                        "name", "messages_ready"])),
 | 
			
		||||
                20_000, 1000),
 | 
			
		||||
    eventually(?_assert(
 | 
			
		||||
                  begin
 | 
			
		||||
                      L = rabbitmqctl_list(Config, 0, ["list_queues", "messages", "--no-table-headers"]),
 | 
			
		||||
                      length(L) =:= 4 andalso
 | 
			
		||||
                      lists:all(fun([Bin]) ->
 | 
			
		||||
                                        N = binary_to_integer(Bin),
 | 
			
		||||
                                        case QoS of
 | 
			
		||||
                                            qos0 ->
 | 
			
		||||
                                                N =:= NumMsgs;
 | 
			
		||||
                                            qos1 ->
 | 
			
		||||
                                                %% Allow for some duplicates when client resends
 | 
			
		||||
                                                %% a message that gets acked at roughly the same time.
 | 
			
		||||
                                                N >= NumMsgs andalso
 | 
			
		||||
                                                N < NumMsgs * 2
 | 
			
		||||
                                        end
 | 
			
		||||
                                end, L)
 | 
			
		||||
                  end), 2000, 10),
 | 
			
		||||
 | 
			
		||||
    delete_queue(Ch, [CQ, CMQ, QQ, SQ]),
 | 
			
		||||
    ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, CMQ),
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
		Reference in New Issue