mirror of https://github.com/apache/kafka.git
KAFKA-13542: add rebalance reason in Kafka Streams (#11804)
Add rebalance reason in Kafka Streams. Reviewers: Luke Chen <showuon@gmail.com>, Bruno Cadonna <cadonna@apache.org>
This commit is contained in:
parent
5f91aa7b4c
commit
2ccc834faa
|
@ -583,7 +583,7 @@ public class StreamThread extends Thread {
|
|||
runOnce();
|
||||
if (nextProbingRebalanceMs.get() < time.milliseconds()) {
|
||||
log.info("Triggering the followup rebalance scheduled for {} ms.", nextProbingRebalanceMs.get());
|
||||
mainConsumer.enforceRebalance();
|
||||
mainConsumer.enforceRebalance("Scheduled probing rebalance.");
|
||||
nextProbingRebalanceMs.set(Long.MAX_VALUE);
|
||||
}
|
||||
} catch (final TaskCorruptedException e) {
|
||||
|
@ -595,7 +595,7 @@ public class StreamThread extends Thread {
|
|||
final boolean enforceRebalance = taskManager.handleCorruption(e.corruptedTasks());
|
||||
if (enforceRebalance && eosEnabled) {
|
||||
log.info("Active task(s) got corrupted. Triggering a rebalance.");
|
||||
mainConsumer.enforceRebalance();
|
||||
mainConsumer.enforceRebalance("Active tasks corrupted.");
|
||||
}
|
||||
} catch (final TaskMigratedException taskMigrated) {
|
||||
handleTaskMigrated(taskMigrated);
|
||||
|
@ -637,7 +637,7 @@ public class StreamThread extends Thread {
|
|||
if (assignmentErrorCode.get() == AssignorError.SHUTDOWN_REQUESTED.code()) {
|
||||
log.warn("Detected that shutdown was requested. " +
|
||||
"All clients in this app will now begin to shutdown");
|
||||
mainConsumer.enforceRebalance();
|
||||
mainConsumer.enforceRebalance("Shutdown requested.");
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -481,10 +481,13 @@ public class StreamThreadTest {
|
|||
expect(mockConsumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
|
||||
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
|
||||
EasyMock.replay(consumerGroupMetadata);
|
||||
final EasyMockConsumerClientSupplier mockClientSupplier = new EasyMockConsumerClientSupplier(mockConsumer);
|
||||
|
||||
final EasyMockConsumerClientSupplier mockClientSupplier = new EasyMockConsumerClientSupplier(mockConsumer);
|
||||
mockClientSupplier.setCluster(createCluster());
|
||||
|
||||
mockConsumer.enforceRebalance("Scheduled probing rebalance.");
|
||||
EasyMock.replay(mockConsumer);
|
||||
|
||||
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
|
||||
topologyMetadata.buildAndRewriteTopology();
|
||||
final StreamThread thread = StreamThread.create(
|
||||
|
@ -505,7 +508,6 @@ public class StreamThreadTest {
|
|||
null
|
||||
);
|
||||
|
||||
mockConsumer.enforceRebalance();
|
||||
|
||||
mockClientSupplier.nextRebalanceMs().set(mockTime.milliseconds() - 1L);
|
||||
|
||||
|
@ -2356,7 +2358,7 @@ public class StreamThreadTest {
|
|||
expect(task2.id()).andReturn(taskId2).anyTimes();
|
||||
expect(taskManager.handleCorruption(corruptedTasks)).andReturn(true);
|
||||
|
||||
consumer.enforceRebalance();
|
||||
consumer.enforceRebalance("Active tasks corrupted.");
|
||||
expectLastCall();
|
||||
|
||||
EasyMock.replay(task1, task2, taskManager, consumer);
|
||||
|
|
Loading…
Reference in New Issue