KAFKA-13542: add rebalance reason in Kafka Streams (#11804)

Add rebalance reason in Kafka Streams.

Reviewers: Luke Chen <showuon@gmail.com>, Bruno Cadonna <cadonna@apache.org>
This commit is contained in:
Hao Li 2022-02-28 09:26:46 -08:00 committed by GitHub
parent 5f91aa7b4c
commit 2ccc834faa
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 8 additions and 6 deletions

View File

@ -583,7 +583,7 @@ public class StreamThread extends Thread {
runOnce();
if (nextProbingRebalanceMs.get() < time.milliseconds()) {
log.info("Triggering the followup rebalance scheduled for {} ms.", nextProbingRebalanceMs.get());
mainConsumer.enforceRebalance();
mainConsumer.enforceRebalance("Scheduled probing rebalance.");
nextProbingRebalanceMs.set(Long.MAX_VALUE);
}
} catch (final TaskCorruptedException e) {
@ -595,7 +595,7 @@ public class StreamThread extends Thread {
final boolean enforceRebalance = taskManager.handleCorruption(e.corruptedTasks());
if (enforceRebalance && eosEnabled) {
log.info("Active task(s) got corrupted. Triggering a rebalance.");
mainConsumer.enforceRebalance();
mainConsumer.enforceRebalance("Active tasks corrupted.");
}
} catch (final TaskMigratedException taskMigrated) {
handleTaskMigrated(taskMigrated);
@ -637,7 +637,7 @@ public class StreamThread extends Thread {
if (assignmentErrorCode.get() == AssignorError.SHUTDOWN_REQUESTED.code()) {
log.warn("Detected that shutdown was requested. " +
"All clients in this app will now begin to shutdown");
mainConsumer.enforceRebalance();
mainConsumer.enforceRebalance("Shutdown requested.");
}
}

View File

@ -481,10 +481,13 @@ public class StreamThreadTest {
expect(mockConsumer.groupMetadata()).andStubReturn(consumerGroupMetadata);
expect(consumerGroupMetadata.groupInstanceId()).andReturn(Optional.empty());
EasyMock.replay(consumerGroupMetadata);
final EasyMockConsumerClientSupplier mockClientSupplier = new EasyMockConsumerClientSupplier(mockConsumer);
final EasyMockConsumerClientSupplier mockClientSupplier = new EasyMockConsumerClientSupplier(mockConsumer);
mockClientSupplier.setCluster(createCluster());
mockConsumer.enforceRebalance("Scheduled probing rebalance.");
EasyMock.replay(mockConsumer);
final TopologyMetadata topologyMetadata = new TopologyMetadata(internalTopologyBuilder, config);
topologyMetadata.buildAndRewriteTopology();
final StreamThread thread = StreamThread.create(
@ -505,7 +508,6 @@ public class StreamThreadTest {
null
);
mockConsumer.enforceRebalance();
mockClientSupplier.nextRebalanceMs().set(mockTime.milliseconds() - 1L);
@ -2356,7 +2358,7 @@ public class StreamThreadTest {
expect(task2.id()).andReturn(taskId2).anyTimes();
expect(taskManager.handleCorruption(corruptedTasks)).andReturn(true);
consumer.enforceRebalance();
consumer.enforceRebalance("Active tasks corrupted.");
expectLastCall();
EasyMock.replay(task1, task2, taskManager, consumer);