mirror of https://github.com/apache/kafka.git
				
				
				
			KAFKA-18484 [2/2]; Handle exceptions during coordinator unload (#18667)
Ensure that unloading a coordinator always succeeds. Previously, we guarded against exceptions from DeferredEvent completions. All that remains is handling exceptions thrown by the onUnloaded() method of the coordinator state machine. Reviewers: David Jacot <djacot@confluent.io>
This commit is contained in:
		
							parent
							
								
									c5a51aba18
								
							
						
					
					
						commit
						3fe33f8ad6
					
				|  | @ -742,7 +742,11 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut | ||||||
|             deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception()); |             deferredEventQueue.failAll(Errors.NOT_COORDINATOR.exception()); | ||||||
|             failCurrentBatch(Errors.NOT_COORDINATOR.exception()); |             failCurrentBatch(Errors.NOT_COORDINATOR.exception()); | ||||||
|             if (coordinator != null) { |             if (coordinator != null) { | ||||||
|                 coordinator.onUnloaded(); |                 try { | ||||||
|  |                     coordinator.onUnloaded(); | ||||||
|  |                 } catch (Throwable ex) { | ||||||
|  |                     log.error("Failed to unload coordinator for {} due to {}.", tp, ex.getMessage(), ex); | ||||||
|  |                 } | ||||||
|             } |             } | ||||||
|             coordinator = null; |             coordinator = null; | ||||||
|         } |         } | ||||||
|  | @ -2415,9 +2419,19 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut | ||||||
|                 try { |                 try { | ||||||
|                     if (partitionEpoch.isEmpty() || context.epoch < partitionEpoch.getAsInt()) { |                     if (partitionEpoch.isEmpty() || context.epoch < partitionEpoch.getAsInt()) { | ||||||
|                         log.info("Started unloading metadata for {} with epoch {}.", tp, partitionEpoch); |                         log.info("Started unloading metadata for {} with epoch {}.", tp, partitionEpoch); | ||||||
|                         context.transitionTo(CoordinatorState.CLOSED); |                         try { | ||||||
|                         coordinators.remove(tp, context); |                             context.transitionTo(CoordinatorState.CLOSED); | ||||||
|                         log.info("Finished unloading metadata for {} with epoch {}.", tp, partitionEpoch); |                             log.info("Finished unloading metadata for {} with epoch {}.", tp, partitionEpoch); | ||||||
|  |                         } catch (Throwable ex) { | ||||||
|  |                             // It's very unlikely that we will ever see an exception here, since we | ||||||
|  |                             // already make an effort to catch exceptions in the unload method. | ||||||
|  |                             log.error("Failed to unload metadata for {} with epoch {} due to {}.", | ||||||
|  |                                 tp, partitionEpoch, ex.toString()); | ||||||
|  |                         } finally { | ||||||
|  |                             // Always remove the coordinator context, otherwise the coordinator | ||||||
|  |                             // shard could be permanently stuck. | ||||||
|  |                             coordinators.remove(tp, context); | ||||||
|  |                         } | ||||||
|                     } else { |                     } else { | ||||||
|                         log.info("Ignored unloading metadata for {} in epoch {} since current epoch is {}.", |                         log.info("Ignored unloading metadata for {} in epoch {} since current epoch is {}.", | ||||||
|                             tp, partitionEpoch, context.epoch); |                             tp, partitionEpoch, context.epoch); | ||||||
|  | @ -2498,6 +2512,8 @@ public class CoordinatorRuntime<S extends CoordinatorShard<U>, U> implements Aut | ||||||
|             context.lock.lock(); |             context.lock.lock(); | ||||||
|             try { |             try { | ||||||
|                 context.transitionTo(CoordinatorState.CLOSED); |                 context.transitionTo(CoordinatorState.CLOSED); | ||||||
|  |             } catch (Throwable ex) { | ||||||
|  |                 log.warn("Failed to unload metadata for {} due to {}.", tp, ex.getMessage(), ex); | ||||||
|             } finally { |             } finally { | ||||||
|                 context.lock.unlock(); |                 context.lock.unlock(); | ||||||
|             } |             } | ||||||
|  |  | ||||||
|  | @ -1118,6 +1118,58 @@ public class CoordinatorRuntimeTest { | ||||||
|         assertEquals(10, ctx.epoch); |         assertEquals(10, ctx.epoch); | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     @Test | ||||||
|  |     public void testScheduleUnloadingWithException() { | ||||||
|  |         MockTimer timer = new MockTimer(); | ||||||
|  |         MockPartitionWriter writer = mock(MockPartitionWriter.class); | ||||||
|  |         MockCoordinatorShardBuilderSupplier supplier = mock(MockCoordinatorShardBuilderSupplier.class); | ||||||
|  |         MockCoordinatorShardBuilder builder = mock(MockCoordinatorShardBuilder.class); | ||||||
|  |         MockCoordinatorShard coordinator = mock(MockCoordinatorShard.class); | ||||||
|  |         CoordinatorRuntimeMetrics metrics = mock(CoordinatorRuntimeMetrics.class); | ||||||
|  | 
 | ||||||
|  |         CoordinatorRuntime<MockCoordinatorShard, String> runtime = | ||||||
|  |             new CoordinatorRuntime.Builder<MockCoordinatorShard, String>() | ||||||
|  |                 .withTime(timer.time()) | ||||||
|  |                 .withTimer(timer) | ||||||
|  |                 .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT) | ||||||
|  |                 .withLoader(new MockCoordinatorLoader()) | ||||||
|  |                 .withEventProcessor(new DirectEventProcessor()) | ||||||
|  |                 .withPartitionWriter(writer) | ||||||
|  |                 .withCoordinatorShardBuilderSupplier(supplier) | ||||||
|  |                 .withCoordinatorRuntimeMetrics(metrics) | ||||||
|  |                 .withCoordinatorMetrics(mock(CoordinatorMetrics.class)) | ||||||
|  |                 .withSerializer(new StringSerializer()) | ||||||
|  |                 .withExecutorService(mock(ExecutorService.class)) | ||||||
|  |                 .build(); | ||||||
|  | 
 | ||||||
|  |         doThrow(new KafkaException("error")).when(coordinator).onUnloaded(); | ||||||
|  |         when(builder.withSnapshotRegistry(any())).thenReturn(builder); | ||||||
|  |         when(builder.withLogContext(any())).thenReturn(builder); | ||||||
|  |         when(builder.withTime(any())).thenReturn(builder); | ||||||
|  |         when(builder.withTimer(any())).thenReturn(builder); | ||||||
|  |         when(builder.withCoordinatorMetrics(any())).thenReturn(builder); | ||||||
|  |         when(builder.withTopicPartition(any())).thenReturn(builder); | ||||||
|  |         when(builder.withExecutor(any())).thenReturn(builder); | ||||||
|  |         when(builder.build()).thenReturn(coordinator); | ||||||
|  |         when(supplier.get()).thenReturn(builder); | ||||||
|  | 
 | ||||||
|  |         // Loads the coordinator. It directly transitions to active. | ||||||
|  |         runtime.scheduleLoadOperation(TP, 10); | ||||||
|  |         CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP); | ||||||
|  |         assertEquals(ACTIVE, ctx.state); | ||||||
|  |         assertEquals(10, ctx.epoch); | ||||||
|  | 
 | ||||||
|  |         // Schedule the unloading. | ||||||
|  |         runtime.scheduleUnloadOperation(TP, OptionalInt.of(ctx.epoch + 1)); | ||||||
|  |         assertEquals(CLOSED, ctx.state); | ||||||
|  | 
 | ||||||
|  |         // Verify that onUnloaded is called. | ||||||
|  |         verify(coordinator, times(1)).onUnloaded(); | ||||||
|  | 
 | ||||||
|  |         // Getting the coordinator context fails because it no longer exists. | ||||||
|  |         assertThrows(NotCoordinatorException.class, () -> runtime.contextOrThrow(TP)); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     @Test |     @Test | ||||||
|     public void testScheduleUnloadingWithDeferredEventExceptions() throws ExecutionException, InterruptedException, TimeoutException { |     public void testScheduleUnloadingWithDeferredEventExceptions() throws ExecutionException, InterruptedException, TimeoutException { | ||||||
|         MockTimer timer = new MockTimer(); |         MockTimer timer = new MockTimer(); | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue