KAFKA-19216: Eliminate flakiness in kafka.server.share.SharePartitionTest (#19639)

### About
11 of the test cases in `SharePartitionTest` have failed at least once
in the past 28 days.

https://develocity.apache.org/scans/tests?search.relativeStartTime=P28D&search.rootProjectNames=kafka&search.timeZoneId=Europe%2FLondon&tests.container=kafka.server.share.SharePartitionTest
Observing the flakiness, they seem to be caused due to the usage of
`SystemTimer` for various acquisition lock timeout related tests. I have
replaced the usage of `SystemTimer` with `MockTimer` and also improved
the `MockTimer` API with regard to removing the timer task entries that
have already been cancelled.
Also, this has reduced the time taken to run `SharePartitionTest` from
~6 sec to ~1.5 sec

### Testing
The testing has been done with the help of already present unit tests in
Apache Kafka.

Reviewers: Andrew Schofield <aschofield@confluent.io>
This commit is contained in:
Abhinav Dixit 2025-05-06 00:34:22 +05:30 committed by GitHub
parent 81c3a285a4
commit caf4a6cc5f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 33 additions and 14 deletions

View File

@ -2815,9 +2815,8 @@ public class CoordinatorRuntimeTest {
assertTrue(write1.isDone());
assertTrue(write2.isDone());
// All timer tasks have been cancelled. TimerTask entries are not removed in MockTimer.
assertEquals(2, timer.size());
timer.taskQueue().forEach(taskEntry -> assertTrue(taskEntry.cancelled()));
// All timer tasks have been cancelled. Hence,they have been removed in MockTimer.
assertEquals(0, timer.size());
}
@Test
@ -2885,9 +2884,8 @@ public class CoordinatorRuntimeTest {
assertEquals(1, runtime.contextOrThrow(TP).coordinator.lastCommittedOffset());
assertTrue(write1.isDone());
// All timer tasks have been cancelled. TimerTask entries are not removed in MockTimer.
assertEquals(1, timer.size());
timer.taskQueue().forEach(taskEntry -> assertTrue(taskEntry.cancelled()));
// All timer tasks have been cancelled. Hence, they have been removed in MockTimer.
assertEquals(0, timer.size());
}
@Test

View File

@ -67,8 +67,7 @@ import org.apache.kafka.server.share.persister.WriteShareGroupStateResult;
import org.apache.kafka.server.storage.log.FetchIsolation;
import org.apache.kafka.server.storage.log.FetchPartitionData;
import org.apache.kafka.server.util.FutureUtils;
import org.apache.kafka.server.util.timer.SystemTimer;
import org.apache.kafka.server.util.timer.SystemTimerReaper;
import org.apache.kafka.server.util.timer.MockTimer;
import org.apache.kafka.server.util.timer.Timer;
import org.apache.kafka.storage.internals.log.OffsetResultHolder;
import org.apache.kafka.test.TestUtils;
@ -117,7 +116,7 @@ public class SharePartitionTest {
private static final Time MOCK_TIME = new MockTime();
private static final short MAX_IN_FLIGHT_MESSAGES = 200;
private static final int ACQUISITION_LOCK_TIMEOUT_MS = 100;
private static final int DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS = 300;
private static final int DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS = 120;
private static final int BATCH_SIZE = 500;
private static final int DEFAULT_FETCH_OFFSET = 0;
private static final int MAX_FETCH_RECORDS = Integer.MAX_VALUE;
@ -129,8 +128,7 @@ public class SharePartitionTest {
@BeforeEach
public void setUp() {
kafka.utils.TestUtils.clearYammerMetrics();
mockTimer = new SystemTimerReaper("share-group-lock-timeout-test-reaper",
new SystemTimer("share-group-lock-test-timeout"));
mockTimer = new MockTimer();
sharePartitionMetrics = new SharePartitionMetrics(GROUP_ID, TOPIC_ID_PARTITION.topic(), TOPIC_ID_PARTITION.partition());
}
@ -2925,6 +2923,7 @@ public class SharePartitionTest {
assertEquals(1, sharePartition.timer().size());
// Allowing acquisition lock to expire.
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> sharePartition.nextFetchOffset() == 0 &&
sharePartition.cachedState().get(0L).batchState() == RecordState.AVAILABLE &&
@ -2951,6 +2950,7 @@ public class SharePartitionTest {
assertNotNull(sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask());
// Allowing acquisition lock to expire.
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> sharePartition.timer().size() == 0
&& sharePartition.nextFetchOffset() == 10
@ -2985,6 +2985,7 @@ public class SharePartitionTest {
assertEquals(2, sharePartition.timer().size());
// Allowing acquisition lock to expire. The acquisition lock timeout will cause release of records for all the acquired records.
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> sharePartition.timer().size() == 0 &&
sharePartition.nextFetchOffset() == 0 &&
@ -3012,6 +3013,7 @@ public class SharePartitionTest {
assertEquals(1, sharePartition.timer().size());
// Allowing acquisition lock to expire.
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> sharePartition.timer().size() == 0 &&
sharePartition.nextFetchOffset() == 10 &&
@ -3128,6 +3130,7 @@ public class SharePartitionTest {
// Allowing acquisition lock to expire. The acquisition lock timeout will cause release of records for batch with starting offset 1.
// Since, other records have been acknowledged.
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> sharePartition.timer().size() == 0 &&
sharePartition.nextFetchOffset() == 1 &&
@ -3155,6 +3158,7 @@ public class SharePartitionTest {
assertEquals(1, sharePartition.timer().size());
// Allowing acquisition lock to expire.
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> sharePartition.timer().size() == 0 &&
sharePartition.nextFetchOffset() == 10 &&
@ -3179,6 +3183,7 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.timer().size());
// Allowing acquisition lock to expire for the acquired subset batch.
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> {
Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
@ -3259,6 +3264,7 @@ public class SharePartitionTest {
assertEquals(3, sharePartition.timer().size());
// Allowing acquisition lock to expire for the offsets that have not been acknowledged yet.
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> {
Map<Long, InFlightState> expectedOffsetStateMap1 = new HashMap<>();
@ -3321,6 +3327,7 @@ public class SharePartitionTest {
assertEquals(2, sharePartition.timer().size());
// Allowing acquisition lock to expire.
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> sharePartition.timer().size() == 0 &&
sharePartition.nextFetchOffset() == 0 &&
@ -3338,6 +3345,7 @@ public class SharePartitionTest {
assertEquals(1, sharePartition.timer().size());
// Allowing acquisition lock to expire to archive the records that reach max delivery count.
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> sharePartition.timer().size() == 0 &&
sharePartition.nextFetchOffset() == 0 &&
@ -3363,6 +3371,7 @@ public class SharePartitionTest {
assertEquals(1, sharePartition.timer().size());
// Allowing acquisition lock to expire.
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> sharePartition.timer().size() == 0 &&
sharePartition.nextFetchOffset() == 0 &&
@ -3386,6 +3395,7 @@ public class SharePartitionTest {
assertNull(sharePartition.cachedState().get(0L).offsetState().get(9L).acquisitionLockTimeoutTask());
// Allowing acquisition lock to expire to archive the records that reach max delivery count.
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> {
Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
@ -3436,6 +3446,7 @@ public class SharePartitionTest {
assertEquals(1, sharePartition.timer().size());
// Allowing acquisition lock to expire.
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> sharePartition.timer().size() == 0 &&
sharePartition.nextFetchOffset() == 0 &&
@ -3450,6 +3461,7 @@ public class SharePartitionTest {
assertEquals(1, sharePartition.timer().size());
// Allowing acquisition lock to expire to archive the records that reach max delivery count.
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> sharePartition.timer().size() == 0 &&
// After the second failed attempt to acknowledge the record batch successfully, the record batch is archived.
@ -3473,6 +3485,7 @@ public class SharePartitionTest {
assertEquals(1, sharePartition.timer().size());
// Allowing acquisition lock to expire.
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> sharePartition.timer().size() == 0 &&
sharePartition.nextFetchOffset() == 5 &&
@ -3531,6 +3544,7 @@ public class SharePartitionTest {
assertEquals(1, sharePartition.timer().size());
// Allowing acquisition lock to expire will only affect the offsets that have not been acknowledged yet.
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> {
// Check cached state.
@ -3576,6 +3590,7 @@ public class SharePartitionTest {
assertNotNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask());
// Allowing acquisition lock to expire. Even if write share group state RPC fails, state transition still happens.
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> sharePartition.timer().size() == 0 &&
sharePartition.nextFetchOffset() == 5 &&
@ -3616,6 +3631,7 @@ public class SharePartitionTest {
Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult));
// Allowing acquisition lock to expire. Even if write share group state RPC fails, state transition still happens.
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> {
Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
@ -4184,7 +4200,6 @@ public class SharePartitionTest {
@Test
public void testAcquisitionLockOnReleasingAcknowledgedMultipleSubsetRecordBatchWithGapOffsets() {
SharePartition sharePartition = SharePartitionBuilder.builder()
.withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
.withState(SharePartitionState.ACTIVE)
.build();
MemoryRecords records1 = memoryRecords(2, 5);
@ -4977,6 +4992,7 @@ public class SharePartitionTest {
assertEquals(7, sharePartition.cachedState().size());
// Allowing acquisition lock to expire.
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> {
Map<Long, InFlightState> expectedOffsetStateMap1 = new HashMap<>();
@ -5035,6 +5051,7 @@ public class SharePartitionTest {
assertEquals(2, sharePartition.cachedState().size());
// Allowing acquisition lock to expire.
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> sharePartition.cachedState().get(5L).batchMemberId().equals(EMPTY_MEMBER_ID) &&
sharePartition.cachedState().get(5L).batchState() == RecordState.ARCHIVED &&
@ -5063,6 +5080,7 @@ public class SharePartitionTest {
assertEquals(2, sharePartition.cachedState().size());
// Allowing acquisition lock to expire.
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> {
Map<Long, InFlightState> expectedOffsetStateMap = new HashMap<>();
@ -5088,7 +5106,6 @@ public class SharePartitionTest {
Mockito.when(groupConfig.shareRecordLockDurationMs()).thenReturn(expectedDurationMs);
SharePartition sharePartition = SharePartitionBuilder.builder()
.withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
.withGroupConfigManager(groupConfigManager).build();
SharePartition.AcquisitionLockTimerTask timerTask = sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L);
@ -5111,7 +5128,6 @@ public class SharePartitionTest {
.thenReturn(expectedDurationMs2);
SharePartition sharePartition = SharePartitionBuilder.builder()
.withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS)
.withGroupConfigManager(groupConfigManager).build();
SharePartition.AcquisitionLockTimerTask timerTask1 = sharePartition.scheduleAcquisitionLockTimeout(MEMBER_ID, 100L, 200L);
@ -5257,6 +5273,7 @@ public class SharePartitionTest {
assertNotNull(sharePartition.cachedState().get(2L).batchAcquisitionLockTimeoutTask());
// Allowing acquisition lock to expire.
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> sharePartition.nextFetchOffset() == 7 && sharePartition.cachedState().isEmpty() &&
sharePartition.startOffset() == 7 && sharePartition.endOffset() == 7,
@ -5306,6 +5323,7 @@ public class SharePartitionTest {
assertNotNull(sharePartition.cachedState().get(1L).batchAcquisitionLockTimeoutTask());
// Allowing acquisition lock to expire.
mockTimer.advanceClock(DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS);
TestUtils.waitForCondition(
() -> sharePartition.nextFetchOffset() == 3 && sharePartition.cachedState().isEmpty() &&
sharePartition.startOffset() == 3 && sharePartition.endOffset() == 3,

View File

@ -81,6 +81,9 @@ public class MockTimer implements Timer {
}
public int size() {
synchronized (taskQueue) {
taskQueue.removeIf(TimerTaskEntry::cancelled);
}
return taskQueue.size();
}