mirror of https://github.com/apache/kafka.git
				
				
				
			KAFKA-14178 Don't record queue time for deferred events (#12551)
This commit is contained in:
		
							parent
							
								
									4170e74295
								
							
						
					
					
						commit
						5eff8592cc
					
				|  | @ -709,13 +709,19 @@ public final class QuorumController implements Controller { | |||
|         private final CompletableFuture<T> future; | ||||
|         private final ControllerWriteOperation<T> op; | ||||
|         private final long eventCreatedTimeNs = time.nanoseconds(); | ||||
|         private final boolean deferred; | ||||
|         private OptionalLong startProcessingTimeNs = OptionalLong.empty(); | ||||
|         private ControllerResultAndOffset<T> resultAndOffset; | ||||
| 
 | ||||
|         ControllerWriteEvent(String name, ControllerWriteOperation<T> op) { | ||||
|             this(name, op, false); | ||||
|         } | ||||
| 
 | ||||
|         ControllerWriteEvent(String name, ControllerWriteOperation<T> op, boolean deferred) { | ||||
|             this.name = name; | ||||
|             this.future = new CompletableFuture<T>(); | ||||
|             this.op = op; | ||||
|             this.deferred = deferred; | ||||
|             this.resultAndOffset = null; | ||||
|         } | ||||
| 
 | ||||
|  | @ -726,7 +732,11 @@ public final class QuorumController implements Controller { | |||
|         @Override | ||||
|         public void run() throws Exception { | ||||
|             long now = time.nanoseconds(); | ||||
|             controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs)); | ||||
|             if (!deferred) { | ||||
|                 // We exclude deferred events from the event queue time metric to prevent | ||||
|                 // incorrectly including the deferral time in the queue time. | ||||
|                 controllerMetrics.updateEventQueueTime(NANOSECONDS.toMillis(now - eventCreatedTimeNs)); | ||||
|             } | ||||
|             int controllerEpoch = curClaimEpoch; | ||||
|             if (!isActiveController()) { | ||||
|                 throw newNotControllerException(); | ||||
|  | @ -1163,7 +1173,7 @@ public final class QuorumController implements Controller { | |||
| 
 | ||||
|     private <T> void scheduleDeferredWriteEvent(String name, long deadlineNs, | ||||
|                                                 ControllerWriteOperation<T> op) { | ||||
|         ControllerWriteEvent<T> event = new ControllerWriteEvent<>(name, op); | ||||
|         ControllerWriteEvent<T> event = new ControllerWriteEvent<>(name, op, true); | ||||
|         queue.scheduleDeferred(name, new EarliestDeadlineFunction(deadlineNs), event); | ||||
|         event.future.exceptionally(e -> { | ||||
|             if (e instanceof UnknownServerException && e.getCause() != null && | ||||
|  | @ -1236,7 +1246,7 @@ public final class QuorumController implements Controller { | |||
|                 // generated by a ControllerWriteEvent have been applied. | ||||
| 
 | ||||
|                 return result; | ||||
|             }); | ||||
|             }, true); | ||||
| 
 | ||||
|             long delayNs = time.nanoseconds(); | ||||
|             if (imbalancedScheduled == ImbalanceSchedule.DEFERRED) { | ||||
|  | @ -1281,7 +1291,7 @@ public final class QuorumController implements Controller { | |||
|                     Arrays.asList(new ApiMessageAndVersion(new NoOpRecord(), (short) 0)), | ||||
|                     null | ||||
|                 ); | ||||
|             }); | ||||
|             }, true); | ||||
| 
 | ||||
|             long delayNs = time.nanoseconds() + maxIdleIntervalNs.getAsLong(); | ||||
|             queue.scheduleDeferred(WRITE_NO_OP_RECORD, new EarliestDeadlineFunction(delayNs), event); | ||||
|  |  | |||
		Loading…
	
		Reference in New Issue