CQ: don't deliver right before acking in the index
This commit is contained in:
		
							parent
							
								
									e0602e7909
								
							
						
					
					
						commit
						3ef858746c
					
				|  | @ -831,9 +831,9 @@ action_to_entry(RelSeq, Action, JEntries) -> | |||
|              end}; | ||||
|         ({Pub,    no_del, no_ack}) when Action == del -> | ||||
|             {set, {Pub,    del, no_ack}}; | ||||
|         ({no_pub,    del, no_ack}) when Action == ack -> | ||||
|         ({no_pub,    _del, no_ack}) when Action == ack -> | ||||
|             {set, {no_pub, del,    ack}}; | ||||
|         ({?PUB,      del, no_ack}) when Action == ack -> | ||||
|         ({?PUB,      _del, no_ack}) when Action == ack -> | ||||
|             {reset, none} | ||||
|     end. | ||||
| 
 | ||||
|  |  | |||
|  | @ -1774,22 +1774,21 @@ purge_betas_and_deltas(DelsAndAcksFun, State = #vqstate { mode = Mode }) -> | |||
| 
 | ||||
| remove_queue_entries(Q, DelsAndAcksFun, | ||||
|                      State = #vqstate{msg_store_clients = MSCState}) -> | ||||
|     {MsgIdsByStore, Delivers, Acks, State1} = | ||||
|     {MsgIdsByStore, Acks, State1} = | ||||
|         ?QUEUE:foldl(fun remove_queue_entries1/2, | ||||
|                      {maps:new(), [], [], State}, Q), | ||||
|                      {maps:new(), [], State}, Q), | ||||
|     remove_msgs_by_id(MsgIdsByStore, MSCState), | ||||
|     DelsAndAcksFun(Delivers, Acks, State1). | ||||
|     DelsAndAcksFun([], Acks, State1). | ||||
| 
 | ||||
| remove_queue_entries1( | ||||
|   #msg_status { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered, | ||||
|   #msg_status { msg_id = MsgId, seq_id = SeqId, | ||||
|                 msg_in_store = MsgInStore, index_on_disk = IndexOnDisk, | ||||
|                 is_persistent = IsPersistent} = MsgStatus, | ||||
|   {MsgIdsByStore, Delivers, Acks, State}) -> | ||||
|   {MsgIdsByStore, Acks, State}) -> | ||||
|     {case MsgInStore of | ||||
|          true  -> rabbit_misc:maps_cons(IsPersistent, MsgId, MsgIdsByStore); | ||||
|          false -> MsgIdsByStore | ||||
|      end, | ||||
|      cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), | ||||
|      cons_if(IndexOnDisk, SeqId, Acks), | ||||
|      stats({-1, 0}, {MsgStatus, none}, 0, State)}. | ||||
| 
 | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue