mirror of https://github.com/apache/kafka.git
				
				
				
			MINOR: fix BrokerMetadataPublisherTest.testExceptionInUpdateCoordinator
Fix a case where we were getting an exception because we removed a publisher, but left it in BrokerServer.metadataPublishers (resulting in us trying to remove it during broker shutdown.)
This commit is contained in:
		
							parent
							
								
									372b0f1c58
								
							
						
					
					
						commit
						145ef2d1e0
					
				|  | @ -544,7 +544,7 @@ class BrokerServer( | ||||||
|       if (socketServer != null) { |       if (socketServer != null) { | ||||||
|         CoreUtils.swallow(socketServer.stopProcessingRequests(), this) |         CoreUtils.swallow(socketServer.stopProcessingRequests(), this) | ||||||
|       } |       } | ||||||
|       metadataPublishers.forEach(p => CoreUtils.swallow(sharedServer.loader.removeAndClosePublisher(p).get(), this)) |       metadataPublishers.forEach(p => sharedServer.loader.removeAndClosePublisher(p).get()) | ||||||
|       metadataPublishers.clear() |       metadataPublishers.clear() | ||||||
|       if (dataPlaneRequestHandlerPool != null) |       if (dataPlaneRequestHandlerPool != null) | ||||||
|         CoreUtils.swallow(dataPlaneRequestHandlerPool.shutdown(), this) |         CoreUtils.swallow(dataPlaneRequestHandlerPool.shutdown(), this) | ||||||
|  |  | ||||||
|  | @ -376,7 +376,7 @@ class ControllerServer( | ||||||
|       // Ensure that we're not the Raft leader prior to shutting down our socket server, for a |       // Ensure that we're not the Raft leader prior to shutting down our socket server, for a | ||||||
|       // smoother transition. |       // smoother transition. | ||||||
|       sharedServer.ensureNotRaftLeader() |       sharedServer.ensureNotRaftLeader() | ||||||
|       metadataPublishers.forEach(p => CoreUtils.swallow(sharedServer.loader.removeAndClosePublisher(p).get(), this)) |       metadataPublishers.forEach(p => sharedServer.loader.removeAndClosePublisher(p).get()) | ||||||
|       metadataPublishers.clear() |       metadataPublishers.clear() | ||||||
|       if (socketServer != null) |       if (socketServer != null) | ||||||
|         CoreUtils.swallow(socketServer.stopProcessingRequests(), this) |         CoreUtils.swallow(socketServer.stopProcessingRequests(), this) | ||||||
|  |  | ||||||
|  | @ -254,6 +254,7 @@ class BrokerMetadataPublisherTest { | ||||||
|       val publisher = Mockito.spy(broker.brokerMetadataPublisher) |       val publisher = Mockito.spy(broker.brokerMetadataPublisher) | ||||||
|       doThrow(new RuntimeException("injected failure")).when(publisher).updateCoordinator(any(), any(), any(), any(), any()) |       doThrow(new RuntimeException("injected failure")).when(publisher).updateCoordinator(any(), any(), any(), any(), any()) | ||||||
|       broker.sharedServer.loader.removeAndClosePublisher(broker.brokerMetadataPublisher).get(1, TimeUnit.MINUTES) |       broker.sharedServer.loader.removeAndClosePublisher(broker.brokerMetadataPublisher).get(1, TimeUnit.MINUTES) | ||||||
|  |       broker.metadataPublishers.remove(broker.brokerMetadataPublisher) | ||||||
|       broker.sharedServer.loader.installPublishers(List(publisher).asJava).get(1, TimeUnit.MINUTES) |       broker.sharedServer.loader.installPublishers(List(publisher).asJava).get(1, TimeUnit.MINUTES) | ||||||
|       val admin = Admin.create(cluster.clientProperties()) |       val admin = Admin.create(cluster.clientProperties()) | ||||||
|       try { |       try { | ||||||
|  |  | ||||||
		Loading…
	
		Reference in New Issue